package tests.eu.qualimaster.storm;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import eu.qualimaster.common.signal.AggregationKeyProvider;
import eu.qualimaster.common.signal.BaseSignalSourceSpout;
import eu.qualimaster.common.signal.ParameterChangeSignal;
import eu.qualimaster.common.signal.ShutdownSignal;
import eu.qualimaster.common.signal.SourceMonitor;
import eu.qualimaster.dataManagement.DataManager;
import eu.qualimaster.dataManagement.strategies.NoStorageStrategyDescriptor;
import eu.qualimaster.events.EventManager;
import eu.qualimaster.monitoring.events.AlgorithmChangedMonitoringEvent;
import eu.qualimaster.reflection.ReflectionHelper;
import java.util.Map;
import tests.eu.qualimaster.storm.ISrc;

/* loaded from: input_file:tests/eu/qualimaster/storm/Source.class */
public class Source<S extends ISrc> extends BaseSignalSourceSpout {
    private static SignalCollector signals = new SignalCollector(Naming.LOG_SOURCE);
    private Class<S> srcClass;
    private transient SpoutOutputCollector collector;
    private transient S source;

    public Source(Class<S> cls, String str) {
        super("source", str);
        this.srcClass = cls;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public S getSource() {
        return this.source;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        super.open(map, topologyContext, spoutOutputCollector);
        this.collector = spoutOutputCollector;
        if (Naming.defaultInitializeAlgorithms(map)) {
            try {
                this.source = (S) ReflectionHelper.createInstance(this.srcClass);
                EventManager.send(new AlgorithmChangedMonitoringEvent(getPipeline(), "source", "source"));
                this.source.connect();
            } catch (IllegalAccessException e) {
            } catch (InstantiationException e2) {
            }
        } else {
            this.source = (S) DataManager.DATA_SOURCE_MANAGER.createDataSource(getPipeline(), this.srcClass, NoStorageStrategyDescriptor.INSTANCE);
            if (!DataManager.isStarted()) {
                this.source.connect();
            }
        }
        initializeParams(map, this.source);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void initializeParams(Map map, S s) {
    }

    public void nextTuple() {
        startMonitoring();
        Integer data = this.source.getData();
        if (null == data || !isEnabled(data)) {
            return;
        }
        this.collector.emit(new Values(new Object[]{data}));
        endMonitoring();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{"number"}));
    }

    public void notifyParameterChange(ParameterChangeSignal parameterChangeSignal) {
        signals.notifyParameterChange(parameterChangeSignal);
    }

    protected void prepareShutdown(ShutdownSignal shutdownSignal) {
        signals.notifyShutdown(shutdownSignal);
    }

    public SignalCollector getSignals() {
        return signals;
    }

    public void close() {
        if (null != this.source) {
            this.source.disconnect();
        }
        super.close();
    }

    public void configure(SourceMonitor sourceMonitor) {
        sourceMonitor.setAggregationInterval(1000L);
        sourceMonitor.registerAggregationKeyProvider(new AggregationKeyProvider<Number>(Number.class) { // from class: tests.eu.qualimaster.storm.Source.1
            public String getAggregationKey(Number number) {
                return String.valueOf(number.intValue() % 5);
            }
        });
    }
}
