/*
 * Decompiled with CFR 0.152.
 */
package test.de.iip_ecosphere.platform.services.environment;

import de.iip_ecosphere.platform.services.environment.AbstractService;
import de.iip_ecosphere.platform.services.environment.DataIngestor;
import de.iip_ecosphere.platform.services.environment.EnvironmentSetup;
import de.iip_ecosphere.platform.services.environment.PythonAsyncProcessService;
import de.iip_ecosphere.platform.services.environment.PythonSyncProcessService;
import de.iip_ecosphere.platform.services.environment.PythonWsProcessService;
import de.iip_ecosphere.platform.services.environment.ServiceKind;
import de.iip_ecosphere.platform.services.environment.ServiceState;
import de.iip_ecosphere.platform.services.environment.YamlProcess;
import de.iip_ecosphere.platform.services.environment.YamlServer;
import de.iip_ecosphere.platform.services.environment.YamlService;
import de.iip_ecosphere.platform.support.NetUtils;
import de.iip_ecosphere.platform.support.Schema;
import de.iip_ecosphere.platform.support.Server;
import de.iip_ecosphere.platform.support.ServerAddress;
import de.iip_ecosphere.platform.support.TimeUtils;
import de.iip_ecosphere.platform.support.Version;
import de.iip_ecosphere.platform.support.iip_aas.ActiveAasBase;
import de.iip_ecosphere.platform.transport.Transport;
import de.iip_ecosphere.platform.transport.serialization.TypeTranslator;
import de.iip_ecosphere.platform.transport.serialization.TypeTranslators;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import test.de.iip_ecosphere.platform.services.environment.pythonEnv.Rec13;
import test.de.iip_ecosphere.platform.services.environment.pythonEnv.Rec13Impl;
import test.de.iip_ecosphere.platform.services.environment.pythonEnv.Rec13InTranslator;
import test.de.iip_ecosphere.platform.services.environment.pythonEnv.Rec13OutTranslator;
import test.de.iip_ecosphere.platform.transport.TestWithQpid;

public class PythonProcessServiceTest
extends TestWithQpid {
    private String stringParam = null;
    private Server qpid;

    private List<String> composeCmdLineArguments(String serviceId) {
        return this.composeCmdLineArguments(serviceId, "console", -1);
    }

    private List<String> composeCmdLineArguments(String serviceId, String mode, int port) {
        File f = new File("src/test/python");
        ArrayList<String> args = new ArrayList<String>();
        args.add("--mode");
        args.add(mode);
        if (port > 0) {
            args.add("--port");
            args.add(String.valueOf(port));
        }
        args.add("--modulesPath");
        args.add(f.getAbsolutePath());
        args.add("--sid");
        args.add(serviceId);
        return args;
    }

    @Test
    public void testAsyncProcessService() throws ExecutionException, IOException {
        YamlService sDesc = new YamlService();
        sDesc.setName("Test");
        sDesc.setVersion(new Version("0.0.1"));
        sDesc.setKind(ServiceKind.TRANSFORMATION_SERVICE);
        sDesc.setId("Test");
        sDesc.setDeployable(true);
        YamlProcess pDesc = new YamlProcess();
        pDesc.setHomePath("src/main/python");
        pDesc.setCmdArg(this.composeCmdLineArguments("1234"));
        sDesc.setProcess(pDesc);
        this.testService(new PythonAsyncProcessService(sDesc));
    }

    @Test
    public void testWsProcessService() throws ExecutionException, IOException {
        YamlService sDesc = new YamlService();
        sDesc.setName("Test");
        sDesc.setVersion(new Version("0.0.1"));
        sDesc.setKind(ServiceKind.TRANSFORMATION_SERVICE);
        sDesc.setId("Test");
        sDesc.setDeployable(true);
        YamlProcess pDesc = new YamlProcess();
        pDesc.setHomePath("src/main/python");
        pDesc.setCmdArg(this.composeCmdLineArguments("1234"));
        sDesc.setProcess(pDesc);
        this.testService((PythonAsyncProcessService)new PythonWsProcessService(sDesc));
    }

    private void testService(PythonAsyncProcessService service) throws ExecutionException, IOException {
        final AtomicInteger receivedStringCount = new AtomicInteger(0);
        final AtomicInteger receivedRec13Count = new AtomicInteger(0);
        String stringTypeName = "S";
        String rec13TypeName = "Rec13";
        service.enableFileDeletion(false);
        service.registerInputTypeTranslator(String.class, "S", TypeTranslators.STRING);
        service.registerOutputTypeTranslator(String.class, "S", TypeTranslators.STRING);
        service.attachIngestor(String.class, "S", (DataIngestor)new DataIngestor<String>(){

            public void ingest(String data) {
                receivedStringCount.incrementAndGet();
            }
        });
        service.registerInputTypeTranslator(Rec13.class, "Rec13", (TypeTranslator)new Rec13InTranslator());
        service.registerOutputTypeTranslator(Rec13.class, "Rec13", (TypeTranslator)new Rec13OutTranslator());
        service.attachIngestor(Rec13.class, "Rec13", (DataIngestor)new DataIngestor<Rec13>(){

            public void ingest(Rec13 data) {
                receivedRec13Count.incrementAndGet();
            }
        });
        service.addParameterConfigurer(c -> AbstractService.addConfigurer((Map)c, (String)"myParam", String.class, (TypeTranslator)TypeTranslators.STRING, v -> {
            this.stringParam = v;
        }));
        Assert.assertNotNull((Object)service.getParameterConfigurer("myParam"));
        HashMap<String, String> rcf = new HashMap<String, String>();
        service.getParameterConfigurer("myParam").addValue(rcf, (Object)"VALUE");
        service.reconfigure(rcf);
        Assert.assertEquals((Object)"VALUE", (Object)this.stringParam);
        service.setState(ServiceState.STARTING);
        service.process("S", (Object)"test");
        service.process("S", (Object)"test");
        service.processQuiet("S", (Object)"test");
        rcf.clear();
        rcf.put("myParam", "VALUE-R");
        service.reconfigure(rcf);
        Assert.assertEquals((Object)"VALUE-R", (Object)this.stringParam);
        Rec13Impl r = new Rec13Impl();
        r.setIntField(10);
        r.setStringField("abba");
        service.process("Rec13", (Object)r);
        TimeUtils.sleep((int)1000);
        Assert.assertTrue((service.getAvgResponseTime() >= 0L ? 1 : 0) != 0);
        service.setState(ServiceState.STOPPING);
        Assert.assertEquals((long)3L, (long)receivedStringCount.get());
        service.activate();
        service.passivate();
    }

    @Test
    public void testAsyncClientServer() throws IOException, ExecutionException {
        ActiveAasBase.NotificationMode mo = ActiveAasBase.setNotificationMode((ActiveAasBase.NotificationMode)ActiveAasBase.NotificationMode.NONE);
        ServerAddress broker = new ServerAddress(Schema.IGNORE);
        this.qpid = TestWithQpid.fromPlugin((ServerAddress)broker);
        this.qpid.start();
        EnvironmentSetup setup = (EnvironmentSetup)EnvironmentSetup.readFromYaml(EnvironmentSetup.class, (InputStream)new FileInputStream("src/test/resources/envSetup.yml"));
        setup.getTransport().setPort(broker.getPort());
        Supplier su = Transport.setTransportSetup(() -> setup.getTransport());
        String transportChannel = "myServer";
        YamlServer yServer = new YamlServer();
        yServer.setId("test-py");
        yServer.setHost("localhost");
        yServer.setPort(NetUtils.getEphemeralPort());
        yServer.setDescription("");
        yServer.setVersion(new Version(new int[]{0, 0, 1}));
        yServer.setStarted(false);
        yServer.setTransportChannel("myServer");
        yServer.setCmdArg(this.composeCmdLineArguments("test-py"));
        yServer.setHomePath("src/main/python");
        CountingAsyncPythonProcessService server = new CountingAsyncPythonProcessService(yServer.toService());
        server.setState(ServiceState.STARTING);
        YamlService yService = new YamlService();
        yService.setDescription("");
        yService.setId("1234");
        yService.setTransportChannel("myServer");
        yService.setKind(ServiceKind.TRANSFORMATION_SERVICE);
        yService.setTopLevel(true);
        yService.setVersion(new Version(new int[]{0, 0, 2}));
        YamlProcess yProcess = new YamlProcess();
        yProcess.setHomePath("src/main/python");
        yProcess.setCmdArg(this.composeCmdLineArguments("1234"));
        yProcess.setStarted(false);
        yService.setProcess(yProcess);
        CountingAsyncPythonProcessService client = new CountingAsyncPythonProcessService(yService);
        client.setState(ServiceState.STARTING);
        TimeUtils.sleep((int)1000);
        Assert.assertEquals((Object)ServiceState.RUNNING, (Object)client.getState());
        Assert.assertEquals((Object)ServiceState.RUNNING, (Object)server.getState());
        TimeUtils.sleep((int)1000);
        server.process("*SERVER", "S-TEST".getBytes());
        TimeUtils.sleep((int)2000);
        client.process("*SERVER", "C-TEST".getBytes());
        TimeUtils.sleep((int)2000);
        Assert.assertEquals((long)1L, (long)client.getCountProcess());
        TimeUtils.sleep((int)1000);
        client.setState(ServiceState.STOPPING);
        server.setState(ServiceState.STOPPING);
        TimeUtils.sleep((int)2000);
        Transport.releaseConnector((boolean)false);
        this.stopBroker();
        ActiveAasBase.setNotificationMode((ActiveAasBase.NotificationMode)mo);
        Transport.setTransportSetup((Supplier)su);
    }

    @After
    public void stopBroker() {
        if (null != this.qpid) {
            this.qpid.stop(true);
            this.qpid = null;
        }
    }

    @Test
    public void testSyncProcessService() throws ExecutionException, IOException {
        YamlService sDesc = new YamlService();
        sDesc.setName("Test");
        sDesc.setVersion(new Version("0.0.1"));
        sDesc.setKind(ServiceKind.TRANSFORMATION_SERVICE);
        sDesc.setId("Test");
        sDesc.setDeployable(true);
        YamlProcess pDesc = new YamlProcess();
        pDesc.setHomePath("src/main/python");
        pDesc.setCmdArg(this.composeCmdLineArguments("1234"));
        sDesc.setProcess(pDesc);
        String stringTypeName = "S";
        String rec13TypeName = "Rec13";
        PythonSyncProcessService service = new PythonSyncProcessService(sDesc);
        service.registerInputTypeTranslator(String.class, "S", TypeTranslators.STRING);
        service.registerOutputTypeTranslator(String.class, "S", TypeTranslators.STRING);
        service.registerInputTypeTranslator(Rec13.class, "Rec13", (TypeTranslator)new Rec13InTranslator());
        service.registerOutputTypeTranslator(Rec13.class, "Rec13", (TypeTranslator)new Rec13OutTranslator());
        service.setState(ServiceState.STARTING);
        Assert.assertEquals((Object)"test", (Object)service.processSync("S", (Object)"test", "S"));
        Assert.assertEquals((Object)"test", (Object)service.processSync("S", (Object)"test", "S"));
        Assert.assertEquals((Object)"test", (Object)service.processSyncQuiet("S", (Object)"test", "S"));
        Rec13Impl r = new Rec13Impl();
        r.setIntField(10);
        r.setStringField("abba");
        Assert.assertEquals((Object)r, (Object)service.processSyncQuiet("Rec13", (Object)r, "Rec13"));
        service.setState(ServiceState.STOPPING);
        service.activate();
        service.passivate();
    }

    private class CountingAsyncPythonProcessService
    extends PythonAsyncProcessService {
        private int countProcess;

        public CountingAsyncPythonProcessService(YamlService yaml) {
            super(yaml);
        }

        public <I> void process(String inType, I data) throws ExecutionException {
            super.process(inType, data);
            ++this.countProcess;
        }

        public int getCountProcess() {
            return this.countProcess;
        }
    }
}

