package eu.qualimaster.common.switching.actions;

import eu.qualimaster.base.algorithm.ISwitchTuple;
import eu.qualimaster.common.signal.AbstractSignalConnection;
import eu.qualimaster.common.switching.QueueHolder;
import eu.qualimaster.common.switching.SwitchNodeNameInfo;
import java.util.Queue;
import org.apache.log4j.Logger;
import switching.logging.LogProtocol;
import switching.logging.SignalName;

/* loaded from: input_file:StormCommons.jar:eu/qualimaster/common/switching/actions/SynchronizationStrategy.class */
public class SynchronizationStrategy implements ISynchronizationStrategy {
    private static final Logger LOGGER = Logger.getLogger(SynchronizationStrategy.class);
    private Queue<ISwitchTuple> inQueue;
    private Queue<ISwitchTuple> outQueue;
    private AbstractSignalConnection signalCon;
    private int overloadSize;
    private long lastProcessedId;
    private long lastEmittedId;
    private LogProtocol logProtocol;

    public SynchronizationStrategy(QueueHolder queueHolder, AbstractSignalConnection abstractSignalConnection, int i, LogProtocol logProtocol) {
        this(queueHolder, abstractSignalConnection, i);
        this.logProtocol = logProtocol;
    }

    public SynchronizationStrategy(QueueHolder queueHolder, AbstractSignalConnection abstractSignalConnection, int i) {
        this.inQueue = queueHolder.getInQueue();
        this.outQueue = queueHolder.getOutQueue();
        this.signalCon = abstractSignalConnection;
        this.overloadSize = i;
    }

    @Override // eu.qualimaster.common.switching.actions.ISynchronizationStrategy
    public void synchronizeData() {
        this.lastEmittedId = SwitchStates.getLastEmittedId();
        this.lastProcessedId = SwitchStates.getLastProcessedId();
        LOGGER.info("The lastEmittedId: " + this.lastEmittedId + ", the lastProcessedId: " + this.lastProcessedId);
        if (this.lastEmittedId == 0 || this.lastProcessedId == 0) {
            return;
        }
        if (this.lastProcessedId != this.lastEmittedId && this.outQueue.size() <= this.overloadSize && this.lastEmittedId - this.lastProcessedId <= this.overloadSize) {
            SwitchStates.setTransferringTrgINT(true);
            synchronizeItemsTrgINT();
            return;
        }
        this.outQueue.clear();
        if (null != this.logProtocol) {
            this.logProtocol.createGENLog("Enable v2, v4 and v8!");
        }
        LOGGER.info(System.currentTimeMillis() + ", Enable v2, v4 and v8!");
        new SendSignalAction(Signal.ENABLE, getNameInfo().getTargetEndNodeName(), true, this.signalCon).execute();
        new SendSignalAction(Signal.ENABLE, getNameInfo().getPrecedingNodeName(), true, this.signalCon).execute();
        new EnableFlagAction(StreamFlowFlag.TGTINT_v4).execute();
        new CompleteSwitchAction(this.signalCon).execute();
    }

    private void synchronizeItemsTrgINT() {
        long j = 0;
        int i = 0;
        if (this.inQueue.isEmpty()) {
            LOGGER.info(System.currentTimeMillis() + ", Request to send all tuples.");
            i = (int) (this.lastEmittedId - this.lastProcessedId);
            j = this.lastEmittedId;
            SwitchStates.setTransferAll(true);
            if (null != this.logProtocol) {
                this.logProtocol.createSignalSENDLog(SignalName.TRANSFER, Integer.valueOf(i), getNameInfo().getOriginalIntermediaryNodeName());
            }
            new SendSignalAction(Signal.TRANSFER, getNameInfo().getOriginalIntermediaryNodeName(), Integer.valueOf(i), this.signalCon).execute();
        } else {
            long j2 = this.lastProcessedId;
            if (!this.inQueue.isEmpty()) {
                j2 = this.inQueue.peek().getId();
            }
            if (null != this.logProtocol) {
                this.logProtocol.createGENLog("Synchronizing the last id of the current alg: " + j2 + " with the last processed id of the previous alg:" + this.lastProcessedId);
            }
            if (j2 > this.lastProcessedId) {
                i = ((int) (j2 - this.lastProcessedId)) - 1;
                j = j2 - 1;
                String str = String.valueOf(j2) + "," + String.valueOf(this.lastProcessedId);
                if (null != this.logProtocol) {
                    this.logProtocol.createSignalSENDLog(SignalName.HEADID, str, getNameInfo().getOriginalIntermediaryNodeName());
                }
                new SendSignalAction(Signal.HEADID, getNameInfo().getOriginalIntermediaryNodeName(), str, this.signalCon).execute();
            } else {
                while (j2 < this.lastProcessedId && !this.inQueue.isEmpty()) {
                    j2 = this.inQueue.poll().getId();
                }
                if (null != this.logProtocol) {
                    this.logProtocol.createGENLog("Skipped tuples until the id:" + j2 + ", with input queue size:" + this.inQueue.size());
                    this.logProtocol.createGENLog("Completing the synchronization.");
                }
                new CompleteSwitchAction(this.signalCon).execute();
            }
        }
        if (null != this.logProtocol) {
            this.logProtocol.createSignalSENDLog(SignalName.ENABLE, Boolean.TRUE, getNameInfo().getTargetEndNodeName());
        }
        new SendSignalAction(Signal.ENABLE, getNameInfo().getTargetEndNodeName(), true, this.signalCon).execute();
        SwitchStates.setNumTransferredData(i);
        SwitchStates.setFirstTupleId(j);
    }

    private static SwitchNodeNameInfo getNameInfo() {
        return SwitchNodeNameInfo.getInstance();
    }
}
