package eu.qualimaster.monitoring.storm;

import backtype.storm.generated.BoltStats;
import backtype.storm.generated.ExecutorInfo;
import backtype.storm.generated.ExecutorStats;
import backtype.storm.generated.ExecutorSummary;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;
import eu.qualimaster.Configuration;
import eu.qualimaster.adaptation.events.AdaptationEvent;
import eu.qualimaster.coordination.HostPort;
import eu.qualimaster.coordination.INameMapping;
import eu.qualimaster.coordination.TaskAssignment;
import eu.qualimaster.coordination.ZkUtils;
import eu.qualimaster.dataManagement.DataManager;
import eu.qualimaster.infrastructure.PipelineLifecycleEvent;
import eu.qualimaster.monitoring.AbstractContainerMonitoringTask;
import eu.qualimaster.monitoring.MonitoringManager;
import eu.qualimaster.monitoring.events.ComponentKey;
import eu.qualimaster.monitoring.systemState.PipelineNodeSystemPart;
import eu.qualimaster.monitoring.systemState.PipelineSystemPart;
import eu.qualimaster.monitoring.systemState.SystemPart;
import eu.qualimaster.monitoring.systemState.SystemState;
import eu.qualimaster.observables.IObservable;
import eu.qualimaster.observables.ResourceUsage;
import eu.qualimaster.observables.TimeBehavior;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.storm.curator.framework.CuratorFramework;
import org.apache.thrift7.TException;

/* loaded from: input_file:eu/qualimaster/monitoring/storm/ThriftMonitoringTask.class */
class ThriftMonitoringTask extends AbstractContainerMonitoringTask {
    static final String ALL_TIME = ":all-time";
    static final String AT_10M = "600";
    static final String AT_3H = "10800";
    static final String AT_1D = "86400";
    private static final Logger LOGGER = LogManager.getLogger(ThriftMonitoringTask.class);
    private static final int EXECUTOR_START_WAITING_TIME = Configuration.getStormExecutorStartupWaitingTime();
    private String pipeline;
    private Set<String> topologyNames;
    private StormConnection connection;
    private Class<? extends AdaptationEvent> adaptationFilter;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ThriftMonitoringTask(String str, StormConnection stormConnection, SystemState systemState, Class<? extends AdaptationEvent> cls) {
        super(systemState);
        this.topologyNames = new HashSet();
        this.pipeline = str;
        this.connection = stormConnection;
        this.adaptationFilter = cls;
        INameMapping nameMapping = MonitoringManager.getNameMapping(str);
        if (null != nameMapping) {
            this.topologyNames.addAll(nameMapping.getPipelineNames());
        } else {
            LOGGER.error("no name mapping for pipeline " + str);
        }
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask
    public void monitor() {
        if (this.connection.open()) {
            try {
                List list = this.connection.getClusterSummary().get_topologies();
                HashSet hashSet = new HashSet();
                for (int i = 0; i < list.size(); i++) {
                    TopologySummary topologySummary = (TopologySummary) list.get(i);
                    if (this.topologyNames.contains(topologySummary.get_name())) {
                        try {
                            PipelineSystemPart aggregateTopology = aggregateTopology(this.connection.getTopologyInfo(topologySummary.get_id()));
                            if (null != aggregateTopology) {
                                hashSet.add(aggregateTopology);
                            }
                        } catch (NotAliveException e) {
                        }
                    }
                }
                for (PipelineSystemPart pipelineSystemPart : getState().getPipelines()) {
                    if (!hashSet.contains(pipelineSystemPart) && (PipelineLifecycleEvent.Status.STOPPED == pipelineSystemPart.getStatus() || PipelineLifecycleEvent.Status.STOPPING == pipelineSystemPart.getStatus())) {
                        pipelineSystemPart.changeStatus(PipelineLifecycleEvent.Status.DISAPPEARED, true, this.adaptationFilter);
                    }
                }
            } catch (TException e2) {
                LOGGER.error("Cannot obtain thrift data " + e2.getMessage(), e2);
            } catch (IllegalStateException e3) {
            }
        }
    }

    private void checkAssignment(TopologyInfo topologyInfo) {
        CuratorFramework curator = this.connection.getCurator();
        if (null != curator) {
            try {
                System.out.println("ALIVE " + HostPort.toHostPort(ZkUtils.getAliveWorkers(curator, topologyInfo), HostPort.WORKERBEATS_HOSTPORT_PARSER));
                System.out.println("ASSNG " + TaskAssignment.readTaskAssignments(ZkUtils.getAssignment(curator, topologyInfo), ZkUtils.taskComponentMapping(topologyInfo)));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private boolean isSink(INameMapping iNameMapping, String str) {
        INameMapping.Component pipelineNodeComponent = iNameMapping.getPipelineNodeComponent(str);
        return null != pipelineNodeComponent && INameMapping.Component.Type.SINK == pipelineNodeComponent.getType();
    }

    private PipelineSystemPart preparePipelineAggregation(TopologyInfo topologyInfo, INameMapping iNameMapping) throws TException, NotAliveException {
        PipelineSystemPart obtainPipeline = getState().obtainPipeline(iNameMapping.getPipelineName());
        if (null == obtainPipeline.getTopology()) {
            obtainPipeline.setTopology(Utils.buildPipelineTopology(this.connection.getTopology(topologyInfo), topologyInfo, iNameMapping));
        }
        if (PipelineLifecycleEvent.Status.INITIALIZED == obtainPipeline.getStatus() && !DataManager.isStarted() && System.currentTimeMillis() - obtainPipeline.getLastStateChange() > Configuration.getPipelineStartNotificationDelay()) {
            obtainPipeline.changeStatus(PipelineLifecycleEvent.Status.STARTED, true, this.adaptationFilter);
        }
        return obtainPipeline;
    }

    private PipelineSystemPart aggregateTopology(TopologyInfo topologyInfo) throws TException, NotAliveException {
        PipelineSystemPart pipelineSystemPart = null;
        INameMapping nameMapping = MonitoringManager.getNameMapping(this.pipeline);
        if (null != nameMapping) {
            pipelineSystemPart = preparePipelineAggregation(topologyInfo, nameMapping);
            List list = topologyInfo.get_executors();
            PipelineStatistics pipelineStatistics = new PipelineStatistics(pipelineSystemPart);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            int i = 0;
            int i2 = 0;
            int i3 = 0;
            for (int i4 = 0; i4 < list.size(); i4++) {
                ExecutorSummary executorSummary = (ExecutorSummary) list.get(i4);
                String str = executorSummary.get_component_id();
                if (executorSummary.get_uptime_secs() > EXECUTOR_START_WAITING_TIME) {
                    i++;
                    arrayList.add(str);
                }
                boolean isInternal = Utils.isInternal(executorSummary);
                ExecutorStats executorStats = executorSummary.get_stats();
                PipelineNodeSystemPart nodePart = SystemState.getNodePart(nameMapping, pipelineSystemPart, str);
                if (!isInternal) {
                    i2++;
                    if (nodePart.getObservedValue(ResourceUsage.TASKS) > 0.0d) {
                        i3++;
                        arrayList2.add(str);
                    }
                }
                if (null != executorStats) {
                    if (isInternal) {
                    }
                    double min = Math.min(topologyInfo.get_uptime_secs(), 600);
                    if (executorStats.get_specific().is_set_bolt()) {
                        handleBolt(executorSummary, min, nodePart, isInternal);
                    } else if (executorStats.get_specific().is_set_spout()) {
                        handleSpout(executorSummary, min, nodePart, isInternal);
                    } else {
                        handleOther(executorSummary, min, nodePart, isInternal);
                    }
                    if (!isInternal) {
                        sendSummaryEvent(nodePart, pipelineSystemPart.getName(), 4);
                    }
                    pipelineStatistics.collect(nodePart);
                }
            }
            boolean commit = pipelineStatistics.commit();
            sendSummaryEvent(pipelineSystemPart, null, 2);
            boolean z = false;
            if (PipelineLifecycleEvent.Status.UNKNOWN == pipelineSystemPart.getStatus() || PipelineLifecycleEvent.Status.STARTING == pipelineSystemPart.getStatus()) {
                LOGGER.info("Trying to elevate '" + pipelineSystemPart.getName() + "'to CREATED: uptime " + arrayList + " " + list.size() + " " + i + " event received " + arrayList2 + " " + i2 + " " + i3);
                if (list.size() == i || i2 == i3) {
                    pipelineSystemPart.changeStatus(PipelineLifecycleEvent.Status.CREATED, true, this.adaptationFilter);
                    z = true;
                }
            }
            if (!z && commit && PipelineLifecycleEvent.Status.CREATED == pipelineSystemPart.getStatus()) {
                pipelineSystemPart.changeStatus(PipelineLifecycleEvent.Status.INITIALIZED, true, this.adaptationFilter);
            }
        } else {
            LOGGER.error("no mapping for " + topologyInfo.get_name());
        }
        return pipelineSystemPart;
    }

    private static int getTaskCount(ExecutorSummary executorSummary) {
        ExecutorInfo executorInfo = executorSummary.get_executor_info();
        return (executorInfo.get_task_end() - executorInfo.get_task_start()) + 1;
    }

    private void handleBolt(ExecutorSummary executorSummary, double d, PipelineNodeSystemPart pipelineNodeSystemPart, boolean z) {
        if (pipelineNodeSystemPart.useThrift()) {
            List<ComponentKey> keys = toKeys(executorSummary);
            BoltStats boltStats = executorSummary.get_stats().get_specific().get_bolt();
            setValueToKeysAndClear(pipelineNodeSystemPart, keys, ResourceUsage.EXECUTORS, 1.0d);
            setValueToKeysAndClear(pipelineNodeSystemPart, keys, ResourceUsage.TASKS, getTaskCount(executorSummary));
            if (z) {
                return;
            }
            setValue(pipelineNodeSystemPart, TimeBehavior.LATENCY, getDoubleStatValue(boltStats.get_process_ms_avg(), ALL_TIME), keys);
            long longStatValue = getLongStatValue(boltStats.get_executed(), AT_10M);
            double doubleStatValue = getDoubleStatValue(boltStats.get_execute_ms_avg(), AT_10M);
            if (isZero(doubleStatValue, 5.0E-5d)) {
                doubleStatValue = pipelineNodeSystemPart.getObservedValue(TimeBehavior.LATENCY);
            }
            setValue(pipelineNodeSystemPart, ResourceUsage.CAPACITY, (longStatValue * doubleStatValue) / (1000.0d * d), keys);
            setValue(pipelineNodeSystemPart, TimeBehavior.THROUGHPUT_ITEMS, getLongStatValue(boltStats.get_acked(), ALL_TIME), keys);
        }
    }

    private void setValueToKeysAndClear(PipelineNodeSystemPart pipelineNodeSystemPart, List<ComponentKey> list, IObservable iObservable, double d) {
        HashSet hashSet = new HashSet();
        hashSet.addAll(pipelineNodeSystemPart.getComponentKeys(iObservable));
        setValue(pipelineNodeSystemPart, iObservable, 1.0d, list);
        hashSet.remove(list);
        pipelineNodeSystemPart.clearComponents(iObservable, hashSet);
    }

    private List<ComponentKey> toKeys(ExecutorSummary executorSummary) {
        ArrayList arrayList = new ArrayList();
        ExecutorInfo executorInfo = executorSummary.get_executor_info();
        for (int i = executorInfo.get_task_start(); i <= executorInfo.get_task_end(); i++) {
            arrayList.add(new ComponentKey(executorSummary.get_host(), executorSummary.get_port(), i));
        }
        return arrayList;
    }

    private void setValue(SystemPart systemPart, IObservable iObservable, double d, List<ComponentKey> list) {
        for (int i = 0; i < list.size(); i++) {
            systemPart.setValue(iObservable, d, list.get(i));
        }
    }

    private boolean isZero(double d, double d2) {
        return d >= (-d2) && d <= d2;
    }

    private void handleSpout(ExecutorSummary executorSummary, double d, PipelineNodeSystemPart pipelineNodeSystemPart, boolean z) {
        if (pipelineNodeSystemPart.useThrift()) {
            List<ComponentKey> keys = toKeys(executorSummary);
            setValueToKeysAndClear(pipelineNodeSystemPart, keys, ResourceUsage.EXECUTORS, 1.0d);
            setValueToKeysAndClear(pipelineNodeSystemPart, keys, ResourceUsage.TASKS, getTaskCount(executorSummary));
        }
    }

    private void handleOther(ExecutorSummary executorSummary, double d, PipelineNodeSystemPart pipelineNodeSystemPart, boolean z) {
        if (pipelineNodeSystemPart.useThrift() || z) {
            List<ComponentKey> keys = toKeys(executorSummary);
            ExecutorStats executorStats = executorSummary.get_stats();
            double observedValue = pipelineNodeSystemPart.getObservedValue(TimeBehavior.LATENCY);
            if (!z) {
                long longStatValue = getLongStatValue(executorStats.get_emitted(), AT_10M);
                pipelineNodeSystemPart.setValue((IObservable) ResourceUsage.CAPACITY, (longStatValue * observedValue) / (1000.0d * d), (Object) null);
                pipelineNodeSystemPart.setValue((IObservable) TimeBehavior.THROUGHPUT_ITEMS, longStatValue, (Object) null);
            }
            setValueToKeysAndClear(pipelineNodeSystemPart, keys, ResourceUsage.EXECUTORS, 1.0d);
            setValueToKeysAndClear(pipelineNodeSystemPart, keys, ResourceUsage.TASKS, getTaskCount(executorSummary));
        }
    }

    public static <T> double getDoubleStatValue(Map<String, Map<T, Double>> map, String str) {
        Double value;
        double d = 0.0d;
        Map<T, Double> map2 = map.get(str);
        if (null != map2) {
            for (Map.Entry<T, Double> entry : map2.entrySet()) {
                T key = entry.getKey();
                if (null != key && !key.toString().startsWith("__") && null != (value = entry.getValue())) {
                    d += value.doubleValue();
                }
            }
        }
        return d;
    }

    public static <T> long getLongStatValue(Map<String, Map<T, Long>> map, String str) {
        Long value;
        long j = 0;
        Map<T, Long> map2 = map.get(str);
        if (null != map2) {
            for (Map.Entry<T, Long> entry : map2.entrySet()) {
                T key = entry.getKey();
                if (null != key && !key.toString().startsWith("__") && null != (value = entry.getValue())) {
                    j += value.longValue();
                }
            }
        }
        return j;
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask, java.util.TimerTask
    public boolean cancel() {
        return super.cancel();
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask
    public int getFrequency() {
        return Configuration.getPipelineMonitoringFrequency();
    }

    @Override // eu.qualimaster.monitoring.AbstractMonitoringTask
    protected void failover(Throwable th) {
        this.connection.failover(th);
    }
}
