package tests.eu.qualimaster.common.switching;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import eu.qualimaster.base.algorithm.ISwitchTuple;
import eu.qualimaster.common.switching.AbstractSwitchMechanism;
import eu.qualimaster.common.switching.AbstractSwitchStrategy;
import eu.qualimaster.common.switching.BaseSwitchSpout;
import eu.qualimaster.common.switching.IState;
import eu.qualimaster.common.switching.ParallelTrackSwitchMechanism;
import eu.qualimaster.common.switching.SeparateIntermediaryStrategy;
import eu.qualimaster.common.switching.TupleReceiverServer;
import java.io.IOException;
import java.util.Map;
import org.apache.log4j.Logger;
import tests.eu.qualimaster.common.KryoTupleSerializerTest;

/* loaded from: input_file:tests/eu/qualimaster/common/switching/TestIntermediarySpout.class */
public class TestIntermediarySpout extends BaseSwitchSpout {
    protected static final int PORT = 8999;
    private static final Logger LOGGER = Logger.getLogger(TestIntermediarySpout.class);
    private SpoutOutputCollector collector;
    private String streamId;
    private AbstractSwitchMechanism mechanism;
    private AbstractSwitchStrategy strategy;
    private TupleReceiverServer server;
    private int count;
    private boolean isClosed;

    public TestIntermediarySpout(String str, String str2, String str3) {
        super(str, str2, true);
        this.count = 0;
        this.isClosed = false;
        this.streamId = str3;
    }

    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        super.open(map, topologyContext, spoutOutputCollector);
        this.collector = spoutOutputCollector;
        this.count = 0;
        this.strategy = new SeparateIntermediaryStrategy(map, IState.SwitchState.ACTIVE_DEFAULT);
        this.mechanism = new ParallelTrackSwitchMechanism(this.strategy);
        setSwitchMechanism(this.mechanism);
        this.server = new TupleReceiverServer(this.strategy.getTupleReceiverHandler(), PORT);
        this.server.start();
    }

    public void nextTuple() {
        ISwitchTuple nextTuple;
        if (this.isClosed || (nextTuple = this.mechanism.getNextTuple()) == null) {
            return;
        }
        KryoTupleSerializerTest.IDataItem iDataItem = (KryoTupleSerializerTest.IDataItem) nextTuple.getValue(0);
        LOGGER.info("id: " + iDataItem.getId() + ", value: " + iDataItem.getValue());
        this.count++;
        if (nextTuple.isGeneralTuple()) {
            this.collector.emit(this.streamId, nextTuple.getValues());
        } else {
            this.collector.emit(this.streamId, nextTuple.getValues(), Long.valueOf(nextTuple.getId()));
        }
        if (this.count == 20) {
            this.isClosed = true;
        }
    }

    public void ack(Object obj) {
        LOGGER.info("Acking the processed tuple: " + obj);
        super.ack(obj);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(this.streamId, new Fields(new String[]{"tuple"}));
    }

    public void close() {
        super.close();
        this.isClosed = true;
        try {
            this.server.stop();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
