package eu.qualimaster.common.switching.synchronization;

import eu.qualimaster.base.algorithm.ISwitchTuple;
import eu.qualimaster.base.pipeline.NodeHostStorm;
import eu.qualimaster.common.signal.AbstractSignalConnection;
import eu.qualimaster.common.signal.GoToPassiveSignal;
import eu.qualimaster.common.signal.SignalStates;
import eu.qualimaster.common.signal.TransferredSignal;
import eu.qualimaster.common.switching.QueueHolder;
import eu.qualimaster.common.switching.TupleSender;
import java.util.Queue;
import org.apache.log4j.Logger;

/* loaded from: input_file:StormCommons.jar:eu/qualimaster/common/switching/synchronization/SeparatedOrgINTSynchronizationStrategy.class */
public class SeparatedOrgINTSynchronizationStrategy extends AbstractSynchronizationStrategy {
    public static final String STRATEGYTYPE = "synchronizationORGINT";
    private static final Logger LOGGER = Logger.getLogger(SeparatedOrgINTSynchronizationStrategy.class);
    private long lastProcessedId;
    private long headId;
    private Queue<ISwitchTuple> inQueue;
    private Queue<ISwitchTuple> outQueue;
    private String host;
    private TupleSender sender;

    public SeparatedOrgINTSynchronizationStrategy(QueueHolder queueHolder, AbstractSignalConnection abstractSignalConnection, int i) {
        super(queueHolder, abstractSignalConnection, i);
        this.inQueue = getInQueue();
        this.outQueue = getOutQueue();
    }

    @Override // eu.qualimaster.common.switching.synchronization.AbstractSynchronizationStrategy, eu.qualimaster.common.switching.IStrategy
    public String getStrategyType() {
        return STRATEGYTYPE;
    }

    @Override // eu.qualimaster.common.switching.synchronization.ISynchronizationStrategy
    public void doSynchronization() {
    }

    @Override // eu.qualimaster.common.switching.synchronization.ISynchronizationStrategy
    public void doDataTransfer() {
        this.lastProcessedId = SignalStates.getLastProcessedId();
        this.headId = SignalStates.getHeadId();
        this.host = getHost(getNameInfo().getTargetIntermediaryNodeName());
        this.sender = new TupleSender(this.host, SignalStates.getTargetPort());
        LOGGER.info("Transferring data to the host: " + this.host + ", the headId: " + this.headId + ", the lastProcessedId: " + this.lastProcessedId);
        if (this.lastProcessedId != 0) {
            if (SignalStates.isTransferAll()) {
                transferAllOrgINT();
            } else if (this.headId != 0) {
                transferMissingItemsOrgINT();
            }
            goToPassive();
        }
    }

    public void transferMissingItemsOrgINT() {
        LOGGER.info("Transferring missing items with outQueue: " + this.outQueue.size() + ", inQueue:" + this.inQueue.size() + ", lastProcessedId: " + this.lastProcessedId + ", headId: " + this.headId);
        while (!this.outQueue.isEmpty()) {
            ISwitchTuple poll = this.outQueue.poll();
            long id = poll.getId();
            if (id > this.lastProcessedId && id < this.headId) {
                LOGGER.info(System.currentTimeMillis() + ", outQueue--Transferring the missing items " + id);
                sendToTarget(poll);
            }
            if (id == this.headId) {
                break;
            }
        }
        if (!this.inQueue.isEmpty()) {
            long id2 = this.inQueue.peek().getId();
            while (true) {
                long j = id2;
                if (j >= this.headId) {
                    break;
                }
                ISwitchTuple poll2 = this.inQueue.poll();
                if (j > this.lastProcessedId) {
                    LOGGER.info(System.currentTimeMillis() + ", inQueue--Transferring the missing items " + j);
                    sendToTarget(poll2);
                }
                id2 = poll2.getId();
            }
        }
        LOGGER.info("The end of transferring missing items with outQueue: " + this.outQueue.size() + ", inQueue:" + this.inQueue.size());
    }

    public void transferAllOrgINT() {
        long j = 0;
        long j2 = 0;
        int i = 0;
        if (!this.outQueue.isEmpty()) {
            j = this.outQueue.peek().getId();
        } else if (!this.inQueue.isEmpty()) {
            j = this.inQueue.peek().getId();
        }
        LOGGER.info("Transfer all items to the target Spout with outQueue size:" + this.outQueue.size() + ", inQueue size:" + this.inQueue.size() + " Top id:" + j);
        while (!this.outQueue.isEmpty()) {
            ISwitchTuple poll = this.outQueue.poll();
            long id = poll.getId();
            if (id > this.lastProcessedId) {
                LOGGER.info(System.currentTimeMillis() + " Transferring the out queue to the target Spout: " + poll.getId() + ", count: " + i);
                sendToTarget(poll);
                j2 = id;
                i++;
            }
        }
        while (!this.inQueue.isEmpty()) {
            ISwitchTuple poll2 = this.inQueue.poll();
            long id2 = poll2.getId();
            if (id2 > this.lastProcessedId) {
                LOGGER.info(System.currentTimeMillis() + " Transferring the in queue to the target Spout." + poll2.getId());
                sendToTarget(poll2);
                j2 = id2;
                i++;
            }
        }
        if (i < SignalStates.getNumTransferredData()) {
            TransferredSignal.sendSignal(getNameInfo().getTopologyName(), getNameInfo().getTargetIntermediaryNodeName(), Integer.valueOf(i), getSignalConnection());
            LOGGER.info(System.currentTimeMillis() + ", transferAll --Sent transferred signal with the number of data: " + i);
        }
        if (j2 == 0) {
            LOGGER.info(System.currentTimeMillis() + ", transferAll --Sending transferred signal with the last transferred Id: " + j2);
            TransferredSignal.sendSignal(getNameInfo().getTopologyName(), getNameInfo().getTargetIntermediaryNodeName(), Long.valueOf(j2), getSignalConnection());
        }
        LOGGER.info("The end of transferring all items to the target Spout. with outQueue size:" + this.outQueue.size() + ", inQueue size:" + this.inQueue.size());
    }

    private void sendToTarget(ISwitchTuple iSwitchTuple) {
        this.sender.send(SignalStates.getKryoSerOrgINT().serialize(iSwitchTuple));
    }

    public void goToPassive() {
        this.outQueue.clear();
        SignalStates.setPassivateOrgINT(true);
        SignalStates.setTransferringOrgINT(false);
        LOGGER.info(System.currentTimeMillis() + ", Go to passive and inform the end bolt.");
        GoToPassiveSignal.sendSignal(getNameInfo().getTopologyName(), getNameInfo().getOriginalEndNodeName(), true, getSignalConnection());
    }

    private static String getHost(String str) {
        LOGGER.info("Getting the host, " + getNameInfo().getTopologyName() + ", " + str);
        return NodeHostStorm.getHost(getNameInfo().getTopologyName(), str);
    }
}
