package eu.qualimaster.common.switching.tupleReceiving;

import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import eu.qualimaster.base.algorithm.ISwitchTuple;
import eu.qualimaster.base.serializer.ISwitchTupleSerializer;
import eu.qualimaster.base.serializer.KryoSwitchTupleSerializer;
import eu.qualimaster.common.signal.AbstractSignalConnection;
import eu.qualimaster.common.switching.SynchronizedQueue;
import eu.qualimaster.common.switching.actions.ActionState;
import eu.qualimaster.common.switching.actions.CompleteSwitchAction;
import eu.qualimaster.common.switching.actions.SwitchActionMap;
import eu.qualimaster.common.switching.actions.SwitchStates;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import org.apache.log4j.Logger;
import switching.logging.LogProtocol;

/* loaded from: input_file:eu/qualimaster/common/switching/tupleReceiving/SeparatedTupleReceiverHandler.class */
/**
 * Receives length-prefixed, Kryo-serialized {@link ISwitchTuple}s from a socket and
 * dispatches each one to one of two queues.
 *
 * <p>A tuple whose id is greater than {@code SwitchStates.getFirstTupleId()}, or equal to
 * {@code 0}, is put into the "in" queue. Every other tuple is put into the "tmp" queue
 * and counted as transferred data. The first such tuple triggers the
 * {@link ActionState#FIRST_TRANSFERRED_DATA_ARRIVED} actions; when the count reaches
 * {@code SwitchStates.getNumTransferredData()} or the tuple id equals the first tuple id,
 * a {@link CompleteSwitchAction} is executed.
 *
 * <p>{@link #run()} blocks while reading from the socket, so {@link #stop()} is expected
 * to be called from a different thread. The fields that {@code stop()} modifies and the
 * receive loop reads are therefore {@code volatile}.
 *
 * <p>NOTE(review): {@code synOnce} and {@code tmpCount} are static and thus shared by all
 * instances of this class; they are updated without synchronization. Confirm that only
 * one handler thread processes transferred data at a time.
 */
public class SeparatedTupleReceiverHandler implements ITupleReceiverHandler {
    private static final Logger LOGGER = Logger.getLogger(SeparatedTupleReceiverHandler.class);
    /** True until the first transferred tuple has been seen (shared by all instances). */
    private static boolean synOnce = true;
    /** Number of tuples put into the tmp queue so far (shared by all instances). */
    private static int tmpCount = 0;
    private SynchronizedQueue<ISwitchTuple> synInQueue;
    private SynchronizedQueue<ISwitchTuple> synTmpQueue;
    private KryoSwitchTupleSerializer serializer;
    private AbstractSignalConnection signalCon;
    private SwitchActionMap switchActionMap;
    private Socket socket;
    private InputStream in;
    /** Written by {@link #stop()}/{@link #setSocket(Socket)}, read by the receive loop. */
    private volatile Input kryoInput;
    /** Loop flag; cleared by {@link #stop()}, read by the receive loop. */
    private volatile boolean cont;
    /** Optional protocol logger; {@code null} when the 5-argument constructor is used. */
    private LogProtocol logProtocol;

    /**
     * Creates a handler that additionally writes protocol log entries.
     *
     * @param synchronizedQueue queue receiving tuples with an id above the first tuple id (or id 0)
     * @param synchronizedQueue2 queue receiving all other (transferred) tuples
     * @param iSwitchTupleSerializer serializer; must be a {@link KryoSwitchTupleSerializer}
     * @param abstractSignalConnection signal connection handed to {@link CompleteSwitchAction}
     * @param switchActionMap actions executed when the first transferred tuple arrives
     * @param logProtocol protocol logger, may be {@code null}
     * @throws IOException declared for interface compatibility
     * @throws ClassCastException if the serializer is not a {@link KryoSwitchTupleSerializer}
     */
    public SeparatedTupleReceiverHandler(SynchronizedQueue<ISwitchTuple> synchronizedQueue, SynchronizedQueue<ISwitchTuple> synchronizedQueue2, ISwitchTupleSerializer iSwitchTupleSerializer, AbstractSignalConnection abstractSignalConnection, SwitchActionMap switchActionMap, LogProtocol logProtocol) throws IOException {
        this(synchronizedQueue, synchronizedQueue2, iSwitchTupleSerializer, abstractSignalConnection, switchActionMap);
        this.logProtocol = logProtocol;
    }

    /**
     * Creates a handler without a protocol logger.
     *
     * @param synchronizedQueue queue receiving tuples with an id above the first tuple id (or id 0)
     * @param synchronizedQueue2 queue receiving all other (transferred) tuples
     * @param iSwitchTupleSerializer serializer; must be a {@link KryoSwitchTupleSerializer}
     * @param abstractSignalConnection signal connection handed to {@link CompleteSwitchAction}
     * @param switchActionMap actions executed when the first transferred tuple arrives
     * @throws IOException declared for interface compatibility
     * @throws ClassCastException if the serializer is not a {@link KryoSwitchTupleSerializer}
     */
    public SeparatedTupleReceiverHandler(SynchronizedQueue<ISwitchTuple> synchronizedQueue, SynchronizedQueue<ISwitchTuple> synchronizedQueue2, ISwitchTupleSerializer iSwitchTupleSerializer, AbstractSignalConnection abstractSignalConnection, SwitchActionMap switchActionMap) throws IOException {
        this.kryoInput = null;
        this.cont = true;
        this.synInQueue = synchronizedQueue;
        this.synTmpQueue = synchronizedQueue2;
        this.serializer = (KryoSwitchTupleSerializer) iSwitchTupleSerializer;
        this.signalCon = abstractSignalConnection;
        this.switchActionMap = switchActionMap;
    }

    /**
     * Receive loop: reads an int length, then that many bytes, deserializes them into a
     * tuple and dispatches it. The loop ends when {@link #stop()} was called, or when no
     * input or serializer is available.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.cont) {
            // Snapshot the input once per iteration: stop() may null the field concurrently.
            Input input = this.kryoInput;
            if (input == null || this.serializer == null) {
                break;
            }
            try {
                byte[] payload = new byte[input.readInt()];
                input.readBytes(payload);
                ISwitchTuple tuple = this.serializer.deserialize(payload);
                if (tuple != null) {
                    dispatch(tuple);
                }
            } catch (KryoException e) {
                if (!this.cont) {
                    // The stream was closed by stop(); this failure is the expected wake-up.
                    break;
                }
                LOGGER.error("Failed to read or deserialize a tuple", e);
            } catch (NegativeArraySizeException e) {
                LOGGER.error("Received a negative tuple length", e);
            }
        }
    }

    /** Routes a tuple to the in queue or treats it as transferred data, based on its id. */
    private void dispatch(ISwitchTuple tuple) {
        long id = tuple.getId();
        if (id > SwitchStates.getFirstTupleId() || id == 0) {
            this.synInQueue.produce(tuple);
            if (null != this.logProtocol) {
                this.logProtocol.createGENLog("inQueue-Received data with id: " + id + ", firstId:" + SwitchStates.getFirstTupleId());
            }
        } else {
            handleTransferredTuple(tuple, id);
        }
    }

    /**
     * Stores a transferred tuple in the tmp queue, fires the first-arrival actions once and
     * completes the switch when the last transferred tuple has been received.
     */
    private void handleTransferredTuple(ISwitchTuple tuple, long id) {
        this.synTmpQueue.produce(tuple);
        tmpCount++;
        if (null != this.logProtocol) {
            this.logProtocol.createGENLog("tmpQueue-Received the transferred data with id: " + id + ", firstId:" + SwitchStates.getFirstTupleId() + ", the expected count:" + SwitchStates.getNumTransferredData() + ", the actual count:" + tmpCount);
        }
        if (synOnce) {
            synOnce = false;
            // The protocol logger is optional; without this guard the actions below were
            // never executed for handlers created without a LogProtocol (NullPointerException).
            if (null != this.logProtocol) {
                this.logProtocol.createSynENDLog();
                this.logProtocol.createGENLog("FIRST_TRANSFERRED_DATA_ARRIVED: The first transferred data is arrived!");
            }
            this.switchActionMap.executeActions(ActionState.FIRST_TRANSFERRED_DATA_ARRIVED, null, true, this.logProtocol);
        }
        if (tmpCount == SwitchStates.getNumTransferredData() || id == SwitchStates.getFirstTupleId()) {
            if (null != this.logProtocol) {
                this.logProtocol.createGENLog("reached the last transferred data, firstId:" + SwitchStates.getFirstTupleId());
                this.logProtocol.createGENLog("ALL_SYN_END: All the transferred data is arrived!");
            }
            new CompleteSwitchAction(this.signalCon).execute();
        }
    }

    /**
     * Sets the socket to read from and wraps its input stream in a Kryo {@link Input}.
     * If the input stream cannot be obtained, the error is logged and no input is set,
     * so {@link #run()} returns immediately.
     *
     * @param socket the connected socket to receive tuples from
     */
    @Override // eu.qualimaster.common.switching.tupleReceiving.ITupleReceiverHandler
    public void setSocket(Socket socket) {
        this.socket = socket;
        try {
            this.in = socket.getInputStream();
            this.kryoInput = new Input(this.in);
        } catch (IOException e) {
            LOGGER.error("Cannot obtain the input stream of the socket", e);
        }
    }

    /**
     * Stops the receive loop and releases the stream and the socket. Does nothing if no
     * socket is set. The socket is closed and cleared even if closing the stream fails.
     *
     * @throws IOException if closing the input stream or the socket fails
     */
    @Override // eu.qualimaster.common.switching.tupleReceiving.ITupleReceiverHandler
    public void stop() throws IOException {
        if (null != this.socket) {
            LOGGER.info("Stopping handler");
            this.cont = false;
            try {
                // "in" is null when setSocket() failed to obtain the stream.
                if (null != this.in) {
                    this.in.close();
                }
                Input input = this.kryoInput;
                if (null != input) {
                    input.close();
                }
            } finally {
                this.in = null;
                this.kryoInput = null;
                try {
                    this.socket.close();
                } finally {
                    this.socket = null;
                }
            }
            LOGGER.info("Stopped handler");
        }
    }

    /**
     * Returns whether no socket is currently set.
     *
     * @return {@code true} if the handler has no socket (never set, or already stopped)
     */
    @Override // eu.qualimaster.common.switching.tupleReceiving.ITupleReceiverHandler
    public boolean isStopped() {
        return null == this.socket;
    }
}
