package eu.qualimaster.common.signal;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import eu.qualimaster.Configuration;
import eu.qualimaster.common.monitoring.MonitoringPluginRegistry;
import eu.qualimaster.common.shedding.LoadShedder;
import eu.qualimaster.common.shedding.LoadShedderFactory;
import eu.qualimaster.common.shedding.NoShedder;
import eu.qualimaster.events.EventManager;
import eu.qualimaster.monitoring.events.ComponentKeyRegistry;
import eu.qualimaster.monitoring.events.LoadSheddingChangedMonitoringEvent;
import eu.qualimaster.observables.IObservable;
import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.storm.curator.framework.CuratorFramework;
import org.apache.storm.curator.framework.state.ConnectionState;
import org.apache.storm.curator.framework.state.ConnectionStateListener;

/* loaded from: input_file:StormCommons.jar:eu/qualimaster/common/signal/BaseSignalSpout.class */
/**
 * Abstract base class for Storm spouts that take part in pipeline signalling and monitoring.
 *
 * <p>During {@link #open(Map, TopologyContext, SpoutOutputCollector)} the spout creates a
 * {@link Monitor}, registers it as a task hook, and sets up a {@link StormSignalConnection}.
 * Depending on {@code Configuration.getPipelineSignalsQmEvents()}, signals are either received
 * through registered event handlers or through the signal connection itself.</p>
 *
 * <p>Incoming signals are dispatched to the listener methods implemented here (parameter change,
 * algorithm change, shutdown, load shedding, monitoring change). Subclasses implement the actual
 * tuple emission in {@link #doNextTuple()}.</p>
 */
public abstract class BaseSignalSpout extends BaseRichSpout implements SignalListener, IParameterChangeListener, IShutdownListener, ILoadSheddingListener, IMonitoringChangeListener, IAlgorithmChangeListener {
    // Name of this spout; used together with the pipeline name for signals, events and logging.
    private String name;
    // Pipeline name; may be replaced in open() by the configured sub-pipeline name.
    private String pipeline;
    // Passed to Monitor.init(boolean) whenever the monitor is initialized.
    private boolean sendRegular;
    // Interconnection ports as obtained from BaseSignalBolt.getInterconnPorts in open().
    private String interconnPorts;
    // Currently active load shedder; NoShedder.INSTANCE until a load shedding signal arrives.
    private LoadShedder<?> shedder;
    private transient StormSignalConnection signalConnection;
    private transient AlgorithmChangeEventHandler algorithmEventHandler;
    private transient ParameterChangeEventHandler parameterEventHandler;
    private transient ShutdownEventHandler shutdownEventHandler;
    private transient Monitor monitor;
    private transient PortManager portManager;
    // Set once the signal connection is considered usable (see open()).
    // NOTE(review): this flag and the one below are also accessed from the connection state
    // listener callback; if that callback runs on another thread, visibility/ordering is not
    // guaranteed by the current code - confirm the threading model.
    private transient boolean signalConnInitialized;
    // Set by initMonitor() when monitor initialization must wait for the signal connection.
    private transient boolean initMonitorOnSignalConnInit;

    /**
     * Creates a signal spout that initializes its monitor with {@code sendRegular = false}.
     *
     * @param name the name of this spout
     * @param pipeline the name of the pipeline this spout belongs to
     */
    public BaseSignalSpout(String name, String pipeline) {
        this(name, pipeline, false);
    }

    /**
     * Creates a signal spout. No load shedding is active initially ({@code NoShedder.INSTANCE}).
     *
     * @param name the name of this spout
     * @param pipeline the name of the pipeline this spout belongs to
     * @param sendRegular value handed to {@code Monitor.init(boolean)} when the monitor is
     *     initialized
     */
    public BaseSignalSpout(String name, String pipeline, boolean sendRegular) {
        this.shedder = NoShedder.INSTANCE;
        this.name = name;
        this.pipeline = pipeline;
        this.sendRegular = sendRegular;
    }

    /**
     * Sets up monitoring and signalling for this spout.
     *
     * <p>Steps, in order: configures the event bus, reads the interconnection ports, replaces the
     * pipeline name by the configured sub-pipeline name (if present), creates and announces the
     * monitor, optionally initializes it (see {@link #initMonitorDuringOpen()}), registers it as
     * task hook, creates the signal connection and finally registers this spout in the
     * {@code ComponentKeyRegistry}.</p>
     *
     * <p>Failures while creating or initializing the signal connection are logged and do not
     * abort this method; {@link #getSignalConnection()} may then return {@code null}.</p>
     *
     * @param conf the Storm configuration map
     * @param context the topology context of this task
     * @param collector the output collector (not used by this base implementation)
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        getLogger().info("Prepare--basesignalspout.... " + this.pipeline + "/" + this.name);
        StormSignalConnection.configureEventBus(conf);
        this.interconnPorts = BaseSignalBolt.getInterconnPorts(conf);
        if (conf.containsKey(Constants.CONFIG_KEY_SUBPIPELINE_NAME)) {
            this.pipeline = (String) conf.get(Constants.CONFIG_KEY_SUBPIPELINE_NAME);
        }
        this.monitor = createMonitor(this.pipeline, this.name, true, context);
        EventManager.asyncSend(new ConnectTaskMonitoringEvent(this.pipeline, this.name, this.monitor.getComponentKey()));
        if (initMonitorDuringOpen()) {
            this.monitor.init(this.sendRegular);
        }
        context.addTaskHook(this.monitor);
        try {
            this.signalConnection = new StormSignalConnection(this.name, this, this.pipeline, conf);
            if (Configuration.getPipelineSignalsQmEvents()) {
                // Signals arrive as events: register the handlers and treat the connection as ready.
                this.algorithmEventHandler = AlgorithmChangeEventHandler.createAndRegister(this, this.pipeline, this.name);
                this.parameterEventHandler = ParameterChangeEventHandler.createAndRegister(this, this.pipeline, this.name);
                this.shutdownEventHandler = ShutdownEventHandler.createAndRegister(this, this.pipeline, this.name);
                this.signalConnInitialized = true;
            } else {
                // Signals arrive through the connection: readiness is reported via the listener.
                this.signalConnection.init(new ConnectionStateListener() {
                    @Override
                    public void stateChanged(CuratorFramework client, ConnectionState newState) {
                        BaseSignalSpout.this.getLogger().info("Curator state changed " + newState + " " + BaseSignalSpout.this.pipeline + "/" + BaseSignalSpout.this.name + "  sigConInit " + BaseSignalSpout.this.signalConnInitialized + " initMonOnSigConn " + BaseSignalSpout.this.initMonitorOnSignalConnInit);
                        if (ConnectionState.CONNECTED == newState) {
                            BaseSignalSpout.this.signalConnInitialized = true;
                            // Perform a monitor initialization that initMonitor() had to defer.
                            if (BaseSignalSpout.this.initMonitorOnSignalConnInit) {
                                BaseSignalSpout.this.monitor.init(BaseSignalSpout.this.sendRegular);
                            }
                        }
                    }
                });
                this.portManager = BaseSignalBolt.createPortManager(this.signalConnection, this.interconnPorts);
            }
        } catch (Exception e) {
            // Deliberately best-effort: the spout continues without a working signal connection.
            getLogger().error("Error SignalConnection:" + e.getMessage(), e);
        }
        ComponentKeyRegistry.register(this.pipeline, this, this.monitor.getComponentKey());
        getLogger().info("Prepared--basesignalspout.... " + this.pipeline + "/" + this.name);
    }

    /**
     * Creates the monitor used by this spout. Called once from
     * {@link #open(Map, TopologyContext, SpoutOutputCollector)}; subclasses may override to supply
     * a specialized monitor.
     *
     * @param pipeline the pipeline name
     * @param name the spout name
     * @param monitorOption forwarded unchanged as third argument of the {@code Monitor}
     *     constructor; its exact meaning is defined by {@code Monitor} (not visible here).
     *     {@code open} passes {@code true}
     * @param context the topology context
     * @return the new monitor instance
     */
    protected Monitor createMonitor(String pipeline, String name, boolean monitorOption, TopologyContext context) {
        // Forward the caller's flag instead of a hard-coded constant so the parameter has effect.
        return new Monitor(pipeline, name, monitorOption, context);
    }

    /**
     * Records a single observation through the monitor.
     *
     * @param observable the observable to record for
     * @param value the value handed to {@code Monitor.recordOnce}
     */
    public void recordOnce(IObservable observable, double value) {
        this.monitor.recordOnce(observable, value);
    }

    /**
     * Records several observations through the monitor.
     *
     * @param values the observable-value pairs handed to {@code Monitor.recordOnce}
     */
    public void recordOnce(Map<IObservable, Double> values) {
        this.monitor.recordOnce(values);
    }

    /**
     * Decides whether {@link #open(Map, TopologyContext, SpoutOutputCollector)} initializes the
     * monitor itself. Subclasses returning {@code false} are expected to call
     * {@link #initMonitor()} later.
     *
     * @return {@code true} in this base implementation
     */
    protected boolean initMonitorDuringOpen() {
        return true;
    }

    /**
     * Initializes the monitor immediately if the signal connection is already marked as
     * initialized; otherwise remembers the request so that the connection state listener
     * installed in {@code open} performs the initialization once the connection is established.
     */
    protected final void initMonitor() {
        getLogger().info("Init monitor " + this.pipeline + "/" + this.name + " sigConInit " + this.signalConnInitialized + " initMonOnSigConn " + this.initMonitorOnSignalConnInit);
        if (this.signalConnInitialized) {
            this.monitor.init(this.sendRegular);
        } else {
            this.initMonitorOnSignalConnInit = true;
        }
    }

    /**
     * Returns the port manager, creating it on first use if {@code open} did not create one.
     *
     * @return the port manager as returned by {@code BaseSignalBolt.createPortManager}
     */
    protected PortManager getPortManager() {
        if (null == this.portManager) {
            this.portManager = BaseSignalBolt.createPortManager(this.signalConnection, this.interconnPorts);
        }
        return this.portManager;
    }

    /**
     * Starts monitoring in the monitor and in the monitoring plugin registry.
     */
    protected void startMonitoring() {
        this.monitor.startMonitoring();
        MonitoringPluginRegistry.startMonitoring();
    }

    /**
     * Creates a thread monitor via the monitor of this spout.
     *
     * @return the result of {@code Monitor.createThreadMonitor()}
     */
    protected ThreadMonitor createThreadMonitor() {
        return this.monitor.createThreadMonitor();
    }

    /**
     * Ends monitoring in the monitor and in the monitoring plugin registry.
     */
    protected void endMonitoring() {
        this.monitor.endMonitoring();
        MonitoringPluginRegistry.endMonitoring();
    }

    /**
     * Delegates to {@code Monitor.aggregateExecutionTime(long)}.
     *
     * @param time passed through unchanged; semantics are defined by {@code Monitor}
     * @deprecated marked as deprecated in this class; the intended replacement is not visible
     *     here
     */
    @Deprecated
    protected void aggregateExecutionTime(long time) {
        this.monitor.aggregateExecutionTime(time);
    }

    /**
     * Delegates to {@code Monitor.aggregateExecutionTime(long, int)}.
     *
     * @param time passed through unchanged; semantics are defined by {@code Monitor}
     * @param count passed through unchanged; semantics are defined by {@code Monitor}
     * @deprecated marked as deprecated in this class; the intended replacement is not visible
     *     here
     */
    @Deprecated
    protected void aggregateExecutionTime(long time, int count) {
        this.monitor.aggregateExecutionTime(time, count);
    }

    /**
     * Sends an algorithm changed event through the signal connection.
     *
     * @param algorithm the value handed to
     *     {@code StormSignalConnection.sendAlgorithmChangedEvent} (presumably the algorithm name -
     *     verify against that method)
     */
    protected void sendAlgorithmChangedEvent(String algorithm) {
        this.signalConnection.sendAlgorithmChangedEvent(algorithm);
    }

    /**
     * Dispatches a raw signal payload. The payload is offered, in this order, to the parameter
     * change, algorithm change, shutdown and load shedding signal types; dispatching stops at the
     * first type whose {@code notify} call returns {@code true}.
     *
     * @param data the raw signal payload
     */
    @Override // eu.qualimaster.common.signal.SignalListener
    public void onSignal(byte[] data) {
        getLogger().info("onSignal: Listening on the signal! " + this.pipeline + "/" + this.name);
        boolean handled = ParameterChangeSignal.notify(data, this.pipeline, this.name, this);
        if (!handled) {
            handled = AlgorithmChangeSignal.notify(data, this.pipeline, this.name, this);
        }
        if (!handled) {
            handled = ShutdownSignal.notify(data, this.pipeline, this.name, this);
        }
        if (!handled) {
            LoadSheddingSignal.notify(data, this.pipeline, this.name, this);
        }
    }

    /**
     * Handles an algorithm change by converting it into a parameter change and, if the conversion
     * yields a signal, forwarding it to
     * {@link #notifyParameterChange(ParameterChangeSignal)}.
     *
     * @param signal the algorithm change signal
     */
    @Override // eu.qualimaster.common.signal.IAlgorithmChangeListener
    public void notifyAlgorithmChange(AlgorithmChangeSignal signal) {
        ParameterChangeSignal parameterChange = signal.toParameterChange();
        if (null != parameterChange) {
            notifyParameterChange(parameterChange);
        }
    }

    /**
     * Handles a parameter change. This base implementation only logs the call.
     *
     * @param signal the parameter change signal
     */
    @Override // eu.qualimaster.common.signal.IParameterChangeListener
    public void notifyParameterChange(ParameterChangeSignal signal) {
        getLogger().info("This notifyParameterChange is being called!");
    }

    /**
     * Sends a topology signal through the signal connection of this spout.
     *
     * @param signal the signal to send
     * @throws SignalException if {@code TopologySignal.sendSignal} fails
     */
    protected void sendSignal(TopologySignal signal) throws SignalException {
        signal.sendSignal(this.signalConnection);
    }

    /**
     * Registers the signal namespace lifecycle event handler.
     */
    protected void installNamespaceLifecycleSignalHandler() {
        SignalNamespaceLifecycleEventHandler.registerHandler();
    }

    /**
     * Shuts this spout down: closes the port manager and the signal connection (where present),
     * calls {@link #prepareShutdown(ShutdownSignal)}, unregisters this spout from the
     * {@code ComponentKeyRegistry}, shuts down the monitor, unregisters the namespace lifecycle
     * handler and the event handlers (if events are used for signals) and stops the
     * {@code EventManager}.
     *
     * @param signal the shutdown signal
     */
    @Override // eu.qualimaster.common.signal.IShutdownListener
    public final void notifyShutdown(ShutdownSignal signal) {
        if (null != this.portManager) {
            this.portManager.close();
        }
        // open() logs and swallows connection setup failures, so the connection may be absent;
        // guard it so that the remaining cleanup still runs.
        if (null != this.signalConnection) {
            this.signalConnection.close();
        }
        prepareShutdown(signal);
        ComponentKeyRegistry.unregister(this);
        this.monitor.shutdown();
        SignalNamespaceLifecycleEventHandler.unregisterHandler();
        if (Configuration.getPipelineSignalsQmEvents()) {
            EventManager.unregister(this.algorithmEventHandler);
            EventManager.unregister(this.parameterEventHandler);
            EventManager.unregister(this.shutdownEventHandler);
        }
        EventManager.stop();
    }

    /**
     * Hook called by {@link #notifyShutdown(ShutdownSignal)} after the connections were closed and
     * before the monitor is shut down. Does nothing in this base implementation.
     *
     * @param signal the shutdown signal
     */
    protected void prepareShutdown(ShutdownSignal signal) {
    }

    /**
     * Replaces the active load shedder by the one requested in the signal, configures it with the
     * signal and sends a {@code LoadSheddingChangedMonitoringEvent} describing the change.
     *
     * @param signal the load shedding signal
     */
    @Override // eu.qualimaster.common.signal.ILoadSheddingListener
    public final void notifyLoadShedding(LoadSheddingSignal signal) {
        this.shedder = LoadShedderFactory.createShedder(signal.getShedder());
        this.shedder.configure(signal);
        EventManager.send(new LoadSheddingChangedMonitoringEvent(this.pipeline, this.name, signal.getShedder(), this.shedder.getDescriptor().getIdentifier(), signal.getCauseMessageId()));
    }

    /**
     * Forwards a monitoring change signal to the monitor.
     *
     * @param signal the monitoring change signal
     */
    @Override // eu.qualimaster.common.signal.IMonitoringChangeListener
    public final void notifyMonitoringChange(MonitoringChangeSignal signal) {
        this.monitor.notifyMonitoringChange(signal);
    }

    /**
     * Returns the currently active load shedder.
     *
     * @return the load shedder ({@code NoShedder.INSTANCE} until a load shedding signal arrives)
     */
    protected LoadShedder<?> getShedder() {
        return this.shedder;
    }

    /**
     * Asks the active load shedder whether the given item is enabled.
     *
     * @param item the object handed to {@code LoadShedder.isEnabled}
     * @return the result of {@code LoadShedder.isEnabled}
     */
    protected boolean isEnabled(Object item) {
        return this.shedder.isEnabled(item);
    }

    /**
     * Returns the name of this spout.
     *
     * @return the name given at construction
     */
    public String getName() {
        return this.name;
    }

    /**
     * Returns the pipeline name.
     *
     * @return the same value as {@link #getPipeline()}
     * @deprecated use {@link #getPipeline()}, which returns the same field
     */
    @Deprecated
    public String getNamespace() {
        return this.pipeline;
    }

    /**
     * Returns the pipeline name, i.e. the name given at construction or the sub-pipeline name
     * taken from the configuration in {@code open}.
     *
     * @return the pipeline name
     */
    public String getPipeline() {
        return this.pipeline;
    }

    /**
     * Brackets {@link #doNextTuple()} with {@link #startMonitoring()} and
     * {@link #endMonitoring()}.
     */
    @Override
    public void nextTuple() {
        startMonitoring();
        try {
            doNextTuple();
        } finally {
            // Always close the monitoring bracket, even if the subclass implementation throws.
            endMonitoring();
        }
    }

    /**
     * Emits the next tuple(s). Called by {@link #nextTuple()} between start and end of
     * monitoring. Does nothing in this base implementation.
     */
    protected void doNextTuple() {
    }

    /**
     * Returns the logger for the runtime class of this spout.
     *
     * @return the log4j logger obtained via {@code Logger.getLogger(getClass())}
     */
    protected Logger getLogger() {
        return Logger.getLogger(getClass());
    }

    /**
     * Returns the signal connection.
     *
     * @return the signal connection; {@code null} before {@code open} was called or if its
     *     creation failed
     */
    public StormSignalConnection getSignalConnection() {
        return this.signalConnection;
    }
}
