package tests.eu.qualimaster.common.switching;

import backtype.storm.Config;
import eu.qualimaster.base.algorithm.GeneralTuple;
import eu.qualimaster.base.algorithm.IGeneralTuple;
import eu.qualimaster.base.serializer.KryoGeneralTupleSerializer;
import eu.qualimaster.common.switching.IGeneralTupleSerializerCreator;
import eu.qualimaster.common.switching.SynchronizedQueue;
import eu.qualimaster.common.switching.TupleReceiverHandlerCreator;
import eu.qualimaster.common.switching.TupleReceiverServer;
import eu.qualimaster.common.switching.TupleSender;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import tests.eu.qualimaster.common.KryoTupleSerializerTest;
import tests.eu.qualimaster.common.StormTestUtils;

/**
 * Tests parallel tuple transmission over the network: several {@link TupleSender} threads send the
 * same serialized tuple to one {@link TupleReceiverServer}, while a consumer thread drains the
 * shared {@link SynchronizedQueue}. The test passes if the number of consumed tuples is close to
 * the number of sent tuples.
 */
public class ParallelNetworkTest implements IGeneralTupleSerializerCreator {
    // Not referenced by the visible code; NOTE(review): presumably guarded a serializer-reuse
    // branch that the compiler folded away - confirm against the original sources.
    private static final boolean REUSE_SERIALIZER = false;
    /** Host the senders connect to. */
    private static final String HOST = "localhost";
    /** Port shared by the receiver server and all senders. */
    private static final int PORT = 8025;
    /** Number of concurrently running sender threads. */
    private static final int SENDER_COUNT = 10;
    /** Exclusive upper bound for the tolerated |received - sent| difference. */
    private static final int DIFFERENCE_LIMIT = 5;

    private GeneralTuple tuple;
    private Map conf;
    private KryoGeneralTupleSerializer serializer;
    private SynchronizedQueue<IGeneralTuple> syn;

    /**
     * Consumes tuples from the shared queue and counts them until {@link #stop()} is called.
     */
    private class ReceiverRunnable implements Runnable {
        // volatile: written by the test thread in stop(), read by the consumer thread
        private volatile boolean run;
        // volatile: written only by the consumer thread (single writer), read by the test thread
        private volatile int received;

        private ReceiverRunnable() {
            this.run = true;
            this.received = 0;
        }

        /**
         * Polls the queue size and consumes one tuple whenever the queue is non-empty.
         */
        @Override
        public void run() {
            while (this.run) {
                if (ParallelNetworkTest.this.syn.currentSize() > 0) {
                    ParallelNetworkTest.this.syn.consume();
                    this.received++;
                }
            }
        }

        /**
         * Returns the number of tuples consumed so far.
         *
         * @return the consumed tuple count
         */
        public int getReceivedCount() {
            return this.received;
        }

        /**
         * Requests the consumer loop to end.
         */
        public void stop() {
            this.run = false;
        }
    }

    /**
     * Repeatedly serializes and sends the test tuple, counting the sends, until {@link #stop()} is
     * called.
     */
    private class SenderTestRunnable implements Runnable {
        private TupleSender sender;
        // volatile: written by the test thread in stop(), read by the sender thread
        private volatile boolean run;
        private int id;
        // volatile: written only by the sender thread (single writer), read by the test thread
        private volatile int sent;
        private KryoGeneralTupleSerializer genSer;

        /**
         * Creates a sender with its own serializer instance.
         *
         * @param i the identifier of this sender
         */
        private SenderTestRunnable(int i) {
            this.run = true;
            this.sent = 0;
            this.genSer = ParallelNetworkTest.this.createGeneralTupleSerializer();
            this.id = i;
        }

        /**
         * Sends the serialized tuple in a loop, pausing 10 ms after each send.
         */
        @Override
        public void run() {
            // NOTE(review): the sender is never released here; if TupleSender offers a
            // stop/close operation it should probably be called after the loop - confirm.
            this.sender = new TupleSender(HOST, PORT);
            while (this.run) {
                this.sender.send(this.genSer.serialize(ParallelNetworkTest.this.tuple));
                this.sent++;
                ParallelNetworkTest.sleep(10L);
            }
        }

        /**
         * Requests the sender loop to end.
         */
        public void stop() {
            this.run = false;
        }

        /**
         * Returns the number of tuples sent so far.
         *
         * @return the sent tuple count
         */
        public int getSentCount() {
            return this.sent;
        }
    }

    /**
     * Creates the test tuple (with an empty value list), the Storm Kryo configuration and the
     * shared queue.
     */
    @Before
    public void setup() {
        this.tuple = new GeneralTuple();
        this.tuple.setValues(new ArrayList());
        this.conf = StormTestUtils.createStormKryoConf();
        Config.registerSerialization(this.conf, KryoTupleSerializerTest.DataItem.class, KryoTupleSerializerTest.DataItemSerializer.class);
        this.syn = new SynchronizedQueue<>(new ConcurrentLinkedQueue(), 20);
    }

    /**
     * Creates a new serializer based on the test configuration. The decompiler had renamed this
     * method; its original name is restored here so that the class provides the method expected
     * from {@link IGeneralTupleSerializerCreator} (the interface itself is not visible in this
     * file - confirm the signature there).
     *
     * @return a new serializer instance
     */
    public KryoGeneralTupleSerializer createGeneralTupleSerializer() {
        return new KryoGeneralTupleSerializer(this.conf);
    }

    /**
     * Decompiler-generated alias, kept only for backward compatibility.
     *
     * @return a new serializer instance
     * @deprecated use {@link #createGeneralTupleSerializer()} instead
     */
    @Deprecated
    public KryoGeneralTupleSerializer m5createGeneralTupleSerializer() {
        return createGeneralTupleSerializer();
    }

    /**
     * Runs {@value #SENDER_COUNT} senders against one receiver server for about two seconds and
     * asserts that the consumed count differs from the sent count by less than
     * {@value #DIFFERENCE_LIMIT}.
     *
     * @throws IOException declared for the server operations (not thrown by the test logic itself)
     */
    @Test
    public void testNetwork() throws IOException {
        TupleReceiverServer server = new TupleReceiverServer(new TupleReceiverHandlerCreator(this, this.syn), PORT);
        server.start();
        ReceiverRunnable receiver = new ReceiverRunnable();
        SenderTestRunnable[] senders = new SenderTestRunnable[SENDER_COUNT];
        int sentTotal = 0;
        try {
            new Thread(receiver).start();
            for (int i = 0; i < senders.length; i++) {
                senders[i] = new SenderTestRunnable(i);
                new Thread(senders[i]).start();
            }
            sleep(2000L);
            for (SenderTestRunnable sender : senders) {
                sender.stop();
                sentTotal += sender.getSentCount();
            }
            // give tuples that are still in transit a moment to reach the consumer
            sleep(100L);
        } finally {
            // always end the worker loops and stop the server, even if the test fails
            // midway; repeating stop() on a sender only sets its flag again
            for (SenderTestRunnable sender : senders) {
                if (sender != null) {
                    sender.stop();
                }
            }
            receiver.stop();
            server.stop();
        }
        int receivedTotal = receiver.getReceivedCount();
        String summary = "sent " + sentTotal + " received " + receivedTotal;
        System.out.println(summary);
        Assert.assertTrue(summary + " difference >= " + DIFFERENCE_LIMIT,
            Math.abs(receivedTotal - sentTotal) < DIFFERENCE_LIMIT);
    }

    /**
     * Sleeps for the given time. If the thread is interrupted, the sleep ends early and the
     * interrupt status is restored so that callers can still observe it.
     *
     * @param j the time to sleep in milliseconds
     */
    public static void sleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
