package tests.eu.qualimaster.common;

import backtype.storm.Config;
import eu.qualimaster.base.algorithm.GeneralTuple;
import eu.qualimaster.base.algorithm.IGeneralTuple;
import eu.qualimaster.base.algorithm.ISwitchTuple;
import eu.qualimaster.base.algorithm.SwitchTuple;
import eu.qualimaster.base.serializer.IGeneralTupleSerializer;
import eu.qualimaster.base.serializer.ISwitchTupleSerializer;
import eu.qualimaster.common.switching.ITupleReceiveCreator;
import eu.qualimaster.common.switching.KryoGeneralTupleSerializerCreator;
import eu.qualimaster.common.switching.KryoSwitchTupleSerializerCreator;
import eu.qualimaster.common.switching.SynchronizedQueue;
import eu.qualimaster.common.switching.TupleReceiverHandlerCreator;
import eu.qualimaster.common.switching.TupleReceiverServer;
import eu.qualimaster.common.switching.TupleSender;
import eu.qualimaster.common.switching.tupleReceiving.ITupleReceiverHandler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Assert;
import org.junit.Test;
import tests.eu.qualimaster.common.KryoTupleSerializerTest;

/* loaded from: input_file:tests/eu/qualimaster/common/TupleSenderAndReceiverTest.class */
/**
 * Round-trip test that sends serialized general and switch tuples through a
 * {@link TupleSender} to a local {@link TupleReceiverServer} and verifies the
 * payload of every tuple that arrives in the receiving queues.
 *
 * <p>The received tuples are checked on a background consumer thread. Any
 * failure raised there is recorded and rethrown on the JUnit thread at the end
 * of {@link #test()}, so that a broken payload actually fails the test.
 */
public class TupleSenderAndReceiverTest {
    /** Host both senders connect to. */
    private static final String HOST = "localhost";
    /** Port of the receiver server started by {@link #test()}. */
    private static final int RECEIVER_PORT = 8999;
    /** Port of the receiver server started by {@link #testSwitchTupleReceiver}. */
    private static final int SWITCH_RECEIVER_PORT = 8998;
    /** Size argument of the first synchronized queue (presumably its capacity - confirm). */
    private static final int SYN_QUEUE_SIZE = 20;
    /** Size argument of the second ("tmp") synchronized queue (presumably its capacity - confirm). */
    private static final int TMP_SYN_QUEUE_SIZE = 5;
    /** Number of tuples sent per batch. */
    private static final int TUPLES_PER_BATCH = 10;
    /** Pause after a batch so the receiving side can process what was sent. */
    private static final long SETTLE_MILLIS = 100L;
    /** Upper bound for waiting on the consumer thread to finish. */
    private static final long CONSUMER_JOIN_MILLIS = 1000L;

    private SynchronizedQueue<IGeneralTuple> syn = null;
    private SynchronizedQueue<IGeneralTuple> tmpSyn = null;
    private Queue<IGeneralTuple> queue = null;
    private Queue<IGeneralTuple> tmpQueue = null;

    /**
     * Polls both synchronized queues of the enclosing test and asserts the
     * content of every tuple it takes out.
     *
     * <p>The loop runs until {@link #stop()} is called or a check fails. The
     * first failure is kept and can be read via {@link #getFailure()}.
     */
    public class ConsumeTuple implements Runnable {
        /** Cleared by {@link #stop()} to end the polling loop. */
        private volatile boolean running = true;
        /** First throwable raised while consuming, or {@code null} if none. */
        private volatile Throwable failure;

        public ConsumeTuple() {
        }

        /** Asks the polling loop to finish; tuples still queued are checked before the thread ends. */
        public void stop() {
            this.running = false;
        }

        /**
         * Returns the throwable that ended the consumer abnormally.
         *
         * @return the recorded failure, or {@code null} if consuming raised nothing
         */
        public Throwable getFailure() {
            return this.failure;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                while (this.running) {
                    if (!consumeAvailable()) {
                        // Nothing queued right now: give up the time slice instead of spinning hot.
                        Thread.yield();
                    }
                }
                // Check whatever is still queued at the moment stop() was requested.
                while (consumeAvailable()) {
                    // keep draining
                }
            } catch (Throwable t) {
                // Deliberately broad: an AssertionError raised on this thread would otherwise
                // only terminate the thread and never reach the JUnit runner.
                this.failure = t;
            }
        }

        /**
         * Takes at most one tuple from each queue and asserts its content.
         *
         * @return {@code true} if at least one tuple was consumed
         */
        private boolean consumeAvailable() {
            boolean consumed = false;
            if (TupleSenderAndReceiverTest.this.syn.currentSize() > 0) {
                TupleSenderAndReceiverTest.this.assertReceivedData((IGeneralTuple) TupleSenderAndReceiverTest.this.syn.consume());
                consumed = true;
            }
            if (TupleSenderAndReceiverTest.this.tmpSyn.currentSize() > 0) {
                TupleSenderAndReceiverTest.this.assertReceivedData((IGeneralTuple) TupleSenderAndReceiverTest.this.tmpSyn.consume());
                consumed = true;
            }
            return consumed;
        }
    }

    /**
     * Starts a receiver server, sends one batch of general tuples and two batches
     * of switch tuples (separated by the sender's flag calls), then runs
     * {@link #testSwitchTupleReceiver}. Fails if the consumer thread recorded a failure.
     */
    @Test
    public void test() {
        // Storm configuration maps are raw in the API used here, so the raw type is kept on purpose.
        @SuppressWarnings("rawtypes")
        Map conf = StormTestUtils.createStormKryoConf();
        Config.registerSerialization(conf, KryoTupleSerializerTest.DataItem.class, KryoTupleSerializerTest.DataItemSerializer.class);
        KryoGeneralTupleSerializerCreator generalCreator = new KryoGeneralTupleSerializerCreator(conf);
        KryoSwitchTupleSerializerCreator switchCreator = new KryoSwitchTupleSerializerCreator(conf);
        this.queue = new ConcurrentLinkedQueue<>();
        this.tmpQueue = new ConcurrentLinkedQueue<>();
        this.syn = new SynchronizedQueue<>(this.queue, SYN_QUEUE_SIZE);
        this.tmpSyn = new SynchronizedQueue<>(this.tmpQueue, TMP_SYN_QUEUE_SIZE);
        TupleReceiverHandlerCreator handlerCreator = new TupleReceiverHandlerCreator(generalCreator, switchCreator, this.syn, this.tmpSyn);
        IGeneralTupleSerializer generalSerializer = generalCreator.createGeneralTupleSerializer();
        ISwitchTupleSerializer switchSerializer = switchCreator.createSwitchTupleSerializer();

        ConsumeTuple consumer = new ConsumeTuple();
        Thread consumerThread = new Thread(consumer, "tuple-consumer");
        // Daemon so that a consumer stuck in a blocking call cannot keep the JVM alive.
        consumerThread.setDaemon(true);
        consumerThread.start();
        try {
            TupleReceiverServer server = new TupleReceiverServer(handlerCreator, RECEIVER_PORT);
            server.start();
            System.out.println("Server is started...");
            try {
                exchangeTuples(generalSerializer, switchSerializer);
            } finally {
                stopServer(server);
            }
            testSwitchTupleReceiver(handlerCreator, switchSerializer);
        } finally {
            consumer.stop();
            try {
                consumerThread.join(CONSUMER_JOIN_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        Throwable failure = consumer.getFailure();
        if (failure instanceof Error) {
            // Rethrow assertion errors unchanged so the original message is reported.
            throw (Error) failure;
        }
        if (failure != null) {
            throw new AssertionError("Consumer thread failed: " + failure, failure);
        }
    }

    /**
     * Sends the general batch and the two switch batches to the server on
     * {@link #RECEIVER_PORT}; the sender is always stopped afterwards.
     */
    private static void exchangeTuples(IGeneralTupleSerializer generalSerializer, ISwitchTupleSerializer switchSerializer) {
        TupleSender sender = new TupleSender(HOST, RECEIVER_PORT);
        try {
            sendGeneralTuple(sender, generalSerializer, TUPLES_PER_BATCH);
            sender.sendSwitchTupleFlag();
            sendSwitchTuple(sender, switchSerializer, TUPLES_PER_BATCH);
            pause(SETTLE_MILLIS);
            sender.sendSwitchTupleFlag();
            sender.sendTemporaryQueueFlag();
            sendSwitchTuple(sender, switchSerializer, TUPLES_PER_BATCH);
            pause(SETTLE_MILLIS);
        } finally {
            sender.stop();
        }
    }

    /**
     * Starts a second receiver server on {@link #SWITCH_RECEIVER_PORT} (created with the
     * {@code true} flag), sends one batch of switch tuples to it and releases everything it created.
     *
     * @param iTupleReceiveCreator   creator used for the handler and for the server
     * @param iSwitchTupleSerializer serializer for the switch tuples that are sent
     */
    public void testSwitchTupleReceiver(ITupleReceiveCreator iTupleReceiveCreator, ISwitchTupleSerializer iSwitchTupleSerializer) {
        ITupleReceiverHandler handler = iTupleReceiveCreator.create(true);
        TupleReceiverServer switchServer = new TupleReceiverServer(iTupleReceiveCreator, SWITCH_RECEIVER_PORT, true);
        switchServer.start();
        System.out.println("Switch Server is started...");
        try {
            TupleSender sender = new TupleSender(HOST, SWITCH_RECEIVER_PORT);
            try {
                sendSwitchTuple(sender, iSwitchTupleSerializer, TUPLES_PER_BATCH);
                pause(SETTLE_MILLIS);
            } finally {
                sender.stop();
            }
        } finally {
            try {
                handler.stop();
            } catch (IOException e) {
                // Best-effort cleanup: report, but do not mask the test outcome.
                e.printStackTrace();
            } finally {
                // Previously this server was started but never stopped, leaving its port bound.
                stopServer(switchServer);
            }
        }
    }

    /**
     * Sends {@code i} serialized general tuples; does nothing when {@code i <= 0}.
     *
     * @param tupleSender             sender used for transmission
     * @param iGeneralTupleSerializer serializer applied to every tuple
     * @param i                       number of tuples to send
     */
    public static void sendGeneralTuple(TupleSender tupleSender, IGeneralTupleSerializer iGeneralTupleSerializer, int i) {
        for (int remaining = i; remaining > 0; remaining--) {
            tupleSender.send(iGeneralTupleSerializer.serialize(createGeneralTuple()));
        }
    }

    /**
     * Sends {@code i} serialized switch tuples whose ids count down from {@code i} to 1;
     * does nothing when {@code i <= 0}.
     *
     * @param tupleSender            sender used for transmission
     * @param iSwitchTupleSerializer serializer applied to every tuple
     * @param i                      number of tuples to send
     */
    public static void sendSwitchTuple(TupleSender tupleSender, ISwitchTupleSerializer iSwitchTupleSerializer, int i) {
        for (int id = i; id > 0; id--) {
            tupleSender.send(iSwitchTupleSerializer.serialize(createSwitchTuple(id)));
        }
    }

    /** Creates a general tuple carrying the standard test payload. */
    private static IGeneralTuple createGeneralTuple() {
        return new GeneralTuple(createTupleValues());
    }

    /** Creates a switch tuple with the given id carrying the standard test payload. */
    private static ISwitchTuple createSwitchTuple(int i) {
        return new SwitchTuple(i, createTupleValues());
    }

    /** Builds the payload checked by {@link #assertReceivedData}: one data item with id 1 and value "data". */
    private static List<Object> createTupleValues() {
        List<Object> values = new ArrayList<>();
        values.add(new KryoTupleSerializerTest.DataItem(1, "data"));
        return values;
    }

    /** Sleeps for the given time and restores the interrupt status if the sleep is interrupted. */
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops the server; an {@link IOException} is reported but not propagated (best-effort cleanup). */
    private static void stopServer(TupleReceiverServer server) {
        try {
            server.stop();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the first value of the received tuple and asserts that it is the
     * data item produced by {@link #createTupleValues()} (id 1, value "data").
     *
     * @param iGeneralTuple tuple taken from one of the receiving queues
     */
    public void assertReceivedData(IGeneralTuple iGeneralTuple) {
        KryoTupleSerializerTest.IDataItem iDataItem = (KryoTupleSerializerTest.IDataItem) iGeneralTuple.getValue(0);
        System.out.println("Received Data - IS GENERAL: " + iGeneralTuple.isGeneralTuple() + ", id: " + iDataItem.getId() + ", value: " + iDataItem.getValue());
        Assert.assertEquals(1L, iDataItem.getId());
        Assert.assertEquals("data", iDataItem.getValue());
    }
}
