package eu.qualimaster.monitoring.storm;

import backtype.storm.event.EventManager;
import backtype.storm.generated.BoltStats;
import backtype.storm.generated.ExecutorInfo;
import backtype.storm.generated.ExecutorStats;
import backtype.storm.generated.ExecutorSummary;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;
import eu.qualimaster.coordination.HostPort;
import eu.qualimaster.coordination.INameMapping;
import eu.qualimaster.coordination.TaskAssignment;
import eu.qualimaster.coordination.ZkUtils;
import eu.qualimaster.dataManagement.DataManager;
import eu.qualimaster.infrastructure.InitializationMode;
import eu.qualimaster.infrastructure.PipelineLifecycleEvent;
import eu.qualimaster.monitoring.AbstractContainerMonitoringTask;
import eu.qualimaster.monitoring.MonitoringConfiguration;
import eu.qualimaster.monitoring.MonitoringManager;
import eu.qualimaster.monitoring.events.ComponentKey;
import eu.qualimaster.monitoring.observations.ObservedValue;
import eu.qualimaster.monitoring.systemState.PipelineNodeSystemPart;
import eu.qualimaster.monitoring.systemState.PipelineSystemPart;
import eu.qualimaster.monitoring.systemState.StateUtils;
import eu.qualimaster.monitoring.systemState.SystemPart;
import eu.qualimaster.monitoring.systemState.SystemState;
import eu.qualimaster.monitoring.topology.PipelineTopology;
import eu.qualimaster.observables.IObservable;
import eu.qualimaster.observables.ResourceUsage;
import eu.qualimaster.observables.TimeBehavior;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.storm.curator.framework.CuratorFramework;
import org.apache.thrift7.TException;

/* loaded from: input_file:MonitoringLayer.jar:eu/qualimaster/monitoring/storm/ThriftMonitoringTask.class */
public class ThriftMonitoringTask extends AbstractContainerMonitoringTask {
    static final String ALL_TIME = ":all-time";
    static final String AT_10M = "600";
    static final String AT_3H = "10800";
    static final String AT_1D = "86400";
    private static final Logger LOGGER = LogManager.getLogger(ThriftMonitoringTask.class);
    private String pipeline;
    private Set<String> topologyNames;
    private StormConnection connection;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ThriftMonitoringTask(String str, StormConnection stormConnection, SystemState systemState) {
        super(systemState);
        this.topologyNames = new HashSet();
        this.pipeline = str;
        this.connection = stormConnection;
        INameMapping nameMapping = MonitoringManager.getNameMapping(str);
        if (null != nameMapping) {
            this.topologyNames.addAll(nameMapping.getPipelineNames());
        } else {
            LOGGER.error("no name mapping for pipeline " + str);
        }
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask
    public void monitor() {
        if (this.connection.open()) {
            try {
                List list = this.connection.getClusterSummary().get_topologies();
                HashSet hashSet = new HashSet();
                for (int i = 0; i < list.size(); i++) {
                    TopologySummary topologySummary = (TopologySummary) list.get(i);
                    if (this.topologyNames.contains(topologySummary.get_name())) {
                        try {
                            PipelineSystemPart aggregateTopology = aggregateTopology(this.connection.getTopologyInfo(topologySummary.get_id()));
                            if (null != aggregateTopology) {
                                hashSet.add(aggregateTopology);
                            }
                        } catch (NotAliveException e) {
                        }
                    }
                }
            } catch (TException e2) {
                LOGGER.error("Cannot obtain thrift data " + e2.getMessage(), e2);
            } catch (IllegalStateException e3) {
            }
        }
    }

    private void checkAssignment(TopologyInfo topologyInfo) {
        CuratorFramework curator = this.connection.getCurator();
        if (null != curator) {
            try {
                System.out.println("ALIVE " + HostPort.toHostPort(ZkUtils.getAliveWorkers(curator, topologyInfo), HostPort.WORKERBEATS_HOSTPORT_PARSER));
                System.out.println("ASSNG " + TaskAssignment.readTaskAssignments(ZkUtils.getAssignment(curator, topologyInfo), ZkUtils.taskComponentMapping(topologyInfo)));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private boolean isSink(INameMapping iNameMapping, String str) {
        INameMapping.Component pipelineNodeComponent = iNameMapping.getPipelineNodeComponent(str);
        return null != pipelineNodeComponent && INameMapping.Component.Type.SINK == pipelineNodeComponent.getType();
    }

    private PipelineSystemPart preparePipelineAggregation(TopologyInfo topologyInfo, INameMapping iNameMapping) throws TException, NotAliveException {
        SystemState state = getState();
        String pipelineName = iNameMapping.getPipelineName();
        PipelineSystemPart obtainPipeline = state.obtainPipeline(pipelineName);
        if (null == obtainPipeline.getTopology()) {
            MonitoringManager.PipelineInfo pipelineInfo = MonitoringManager.getPipelineInfo(pipelineName);
            HashMap hashMap = new HashMap();
            hashMap.put(this.connection.getTopology(topologyInfo), topologyInfo);
            Iterator<MonitoringManager.PipelineInfo> it = pipelineInfo.getSubPipelines().iterator();
            while (it.hasNext()) {
                String name = it.next().getName();
                try {
                    TopologyInfo topologyInfoByName = this.connection.getTopologyInfoByName(name);
                    hashMap.put(this.connection.getTopology(topologyInfoByName), topologyInfoByName);
                } catch (NotAliveException e) {
                    LOGGER.info("Sub-topology not alive: " + name);
                }
            }
            PipelineTopology buildPipelineTopology = Utils.buildPipelineTopology(hashMap, iNameMapping);
            obtainPipeline.setTopology(buildPipelineTopology);
            if (null != buildPipelineTopology) {
                LOGGER.info("TOPOLOGY for " + iNameMapping.getPipelineName() + " " + buildPipelineTopology);
            }
        }
        if (PipelineLifecycleEvent.Status.INITIALIZED == obtainPipeline.getStatus() && !DataManager.isStarted() && System.currentTimeMillis() - obtainPipeline.getLastStateChange() > MonitoringConfiguration.getPipelineStartNotificationDelay()) {
            obtainPipeline.changeStatus(PipelineLifecycleEvent.Status.STARTED, true);
        }
        return obtainPipeline;
    }

    private static boolean isUp(PipelineNodeSystemPart pipelineNodeSystemPart, ExecutorSummary executorSummary) {
        boolean z = false;
        int observedValue = (int) pipelineNodeSystemPart.getObservedValue(ResourceUsage.TASKS);
        if (pipelineNodeSystemPart.getObservedValue(ResourceUsage.TASKS) > 0.0d) {
            InitializationMode initializationMode = MonitoringConfiguration.getInitializationMode();
            if (InitializationMode.DYNAMIC == initializationMode) {
                z = null != pipelineNodeSystemPart.getCurrent();
                if (MonitoringConfiguration.getStormExecutorStartupParallel()) {
                    z &= observedValue >= pipelineNodeSystemPart.getCurrentCount();
                }
            } else if (InitializationMode.ADAPTIVE != initializationMode) {
                z = true;
            } else if (MonitoringConfiguration.getStormExecutorStartupParallel()) {
                z = observedValue >= Utils.taskCount(executorSummary);
            } else {
                z = true;
            }
        }
        return z;
    }

    private static int getExecutorStartWaitingTime() {
        if (InitializationMode.ADAPTIVE == MonitoringConfiguration.getInitializationMode()) {
            return 0;
        }
        return MonitoringConfiguration.getStormExecutorStartupWaitingTime();
    }

    private PipelineSystemPart aggregateTopology(TopologyInfo topologyInfo) throws TException, NotAliveException {
        PipelineSystemPart pipelineSystemPart = null;
        INameMapping nameMapping = MonitoringManager.getNameMapping(this.pipeline);
        int executorStartWaitingTime = getExecutorStartWaitingTime();
        if (null != nameMapping) {
            pipelineSystemPart = preparePipelineAggregation(topologyInfo, nameMapping);
            List<ExecutorSummary> list = topologyInfo.get_executors();
            PipelineStatistics pipelineStatistics = new PipelineStatistics(pipelineSystemPart);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            int i = 0;
            ArrayList arrayList3 = new ArrayList();
            int i2 = 0;
            for (int i3 = 0; i3 < list.size(); i3++) {
                ExecutorSummary executorSummary = list.get(i3);
                String str = executorSummary.get_component_id();
                if (executorStartWaitingTime > 0 && executorSummary.get_uptime_secs() > executorStartWaitingTime) {
                    i++;
                    arrayList.add(str);
                }
                boolean isInternal = Utils.isInternal(executorSummary);
                ExecutorStats executorStats = executorSummary.get_stats();
                PipelineNodeSystemPart check = check(SystemState.getNodePart(nameMapping, pipelineSystemPart, str), isInternal);
                if (!isInternal) {
                    arrayList3.add(str);
                    if (isUp(check, executorSummary)) {
                        i2++;
                        arrayList2.add(str);
                    }
                }
                if (null != executorStats) {
                    if (isInternal) {
                    }
                    if (doThrift(executorSummary, check, isInternal)) {
                        aggregateExecutor(executorSummary, check, isInternal);
                    }
                    if (!isInternal && (check.getParent() instanceof PipelineSystemPart)) {
                        sendSummaryEvent(check, pipelineSystemPart.getName(), 4);
                    }
                }
                pipelineStatistics.collect(check);
            }
            debugExecutors(list, nameMapping, pipelineSystemPart);
            boolean commit = pipelineStatistics.commit();
            sendSummaryEvent(pipelineSystemPart, null, 2);
            boolean z = false;
            if (PipelineLifecycleEvent.Status.UNKNOWN == pipelineSystemPart.getStatus() || PipelineLifecycleEvent.Status.STARTING == pipelineSystemPart.getStatus()) {
                LOGGER.info("Trying to elevate '" + pipelineSystemPart.getName() + "' to CREATED: uptime " + arrayList + " " + list.size() + " " + i + " events expected " + arrayList3 + " received " + arrayList2 + " " + arrayList3.size() + " " + i2);
                if (list.size() == i || arrayList3.size() == i2) {
                    pipelineSystemPart.changeStatus(PipelineLifecycleEvent.Status.CREATED, true);
                    z = true;
                }
            }
            if (!z && PipelineLifecycleEvent.Status.CREATED == pipelineSystemPart.getStatus()) {
                logElevating(pipelineSystemPart, pipelineStatistics);
                if (commit && areSubpipelinesUp(pipelineSystemPart.getName())) {
                    pipelineSystemPart.changeStatus(PipelineLifecycleEvent.Status.INITIALIZED, true);
                }
            }
        } else {
            LOGGER.error("no mapping for " + topologyInfo.get_name());
        }
        return pipelineSystemPart;
    }

    private static PipelineNodeSystemPart check(PipelineNodeSystemPart pipelineNodeSystemPart, boolean z) {
        if (z) {
            pipelineNodeSystemPart.markAsInternal();
        }
        return pipelineNodeSystemPart;
    }

    private static void logElevating(PipelineSystemPart pipelineSystemPart, PipelineStatistics pipelineStatistics) {
        LOGGER.info("Trying to elevate '" + pipelineSystemPart.getName() + "' to INITIALIZED: " + pipelineStatistics.getNeedInitializationCount() + " " + pipelineStatistics.getInitializedCount() + " needed " + pipelineStatistics.toStringNeedInitialization() + " init " + pipelineStatistics.toStringInitialized());
    }

    private boolean areSubpipelinesUp(String str) {
        MonitoringManager.PipelineInfo pipelineInfo = MonitoringManager.getPipelineInfo(str);
        boolean z = true;
        if (null != pipelineInfo) {
            for (MonitoringManager.PipelineInfo pipelineInfo2 : pipelineInfo.getSubPipelines()) {
                if (PipelineLifecycleEvent.Status.INITIALIZED != pipelineInfo2.getStatus() && PipelineLifecycleEvent.Status.STARTED != pipelineInfo2.getStatus()) {
                    z = false;
                }
            }
        } else {
            z = true;
        }
        return z;
    }

    private void debugExecutors(List<ExecutorSummary> list, INameMapping iNameMapping, PipelineSystemPart pipelineSystemPart) {
        if (MonitoringConfiguration.debugThriftMonitoring()) {
            LogManager.getLogger(EventManager.class).info("from events " + pipelineSystemPart);
            for (int i = 0; i < list.size(); i++) {
                ExecutorSummary executorSummary = list.get(i);
                boolean isInternal = Utils.isInternal(executorSummary);
                PipelineNodeSystemPart nodePart = SystemState.getNodePart(iNameMapping, pipelineSystemPart, executorSummary.get_component_id());
                if (!doThrift(executorSummary, nodePart, isInternal)) {
                    LogManager.getLogger(EventManager.class).info("from events " + nodePart);
                }
            }
        }
    }

    private static int getTaskCount(ExecutorSummary executorSummary) {
        ExecutorInfo executorInfo = executorSummary.get_executor_info();
        return (executorInfo.get_task_end() - executorInfo.get_task_start()) + 1;
    }

    private void aggregateExecutor(ExecutorSummary executorSummary, PipelineNodeSystemPart pipelineNodeSystemPart, boolean z) {
        List<ComponentKey> keys = toKeys(executorSummary);
        ExecutorStats executorStats = executorSummary.get_stats();
        int size = keys.size();
        for (int i = 0; i < size; i++) {
            ComponentKey componentKey = keys.get(i);
            if (executorStats.get_specific().is_set_bolt()) {
                aggregateBolt(executorSummary, pipelineNodeSystemPart, z, componentKey, size);
            } else if (executorStats.get_specific().is_set_spout()) {
                aggregateSpout(executorSummary, pipelineNodeSystemPart, z, componentKey, size);
            } else {
                aggregateOther(executorSummary, pipelineNodeSystemPart, z, componentKey, size);
            }
        }
        setValueToKeysAndClear(pipelineNodeSystemPart, keys, ResourceUsage.EXECUTORS, 1.0d);
        setValueToKeysAndClear(pipelineNodeSystemPart, keys, ResourceUsage.TASKS, getTaskCount(executorSummary));
    }

    private boolean doThrift(ExecutorSummary executorSummary, PipelineNodeSystemPart pipelineNodeSystemPart, boolean z) {
        ExecutorStats executorStats;
        boolean useThrift = pipelineNodeSystemPart.useThrift();
        if (useThrift && null != (executorStats = executorSummary.get_stats()) && null != executorStats.get_specific() && !executorStats.get_specific().is_set_bolt() && !executorStats.get_specific().is_set_spout()) {
            useThrift = z;
        }
        return useThrift;
    }

    private void aggregateSpout(ExecutorSummary executorSummary, PipelineNodeSystemPart pipelineNodeSystemPart, boolean z, ComponentKey componentKey, int i) {
        if (MonitoringConfiguration.debugThriftMonitoring()) {
            LogManager.getLogger(EventManager.class).info("from thrift " + pipelineNodeSystemPart.getName() + " capacity " + pipelineNodeSystemPart.getObservedValue(ResourceUsage.CAPACITY, componentKey) + " throughput " + pipelineNodeSystemPart.getObservedValue(TimeBehavior.THROUGHPUT_ITEMS, componentKey) + " key " + componentKey + " -> " + pipelineNodeSystemPart);
        }
    }

    private void aggregateOther(ExecutorSummary executorSummary, PipelineNodeSystemPart pipelineNodeSystemPart, boolean z, ComponentKey componentKey, int i) {
        ExecutorStats executorStats = executorSummary.get_stats();
        ObservedValue observedValue = pipelineNodeSystemPart.getObservedValue(TimeBehavior.LATENCY, componentKey);
        double d = null == observedValue ? 0.0d : observedValue.get();
        if (z) {
            return;
        }
        long longStatValue = getLongStatValue(executorStats.get_emitted(), ALL_TIME);
        StateUtils.setValue(pipelineNodeSystemPart, TimeBehavior.THROUGHPUT_ITEMS, longStatValue / i, componentKey);
        StateUtils.updateCapacity(pipelineNodeSystemPart, componentKey, true);
        if (MonitoringConfiguration.debugThriftMonitoring()) {
            LogManager.getLogger(EventManager.class).info("from thrift " + pipelineNodeSystemPart.getName() + " executedAll " + longStatValue + " execLatency " + d + " capacity " + pipelineNodeSystemPart.getObservedValue(ResourceUsage.CAPACITY, componentKey) + " throughput " + pipelineNodeSystemPart.getObservedValue(TimeBehavior.THROUGHPUT_ITEMS, componentKey) + " key " + componentKey + " -> " + pipelineNodeSystemPart);
        }
    }

    private void aggregateBolt(ExecutorSummary executorSummary, PipelineNodeSystemPart pipelineNodeSystemPart, boolean z, ComponentKey componentKey, int i) {
        double doubleMinStatValue;
        double longStatValue;
        ExecutorStats executorStats = executorSummary.get_stats();
        BoltStats boltStats = executorStats.get_specific().get_bolt();
        if (z) {
            return;
        }
        if (INameMapping.Component.Type.SINK == pipelineNodeSystemPart.getComponentType()) {
            doubleMinStatValue = getDoubleMinStatValue(boltStats.get_process_ms_avg(), AT_10M);
            longStatValue = getLongStatValue(boltStats.get_executed(), AT_10M);
        } else {
            doubleMinStatValue = getDoubleMinStatValue(boltStats.get_process_ms_avg(), AT_10M);
            longStatValue = getLongStatValue(executorStats.get_emitted(), AT_10M);
        }
        StateUtils.setValue(pipelineNodeSystemPart, TimeBehavior.LATENCY, doubleMinStatValue, componentKey);
        StateUtils.setValue(pipelineNodeSystemPart, TimeBehavior.THROUGHPUT_ITEMS, longStatValue / i, componentKey);
        StateUtils.updateCapacity(pipelineNodeSystemPart, componentKey, true);
        if (MonitoringConfiguration.debugThriftMonitoring()) {
            LogManager.getLogger(EventManager.class).info("from thrift " + pipelineNodeSystemPart.getName() + " executed " + longStatValue + boltStats.get_executed() + " execLatency " + doubleMinStatValue + " " + boltStats.get_execute_ms_avg() + " processLatency " + getDoubleStatValue(boltStats.get_process_ms_avg(), AT_10M) + " " + boltStats.get_process_ms_avg() + " capacity " + pipelineNodeSystemPart.getObservedValue(ResourceUsage.CAPACITY, componentKey) + " throughput " + pipelineNodeSystemPart.getObservedValue(TimeBehavior.THROUGHPUT_ITEMS, componentKey) + " key " + componentKey + " -> " + pipelineNodeSystemPart);
        }
    }

    private void setValueToKeysAndClear(PipelineNodeSystemPart pipelineNodeSystemPart, List<ComponentKey> list, IObservable iObservable, double d) {
        HashSet hashSet = new HashSet();
        hashSet.addAll(pipelineNodeSystemPart.getComponentKeys(iObservable));
        setValue(pipelineNodeSystemPart, iObservable, 1.0d, list);
        hashSet.remove(list);
        pipelineNodeSystemPart.clearComponents(iObservable, hashSet);
    }

    private List<ComponentKey> toKeys(ExecutorSummary executorSummary) {
        ArrayList arrayList = new ArrayList();
        ExecutorInfo executorInfo = executorSummary.get_executor_info();
        for (int i = executorInfo.get_task_start(); i <= executorInfo.get_task_end(); i++) {
            arrayList.add(new ComponentKey(executorSummary.get_host(), executorSummary.get_port(), i));
        }
        return arrayList;
    }

    private void setValue(SystemPart systemPart, IObservable iObservable, double d, List<ComponentKey> list) {
        for (int i = 0; i < list.size(); i++) {
            StateUtils.setValue(systemPart, iObservable, d, list.get(i));
        }
    }

    public static <T> double getDoubleStatValue(Map<String, Map<T, Double>> map, String str) {
        Double value;
        double d = 0.0d;
        Map<T, Double> map2 = map.get(str);
        if (null != map2) {
            for (Map.Entry<T, Double> entry : map2.entrySet()) {
                T key = entry.getKey();
                if (null != key && !key.toString().startsWith("__") && null != (value = entry.getValue())) {
                    d += value.doubleValue();
                }
            }
        }
        return d;
    }

    public static <T> double getDoubleMinStatValue(Map<String, Map<T, Double>> map, String str) {
        Double value;
        double d = 0.0d;
        Map<T, Double> map2 = map.get(str);
        if (null != map2) {
            for (Map.Entry<T, Double> entry : map2.entrySet()) {
                T key = entry.getKey();
                if (null != key && !key.toString().startsWith("__") && null != (value = entry.getValue())) {
                    d = 0 == 0 ? value.doubleValue() : Math.min(value.doubleValue(), d);
                }
            }
        }
        return d;
    }

    public static <T> long getLongStatValue(Map<String, Map<T, Long>> map, String str) {
        Long value;
        long j = 0;
        Map<T, Long> map2 = map.get(str);
        if (null != map2) {
            for (Map.Entry<T, Long> entry : map2.entrySet()) {
                T key = entry.getKey();
                if (null != key && !key.toString().startsWith("__") && null != (value = entry.getValue())) {
                    j += value.longValue();
                }
            }
        }
        return j;
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask, java.util.TimerTask
    public boolean cancel() {
        return super.cancel();
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask
    public int getFrequency() {
        return MonitoringConfiguration.getPipelineMonitoringFrequency();
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask
    protected void failover(Throwable th) {
        this.connection.failover(th);
    }
}
