package eu.qualimaster.monitoring.storm;

import backtype.storm.generated.ExecutorSpecificStats;
import backtype.storm.generated.ExecutorStats;
import backtype.storm.generated.ExecutorSummary;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;
import eu.qualimaster.Configuration;
import eu.qualimaster.coordination.CoordinationManager;
import eu.qualimaster.coordination.INameMapping;
import eu.qualimaster.monitoring.AbstractContainerMonitoringTask;
import eu.qualimaster.monitoring.systemState.SystemState;
import eu.qualimaster.observables.IObservable;
import eu.qualimaster.observables.TimeBehavior;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;

/* loaded from: input_file:eu/qualimaster/monitoring/storm/ThriftMonitoringTask.class */
/**
 * Monitoring task that polls the cluster via the given {@link Connection}, selects the
 * topologies whose names belong to the monitored pipeline and transfers throughput and
 * latency figures from the executor statistics into the {@link SystemState}.
 */
class ThriftMonitoringTask extends AbstractContainerMonitoringTask {
    /** Key of the "all time" statistics window in the executor statistics maps. */
    static final String ALL_TIME = ":all-time";
    /** Statistics window key "600" (the number of seconds in 10 minutes). */
    static final String AT_10M = "600";
    /** Statistics window key "10800" (the number of seconds in 3 hours). */
    static final String AT_3H = "10800";
    /** Statistics window key "86400" (the number of seconds in 1 day). */
    static final String AT_1D = "86400";
    private static final Logger LOGGER = LogManager.getLogger(ThriftMonitoringTask.class);

    /** Name of the pipeline this task was created for; used to look up the name mapping. */
    private final String pipeline;
    /** Names of the topologies to monitor, taken from the name mapping at construction time. */
    private final Set<String> topologyNames = new HashSet<String>();
    /** Connection used to query the cluster summary and topology information. */
    private final Connection connection;

    /**
     * Creates a monitoring task for a pipeline. If no name mapping is registered for the
     * pipeline, an error is logged and the task will not match any topology.
     *
     * @param str the pipeline name used to obtain the name mapping
     * @param connection the connection used to query the cluster
     * @param systemState the system state passed on to the super class
     */
    public ThriftMonitoringTask(String str, Connection connection, SystemState systemState) {
        super(systemState);
        this.pipeline = str;
        this.connection = connection;
        INameMapping nameMapping = CoordinationManager.getNameMapping(str);
        if (nameMapping != null) {
            this.topologyNames.addAll(nameMapping.getPipelineNames());
        } else {
            LOGGER.error("no name mapping for pipeline " + str);
        }
    }

    /**
     * Queries the cluster summary and aggregates every topology whose name is contained in
     * the monitored topology names. Does nothing if the connection cannot be opened.
     */
    @Override
    public void run() {
        if (!this.connection.open()) {
            return;
        }
        try {
            List<TopologySummary> topologies = this.connection.getClusterSummary().get_topologies();
            for (TopologySummary topology : topologies) {
                if (!this.topologyNames.contains(topology.get_name())) {
                    continue;
                }
                try {
                    aggregateTopology(this.connection.getTopologyInfo(topology.get_id()));
                } catch (NotAliveException e) {
                    // Best effort: skip this topology for the current cycle, but leave a trace
                    // instead of dropping the exception silently.
                    LOGGER.debug("Topology " + topology.get_name() + " is not alive, skipping", e);
                }
            }
        } catch (TException e) {
            LOGGER.error("Cannot obtain thrift data " + e.getMessage(), e);
        } catch (IllegalStateException e) {
            // Deliberately not escalated (the original ignored it); logged so that the reason
            // for a skipped monitoring cycle can be diagnosed.
            LOGGER.debug("Skipping monitoring cycle: " + e.getMessage(), e);
        }
    }

    /**
     * Transfers the executor statistics of one topology into the system state: per pipeline
     * node for non-system components, and summed over all executors for the pipeline itself.
     *
     * @param topologyInfo the topology information to aggregate
     */
    private void aggregateTopology(TopologyInfo topologyInfo) {
        INameMapping nameMapping = CoordinationManager.getNameMapping(this.pipeline);
        if (nameMapping == null) {
            LOGGER.error("no mapping for " + topologyInfo.get_name());
            return;
        }
        SystemState.PipelineSystemPart pipelinePart = getState().getPipeline(nameMapping.getPipelineName());
        List<ExecutorSummary> executors = topologyInfo.get_executors();
        long totalItems = 0;
        double totalLatency = 0.0d;
        boolean hasStats = false;
        for (ExecutorSummary executor : executors) {
            ExecutorStats stats = executor.get_stats();
            if (stats == null) {
                continue;
            }
            String componentId = executor.get_component_id();
            double latency = getLatency(stats);
            long items = getLongStatValue(stats.get_transferred(), ALL_TIME);
            // Components whose id starts with "__" are excluded from the per-node values but
            // still contribute to the pipeline totals below.
            if (!componentId.startsWith("__")) {
                SystemState.PipelineNodeSystemPart node = pipelinePart.getPipelineNode(componentId);
                if (node != null) {
                    // NOTE(review): called once per executor; if a component has several
                    // executors the node presumably keeps the last value set - confirm.
                    setPartPositiveValue(node, TimeBehavior.THROUGHPUT_ITEMS, items);
                    setPartPositiveValue(node, TimeBehavior.LATENCY, latency);
                } else {
                    LOGGER.error("no mapping for executor " + componentId);
                }
            }
            totalItems += items;
            totalLatency += latency;
            hasStats = true;
        }
        if (hasStats) {
            setPartPositiveValue(pipelinePart, TimeBehavior.THROUGHPUT_ITEMS, totalItems);
            setPartPositiveValue(pipelinePart, TimeBehavior.LATENCY, totalLatency);
            sendDemoEvent(pipelinePart);
        }
    }

    /**
     * Returns the all-time latency figure of an executor: the process time for bolts, the
     * complete time for spouts, and 0 if neither kind of specific statistics is available.
     *
     * @param stats the executor statistics (must not be <code>null</code>)
     * @return the summed latency value, 0 if not available
     */
    private static double getLatency(ExecutorStats stats) {
        ExecutorSpecificStats specific = stats.get_specific();
        if (specific == null) {
            return 0.0d;
        }
        if (specific.is_set_bolt()) {
            return getDoubleStatValue(specific.get_bolt().get_process_ms_avg(), ALL_TIME);
        }
        if (specific.is_set_spout()) {
            return getDoubleStatValue(specific.get_spout().get_complete_ms_avg(), ALL_TIME);
        }
        return 0.0d;
    }

    /**
     * Sets a value on a system part, but only if the value is strictly positive.
     *
     * @param systemPart the part to modify
     * @param iObservable the observable to set
     * @param d the value to set; ignored if not greater than 0
     */
    private static void setPartPositiveValue(SystemState.SystemPart systemPart, IObservable iObservable, double d) {
        if (d > 0.0d) {
            systemPart.setValue(iObservable, d, (Object) null);
        }
    }

    /**
     * Sums all non-null double values stored under the given key of a nested statistics map.
     *
     * @param <T> the key type of the inner map
     * @param map the statistics map (outer key to inner map), may be <code>null</code>
     * @param str the outer key to look up, e.g. {@link #ALL_TIME}
     * @return the sum of the inner values, 0 if the map or the entry does not exist
     */
    public static <T> double getDoubleStatValue(Map<String, Map<T, Double>> map, String str) {
        double sum = 0.0d;
        Map<T, Double> window = (map == null) ? null : map.get(str);
        if (window != null) {
            for (Double value : window.values()) {
                if (value != null) {
                    sum += value.doubleValue();
                }
            }
        }
        return sum;
    }

    /**
     * Sums all non-null long values stored under the given key of a nested statistics map.
     *
     * @param <T> the key type of the inner map
     * @param map the statistics map (outer key to inner map), may be <code>null</code>
     * @param str the outer key to look up, e.g. {@link #ALL_TIME}
     * @return the sum of the inner values, 0 if the map or the entry does not exist
     */
    public static <T> long getLongStatValue(Map<String, Map<T, Long>> map, String str) {
        long sum = 0;
        Map<T, Long> window = (map == null) ? null : map.get(str);
        if (window != null) {
            for (Long value : window.values()) {
                if (value != null) {
                    sum += value.longValue();
                }
            }
        }
        return sum;
    }

    /**
     * Cancels this task by delegating to the super class.
     *
     * @return the result of the super class implementation
     */
    @Override
    public boolean cancel() {
        return super.cancel();
    }

    /**
     * Returns the monitoring frequency for this task.
     *
     * @return the configured pipeline monitoring frequency
     */
    @Override
    public int getFrequency() {
        return Configuration.getPipelineMonitoringFrequency();
    }
}
