package eu.qualimaster.common.switching.actions;

import eu.qualimaster.base.algorithm.ISwitchTuple;
import eu.qualimaster.base.pipeline.NodeHostStorm;
import eu.qualimaster.base.serializer.KryoSwitchTupleSerializer;
import eu.qualimaster.common.signal.AbstractSignalConnection;
import eu.qualimaster.common.switching.QueueHolder;
import eu.qualimaster.common.switching.SwitchNodeNameInfo;
import eu.qualimaster.common.switching.TupleSender;
import java.util.List;
import java.util.Queue;
import switching.logging.LogProtocol;
import switching.logging.QueueStatus;
import switching.logging.SignalName;

/* loaded from: input_file:eu/qualimaster/common/switching/actions/TransferDataStrategy.class */
public class TransferDataStrategy implements ITransferDataStrategy {
    private Queue<ISwitchTuple> inQueue;
    private Queue<ISwitchTuple> outQueue;
    private AbstractSignalConnection signalCon;
    private KryoSwitchTupleSerializer serializer;
    private String host;
    private TupleSender sender;
    private List<TupleSender> senders;
    private long lastProcessedId;
    private long headId;
    private LogProtocol logProtocol;
    private static boolean sendOnce = false;

    public TransferDataStrategy(QueueHolder queueHolder, AbstractSignalConnection abstractSignalConnection, KryoSwitchTupleSerializer kryoSwitchTupleSerializer, LogProtocol logProtocol) {
        this(queueHolder, abstractSignalConnection, kryoSwitchTupleSerializer);
        this.logProtocol = logProtocol;
    }

    public TransferDataStrategy(QueueHolder queueHolder, AbstractSignalConnection abstractSignalConnection, KryoSwitchTupleSerializer kryoSwitchTupleSerializer) {
        this.senders = null;
        this.inQueue = queueHolder.getInQueue();
        this.outQueue = queueHolder.getOutQueue();
        this.signalCon = abstractSignalConnection;
        this.serializer = kryoSwitchTupleSerializer;
    }

    @Override // eu.qualimaster.common.switching.actions.ITransferDataStrategy
    public void transferData() {
        this.lastProcessedId = SwitchStates.getLastProcessedId();
        this.headId = SwitchStates.getHeadId();
        connectMultiTargetNodes();
        if (null != this.logProtocol) {
            this.logProtocol.createGENLog("Transferring data to the host: " + this.host + ", the headId: " + this.headId + ", the lastProcessedId: " + this.lastProcessedId);
        }
        if (this.lastProcessedId == 0) {
            this.logProtocol.createGENLog("Error: the lastProcessedId is ZERO!!!");
            return;
        }
        this.logProtocol.createGENLog("isTransferAll:" + SwitchStates.isTransferAll());
        if (this.headId == 0 || SwitchStates.isTransferAll()) {
            transferAllOrgINT();
        } else if (this.headId != 0) {
            transferMissingItemsOrgINT();
        }
        goToPassive();
    }

    private void connectTargetNode() {
        this.host = getHost(getNameInfo().getTargetIntermediaryNodeName());
        this.sender = new TupleSender(this.host, SwitchStates.getTargetPort());
    }

    private void connectMultiTargetNodes() {
        this.senders = NodeHostStorm.createTupleSenders(getNameInfo().getTopologyName(), getNameInfo().getTargetIntermediaryNodeName(), SwitchStates.getTargetPort());
    }

    public void transferMissingItemsOrgINT() {
        if (null != this.logProtocol) {
            this.logProtocol.createGENLog("Transferring missing items with outQueue: " + this.outQueue.size() + ", inQueue:" + this.inQueue.size() + ", lastProcessedId: " + this.lastProcessedId + ", headId: " + this.headId);
        }
        while (!this.outQueue.isEmpty()) {
            ISwitchTuple poll = this.outQueue.poll();
            long id = poll.getId();
            if (id > this.lastProcessedId && id < this.headId) {
                if (null != this.logProtocol) {
                    this.logProtocol.createTRANSFERLog(QueueStatus.OUTPUT, id);
                }
                sendToTargetShuffle(poll);
            }
            if (id == this.headId) {
                break;
            }
        }
        if (!this.inQueue.isEmpty()) {
            long id2 = this.inQueue.peek().getId();
            while (true) {
                long j = id2;
                if (j >= this.headId) {
                    break;
                }
                ISwitchTuple poll2 = this.inQueue.poll();
                if (j > this.lastProcessedId) {
                    if (null != this.logProtocol) {
                        this.logProtocol.createTRANSFERLog(QueueStatus.INPUT, j);
                    }
                    sendToTargetShuffle(poll2);
                }
                id2 = poll2.getId();
            }
        }
        if (null != this.logProtocol) {
            this.logProtocol.createGENLog("Reached the end of transferring missing items.");
            this.logProtocol.createQUEUELog(QueueStatus.INPUT, this.inQueue.size());
            this.logProtocol.createQUEUELog(QueueStatus.OUTPUT, this.outQueue.size());
        }
    }

    public void transferAllOrgINT() {
        long j = 0;
        int i = 0;
        if (!this.outQueue.isEmpty()) {
            j = this.outQueue.peek().getId();
        } else if (!this.inQueue.isEmpty()) {
            j = this.inQueue.peek().getId();
        }
        if (null != this.logProtocol) {
            this.logProtocol.createGENLog("Transfer all items to the target Spout with the top id:" + j);
            this.logProtocol.createQUEUELog(QueueStatus.INPUT, this.inQueue.size());
            this.logProtocol.createQUEUELog(QueueStatus.OUTPUT, this.outQueue.size());
        }
        while (!this.outQueue.isEmpty()) {
            ISwitchTuple poll = this.outQueue.poll();
            if (poll.getId() > this.lastProcessedId) {
                sendToTargetShuffle(poll);
                i++;
                if (null != this.logProtocol) {
                    this.logProtocol.createTRANSFERLog(QueueStatus.OUTPUT, poll.getId());
                    this.logProtocol.createGENLog("The count of the transferred data: " + i);
                }
            }
        }
        while (!this.inQueue.isEmpty()) {
            ISwitchTuple poll2 = this.inQueue.poll();
            if (poll2.getId() > this.lastProcessedId) {
                sendToTargetShuffle(poll2);
                i++;
                if (null != this.logProtocol) {
                    this.logProtocol.createTRANSFERLog(QueueStatus.INPUT, poll2.getId());
                    this.logProtocol.createGENLog("The count of the transferred data: " + i);
                }
            }
        }
        if (!sendOnce && this.inQueue.isEmpty() && this.outQueue.isEmpty()) {
            sendOnce = true;
            if (null != this.logProtocol) {
                this.logProtocol.createSignalSENDLog(SignalName.TRANSFERRED, Integer.valueOf(i), getNameInfo().getTargetIntermediaryNodeName());
            }
            new SendSignalAction(Signal.TRANSFERRED, getNameInfo().getTargetIntermediaryNodeName(), Integer.valueOf(i), this.signalCon).execute();
        }
        if (null != this.logProtocol) {
            this.logProtocol.createGENLog("The end of transferring all items to the target Spout.");
            this.logProtocol.createQUEUELog(QueueStatus.INPUT, this.inQueue.size());
            this.logProtocol.createQUEUELog(QueueStatus.OUTPUT, this.outQueue.size());
        }
    }

    private void sendToTarget(ISwitchTuple iSwitchTuple) {
        this.sender.send(this.serializer.serialize(iSwitchTuple));
    }

    private void sendToTargetShuffle(ISwitchTuple iSwitchTuple) {
        NodeHostStorm.shuffleSender(this.senders).send(this.serializer.serialize(iSwitchTuple));
    }

    public void goToPassive() {
        this.outQueue.clear();
        SwitchStates.setPassivateOrgINT(true);
        SwitchStates.setTransferringOrgINT(false);
        if (null != this.logProtocol) {
            this.logProtocol.createGENLog("Go to passive and inform the end bolt.");
        }
        new SendSignalAction(Signal.GOTOPASSIVE, getNameInfo().getOriginalEndNodeName(), true, this.signalCon).execute();
    }

    private static String getHost(String str) {
        return NodeHostStorm.getHost(getNameInfo().getTopologyName(), str);
    }

    private static SwitchNodeNameInfo getNameInfo() {
        return SwitchNodeNameInfo.getInstance();
    }
}
