package eu.qualimaster.common.switching;

import eu.qualimaster.base.algorithm.IGeneralTuple;
import eu.qualimaster.base.algorithm.ISwitchTuple;
import eu.qualimaster.base.serializer.KryoGeneralTupleSerializer;
import eu.qualimaster.base.serializer.KryoSwitchTupleSerializer;
import eu.qualimaster.common.signal.TopologySignal;
import eu.qualimaster.common.switching.IState;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.Logger;

/* loaded from: input_file:StormCommons.jar:eu/qualimaster/common/switching/SeparateIntermediaryStrategy.class */
public class SeparateIntermediaryStrategy extends AbstractSwitchStrategy {
    private static final Logger LOGGER = Logger.getLogger(SeparateIntermediaryStrategy.class);
    private static final int QUEUE_SIZE = 100;
    private Map conf;
    private transient Queue<IGeneralTuple> inQueue;
    private transient LinkedList<IGeneralTuple> outQueue;
    private transient Queue<IGeneralTuple> tmpQueue;
    private IState.SwitchState currentState;
    private long lastProcessedId;
    private transient SynchronizedQueue<IGeneralTuple> syn = null;
    private transient SynchronizedQueue<IGeneralTuple> tmpSyn = null;
    private Map<String, Serializable> parameters = new HashMap();

    public SeparateIntermediaryStrategy(Map map, IState.SwitchState switchState) {
        this.inQueue = null;
        this.outQueue = null;
        this.tmpQueue = null;
        this.conf = map;
        this.inQueue = new ConcurrentLinkedQueue();
        this.outQueue = new LinkedList<>();
        this.tmpQueue = new ConcurrentLinkedQueue();
        this.currentState = switchState;
    }

    @Override // eu.qualimaster.common.switching.ISwitchStrategy
    public Serializable getSignalValue(String str) {
        return null;
    }

    @Override // eu.qualimaster.common.switching.ISwitchStrategy
    public TupleReceiverHandler getTupleReceiverHandler() {
        this.syn = new SynchronizedQueue<>(this.inQueue, 100);
        this.tmpSyn = new SynchronizedQueue<>(this.tmpQueue, 100);
        return new TupleReceiverHandler(new KryoGeneralTupleSerializer(this.conf), new KryoSwitchTupleSerializer(this.conf), this.syn, this.tmpSyn);
    }

    @Override // eu.qualimaster.common.switching.ISwitchStrategy
    public IGeneralTuple produceTuple() {
        IGeneralTuple iGeneralTuple = null;
        if (this.currentState.equals(IState.SwitchState.ACTIVE_DEFAULT)) {
            iGeneralTuple = this.syn.consume();
            if (!iGeneralTuple.isGeneralTuple()) {
                this.outQueue.add(iGeneralTuple);
            }
        }
        return iGeneralTuple;
    }

    @Override // eu.qualimaster.common.switching.ISwitchStrategy
    public void doSignal(TopologySignal topologySignal) {
    }

    @Override // eu.qualimaster.common.switching.ISwitchStrategy
    public void ack(Object obj) {
        Iterator<IGeneralTuple> descendingIterator = this.outQueue.descendingIterator();
        while (descendingIterator.hasNext()) {
            ISwitchTuple iSwitchTuple = (ISwitchTuple) descendingIterator.next();
            if (obj.equals(Long.valueOf(iSwitchTuple.getId()))) {
                this.lastProcessedId = iSwitchTuple.getId();
                LOGGER.info(System.currentTimeMillis() + " Acked the tuple with the msgId: " + obj + " removed: " + this.outQueue.remove(iSwitchTuple) + ", outQueue size: " + this.outQueue.size());
                return;
            }
        }
    }
}
