package eu.qualimaster.monitoring.storm;

import backtype.storm.generated.BoltStats;
import backtype.storm.generated.ExecutorStats;
import backtype.storm.generated.ExecutorSummary;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;
import eu.qualimaster.Configuration;
import eu.qualimaster.common.signal.ThriftConnection;
import eu.qualimaster.coordination.CoordinationManager;
import eu.qualimaster.coordination.INameMapping;
import eu.qualimaster.dataManagement.DataManager;
import eu.qualimaster.infrastructure.PipelineLifecycleEvent;
import eu.qualimaster.monitoring.AbstractContainerMonitoringTask;
import eu.qualimaster.monitoring.MonitoringManager;
import eu.qualimaster.monitoring.systemState.SystemState;
import eu.qualimaster.observables.IObservable;
import eu.qualimaster.observables.ResourceUsage;
import eu.qualimaster.observables.Scalability;
import eu.qualimaster.observables.TimeBehavior;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.thrift7.TException;

/* loaded from: input_file:eu/qualimaster/monitoring/storm/ThriftMonitoringTask.class */
class ThriftMonitoringTask extends AbstractContainerMonitoringTask {
    static final String ALL_TIME = ":all-time";
    static final String AT_10M = "600";
    static final String AT_3H = "10800";
    static final String AT_1D = "86400";
    private static final Logger LOGGER = LogManager.getLogger(ThriftMonitoringTask.class);
    private String pipeline;
    private Set<String> topologyNames;
    private ThriftConnection connection;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ThriftMonitoringTask(String str, ThriftConnection thriftConnection, SystemState systemState) {
        super(systemState);
        this.topologyNames = new HashSet();
        this.pipeline = str;
        this.connection = thriftConnection;
        INameMapping nameMapping = CoordinationManager.getNameMapping(str);
        if (null != nameMapping) {
            this.topologyNames.addAll(nameMapping.getPipelineNames());
        } else {
            LOGGER.error("no name mapping for pipeline " + str);
        }
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask, java.util.TimerTask, java.lang.Runnable
    public void run() {
        if (this.connection.open()) {
            try {
                List list = this.connection.getClusterSummary().get_topologies();
                HashSet hashSet = new HashSet();
                for (int i = 0; i < list.size(); i++) {
                    TopologySummary topologySummary = (TopologySummary) list.get(i);
                    if (this.topologyNames.contains(topologySummary.get_name())) {
                        try {
                            SystemState.PipelineSystemPart aggregateTopology = aggregateTopology(this.connection.getTopologyInfo(topologySummary.get_id()));
                            if (null != aggregateTopology) {
                                hashSet.add(aggregateTopology);
                            }
                        } catch (NotAliveException e) {
                        }
                    }
                }
                for (SystemState.PipelineSystemPart pipelineSystemPart : getState().getPipelines()) {
                    if (!hashSet.contains(pipelineSystemPart) && (PipelineLifecycleEvent.Status.STOPPED == pipelineSystemPart.getStatus() || PipelineLifecycleEvent.Status.STOPPING == pipelineSystemPart.getStatus())) {
                        pipelineSystemPart.changeStatus(PipelineLifecycleEvent.Status.DISAPPEARED, true);
                    }
                }
            } catch (TException e2) {
                LOGGER.error("Cannot obtain thrift data " + e2.getMessage(), e2);
            } catch (IllegalStateException e3) {
            }
        }
        super.run();
    }

    private boolean isSink(INameMapping iNameMapping, String str) {
        List pipelineNodeComponents = iNameMapping.getPipelineNodeComponents(str);
        return null != pipelineNodeComponents && 1 == pipelineNodeComponents.size() && INameMapping.Component.Type.SINK == ((INameMapping.Component) pipelineNodeComponents.get(0)).getType();
    }

    private SystemState.PipelineSystemPart aggregateTopology(TopologyInfo topologyInfo) {
        SystemState.PipelineSystemPart pipelineSystemPart = null;
        INameMapping nameMapping = CoordinationManager.getNameMapping(this.pipeline);
        if (null != nameMapping) {
            pipelineSystemPart = getState().getPipeline(nameMapping.getPipelineName());
            if (PipelineLifecycleEvent.Status.INITIALIZED == pipelineSystemPart.getStatus() && !DataManager.isStarted() && System.currentTimeMillis() - pipelineSystemPart.getLastStateChange() > Configuration.getPipelineStartNotificationDelay()) {
                pipelineSystemPart.changeStatus(PipelineLifecycleEvent.Status.STARTED, true);
            }
            List list = topologyInfo.get_executors();
            Map<String, PipelineElementStatistics> mapInstance = PipelineElementStatistics.getMapInstance();
            int i = 0;
            int i2 = 0;
            for (int i3 = 0; i3 < list.size(); i3++) {
                ExecutorSummary executorSummary = (ExecutorSummary) list.get(i3);
                if (executorSummary.get_uptime_secs() > 1) {
                    i2++;
                }
                String str = executorSummary.get_component_id();
                ExecutorStats executorStats = executorSummary.get_stats();
                if (null != executorStats) {
                    SystemState.PipelineNodeSystemPart nodePart = SystemState.getNodePart(nameMapping, pipelineSystemPart, str);
                    PipelineElementStatistics pipelineElementStatistics = mapInstance.get(nodePart.getName());
                    if (null == pipelineElementStatistics) {
                        pipelineElementStatistics = PipelineElementStatistics.getInstance();
                        mapInstance.put(str, pipelineElementStatistics);
                    }
                    pipelineElementStatistics.addExecutor();
                    double min = Math.min(topologyInfo.get_uptime_secs(), 600);
                    if (executorStats.get_specific().is_set_bolt()) {
                        handleBolt(str, executorStats, pipelineElementStatistics, min, nodePart);
                        i++;
                    } else if (executorStats.get_specific().is_set_spout()) {
                        handleSpout(str, executorStats, pipelineElementStatistics, min, nodePart);
                    } else {
                        handleOther(str, executorStats, pipelineElementStatistics, min, nodePart);
                    }
                }
            }
            boolean z = false;
            if ((PipelineLifecycleEvent.Status.UNKNOWN == pipelineSystemPart.getStatus() || PipelineLifecycleEvent.Status.STARTING == pipelineSystemPart.getStatus()) && list.size() == i2) {
                pipelineSystemPart.changeStatus(PipelineLifecycleEvent.Status.CREATED, true);
                z = true;
            }
            boolean processStatistics = processStatistics(nameMapping, mapInstance, i, pipelineSystemPart);
            if (!z && processStatistics && PipelineLifecycleEvent.Status.CREATED == pipelineSystemPart.getStatus()) {
                pipelineSystemPart.changeStatus(PipelineLifecycleEvent.Status.INITIALIZED, true);
            }
        } else {
            LOGGER.error("no mapping for " + topologyInfo.get_name());
        }
        return pipelineSystemPart;
    }

    private void handleBolt(String str, ExecutorStats executorStats, PipelineElementStatistics pipelineElementStatistics, double d, SystemState.PipelineNodeSystemPart pipelineNodeSystemPart) {
        BoltStats boltStats = executorStats.get_specific().get_bolt();
        pipelineElementStatistics.addLatency(str, getDoubleStatValue(boltStats.get_process_ms_avg(), ALL_TIME));
        long longStatValue = getLongStatValue(boltStats.get_executed(), AT_10M);
        double doubleStatValue = getDoubleStatValue(boltStats.get_execute_ms_avg(), AT_10M);
        if (isZero(doubleStatValue, 5.0E-5d)) {
            doubleStatValue = pipelineNodeSystemPart.getObservedValue(TimeBehavior.LATENCY);
        }
        pipelineElementStatistics.addCapacity(str, (longStatValue * doubleStatValue) / (1000.0d * d));
        pipelineElementStatistics.addThroughputItems(str, getLongStatValue(boltStats.get_acked(), ALL_TIME));
    }

    private boolean isZero(double d, double d2) {
        return d >= (-d2) && d <= d2;
    }

    private void handleSpout(String str, ExecutorStats executorStats, PipelineElementStatistics pipelineElementStatistics, double d, SystemState.PipelineNodeSystemPart pipelineNodeSystemPart) {
        long lastUpdate = pipelineNodeSystemPart.getLastUpdate(TimeBehavior.LATENCY);
        if (lastUpdate > 0) {
            d = System.currentTimeMillis() - lastUpdate;
        }
        double observedValue = pipelineNodeSystemPart.getObservedValue(TimeBehavior.LATENCY);
        double observedValue2 = pipelineNodeSystemPart.getObservedValue(TimeBehavior.THROUGHPUT_ITEMS);
        if (observedValue2 > 0.0d) {
            pipelineElementStatistics.addCapacity(str, (observedValue2 * observedValue) / (1000.0d * d));
        }
        pipelineElementStatistics.addThroughputItems(str, (long) observedValue2);
    }

    private void handleOther(String str, ExecutorStats executorStats, PipelineElementStatistics pipelineElementStatistics, double d, SystemState.PipelineNodeSystemPart pipelineNodeSystemPart) {
        double observedValue = pipelineNodeSystemPart.getObservedValue(TimeBehavior.LATENCY);
        long longStatValue = getLongStatValue(executorStats.get_emitted(), AT_10M);
        pipelineElementStatistics.addCapacity(str, (longStatValue * observedValue) / (1000.0d * d));
        pipelineElementStatistics.addThroughputItems(str, longStatValue);
    }

    private boolean processStatistics(INameMapping iNameMapping, Map<String, PipelineElementStatistics> map, int i, SystemState.PipelineSystemPart pipelineSystemPart) {
        boolean z = false;
        boolean z2 = PipelineLifecycleEvent.Status.STARTED == pipelineSystemPart.getStatus();
        if (!map.isEmpty()) {
            long j = 0;
            double d = 0.0d;
            double d2 = 0.0d;
            int i2 = 0;
            int i3 = 0;
            int i4 = 0;
            for (Map.Entry<String, PipelineElementStatistics> entry : map.entrySet()) {
                String key = entry.getKey();
                PipelineElementStatistics value = entry.getValue();
                double throughputItemsAvg = value.getThroughputItemsAvg();
                double latencyAvg = value.getLatencyAvg();
                double capacityAvg = value.getCapacityAvg();
                int numExecutors = value.getNumExecutors();
                if (!key.startsWith("__")) {
                    SystemState.PipelineNodeSystemPart nodePart = SystemState.getNodePart(iNameMapping, pipelineSystemPart, key);
                    i3++;
                    if (null != nodePart) {
                        setPartPositiveValue(nodePart, TimeBehavior.THROUGHPUT_ITEMS, throughputItemsAvg);
                        setPartPositiveValue(nodePart, Scalability.ITEMS, throughputItemsAvg);
                        setPartPositiveValue(nodePart, TimeBehavior.LATENCY, latencyAvg);
                        setPartPositiveValue(nodePart, ResourceUsage.CAPACITY, capacityAvg);
                        setPartPositiveValue(nodePart, ResourceUsage.EXECUTORS, numExecutors);
                        sendDemoEvent(nodePart, pipelineSystemPart.getName(), 4);
                        if (!z2 && nodePart.isInitialized()) {
                            i4++;
                        }
                    } else {
                        LOGGER.error("no mapping for executor " + key);
                    }
                }
                j = (long) (j + throughputItemsAvg);
                d += latencyAvg;
                d2 += capacityAvg;
                i2 += numExecutors;
                PipelineElementStatistics.releaseInstance(value);
            }
            z = z2 || (i3 > 0 && i3 == i4);
            setPartPositiveValue(pipelineSystemPart, TimeBehavior.THROUGHPUT_ITEMS, j);
            setPartPositiveValue(pipelineSystemPart, Scalability.ITEMS, j);
            setPartPositiveValue(pipelineSystemPart, TimeBehavior.LATENCY, d);
            setPartPositiveValue(pipelineSystemPart, ResourceUsage.CAPACITY, d2 / i);
            setPartPositiveValue(pipelineSystemPart, ResourceUsage.EXECUTORS, i2);
            sendDemoEvent(pipelineSystemPart, null, 2);
        }
        PipelineElementStatistics.release(map, false);
        if (!z && !MonitoringManager.hasAdaptationModel()) {
            z = true;
        }
        return z;
    }

    private static void setPartPositiveValue(SystemState.SystemPart systemPart, IObservable iObservable, double d) {
        if (d > 0.0d) {
            systemPart.setValue(iObservable, d, (Object) null);
        }
    }

    public static <T> double getDoubleStatValue(Map<String, Map<T, Double>> map, String str) {
        Double value;
        double d = 0.0d;
        Map<T, Double> map2 = map.get(str);
        if (null != map2) {
            for (Map.Entry<T, Double> entry : map2.entrySet()) {
                T key = entry.getKey();
                if (null != key && !key.toString().startsWith("__") && null != (value = entry.getValue())) {
                    d += value.doubleValue();
                }
            }
        }
        return d;
    }

    public static <T> long getLongStatValue(Map<String, Map<T, Long>> map, String str) {
        Long value;
        long j = 0;
        Map<T, Long> map2 = map.get(str);
        if (null != map2) {
            for (Map.Entry<T, Long> entry : map2.entrySet()) {
                T key = entry.getKey();
                if (null != key && !key.toString().startsWith("__") && null != (value = entry.getValue())) {
                    j += value.longValue();
                }
            }
        }
        return j;
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask, java.util.TimerTask
    public boolean cancel() {
        return super.cancel();
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask
    public int getFrequency() {
        return Configuration.getPipelineMonitoringFrequency();
    }
}
