package test.de.iip_ecosphere.platform.services.environment.services;

import de.iip_ecosphere.platform.services.environment.services.TransportConverter;
import de.iip_ecosphere.platform.services.environment.services.TransportConverterFactory;
import de.iip_ecosphere.platform.services.environment.services.TransportToWsConverter;
import de.iip_ecosphere.platform.services.environment.services.WsTransportConverterFactory;
import de.iip_ecosphere.platform.support.Endpoint;
import de.iip_ecosphere.platform.support.NetUtils;
import de.iip_ecosphere.platform.support.Schema;
import de.iip_ecosphere.platform.support.Server;
import de.iip_ecosphere.platform.support.ServerAddress;
import de.iip_ecosphere.platform.support.TimeUtils;
import de.iip_ecosphere.platform.support.iip_aas.AasPartRegistry;
import de.iip_ecosphere.platform.transport.Transport;
import de.iip_ecosphere.platform.transport.connectors.TransportSetup;
import de.iip_ecosphere.platform.transport.serialization.TypeTranslator;
import de.iip_ecosphere.platform.transport.status.TraceRecord;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
import test.de.iip_ecosphere.platform.services.environment.services.TraceToAasServiceMain;
import test.de.iip_ecosphere.platform.test.amqp.qpid.TestQpidServer;

/* loaded from: input_file:test/de/iip_ecosphere/platform/services/environment/services/TransportToWsConverterTest.class */
public class TransportToWsConverterTest {
    @Test
    public void testConverter() {
        ServerAddress serverAddress = new ServerAddress(Schema.IGNORE);
        TestQpidServer testQpidServer = new TestQpidServer(serverAddress);
        testQpidServer.start();
        AtomicInteger atomicInteger = new AtomicInteger();
        TransportSetup transportSetup = new TransportSetup();
        transportSetup.setHost("localhost");
        transportSetup.setPort(serverAddress.getPort());
        transportSetup.setAuthenticationKey("amqp");
        Transport.setTransportSetup(() -> {
            return transportSetup;
        });
        Endpoint endpoint = new Endpoint(Schema.WS, "localhost", NetUtils.getEphemeralPort(), "/status");
        Server start = TransportToWsConverter.createServer(endpoint).start();
        TransportToWsConverter transportToWsConverter = new TransportToWsConverter("Trace", TraceRecord.class, endpoint);
        transportToWsConverter.start((AasPartRegistry.AasSetup) null);
        Assert.assertEquals(endpoint, transportToWsConverter.getEndpoint());
        TransportConverter.Watcher createWatcher = transportToWsConverter.createWatcher(0);
        createWatcher.setConsumer(traceRecord -> {
            atomicInteger.incrementAndGet();
        });
        createWatcher.start();
        TraceToAasServiceMain.MyData myData = new TraceToAasServiceMain.MyData(new int[]{128, 128, 64, 12, 0, 8});
        Transport.sendTraceRecord(new TraceRecord("source", "sending", myData));
        TimeUtils.sleep(700);
        Transport.sendTraceRecord(new TraceRecord("rtsa", "receiving", myData));
        TimeUtils.sleep(700);
        Transport.sendTraceRecord(new TraceRecord("rtsa", "sending", myData));
        TimeUtils.sleep(1500);
        Transport.sendTraceRecord(new TraceRecord("receiver", "receiving", myData));
        TimeUtils.sleep(1500);
        createWatcher.stop();
        transportToWsConverter.stop();
        start.stop(true);
        Assert.assertTrue(atomicInteger.get() > 3);
        testQpidServer.stop(true);
        Transport.releaseConnector(false);
    }

    @Test
    public void testFactory() {
        ServerAddress serverAddress = new ServerAddress(Schema.IGNORE);
        TestQpidServer testQpidServer = new TestQpidServer(serverAddress);
        testQpidServer.start();
        Assert.assertNotNull(TransportConverterFactory.getInstance());
        AtomicInteger atomicInteger = new AtomicInteger();
        TransportSetup transportSetup = new TransportSetup();
        transportSetup.setHost("localhost");
        transportSetup.setPort(serverAddress.getPort());
        transportSetup.setAuthenticationKey("amqp");
        transportSetup.setGatewayPort(NetUtils.getEphemeralPort());
        Transport.setTransportSetup(() -> {
            return transportSetup;
        });
        WsTransportConverterFactory wsTransportConverterFactory = WsTransportConverterFactory.INSTANCE;
        Server start = wsTransportConverterFactory.createServer((AasPartRegistry.AasSetup) null, transportSetup).start();
        TransportConverter createConverter = wsTransportConverterFactory.createConverter((AasPartRegistry.AasSetup) null, transportSetup, "Trace", "/status", (TypeTranslator) null, TraceRecord.class);
        createConverter.start((AasPartRegistry.AasSetup) null);
        Assert.assertNotNull(createConverter.getEndpoint());
        TransportConverter.Watcher createWatcher = wsTransportConverterFactory.createWatcher((AasPartRegistry.AasSetup) null, transportSetup, "/status", (TypeTranslator) null, TraceRecord.class, 0);
        createWatcher.setConsumer(traceRecord -> {
            atomicInteger.incrementAndGet();
        });
        createWatcher.start();
        TraceToAasServiceMain.MyData myData = new TraceToAasServiceMain.MyData(new int[]{128, 128, 64, 12, 0, 8});
        Transport.sendTraceRecord(new TraceRecord("source", "sending", myData));
        TimeUtils.sleep(700);
        Transport.sendTraceRecord(new TraceRecord("rtsa", "receiving", myData));
        TimeUtils.sleep(700);
        Transport.sendTraceRecord(new TraceRecord("rtsa", "sending", myData));
        TimeUtils.sleep(1500);
        Transport.sendTraceRecord(new TraceRecord("receiver", "receiving", myData));
        TimeUtils.sleep(700);
        createWatcher.stop();
        createConverter.stop();
        start.stop(true);
        Assert.assertEquals(4L, atomicInteger.get());
        testQpidServer.stop(true);
        Transport.releaseConnector(false);
    }
}
