/*
 * Decompiled with CFR 0.152.
 */
package de.iip_ecosphere.platform.services.environment;

import de.iip_ecosphere.platform.services.environment.AbstractProcessService;
import de.iip_ecosphere.platform.services.environment.AbstractRunnablesService;
import de.iip_ecosphere.platform.services.environment.DataIngestor;
import de.iip_ecosphere.platform.services.environment.GenericMultiTypeService;
import de.iip_ecosphere.platform.services.environment.GenericMultiTypeServiceImpl;
import de.iip_ecosphere.platform.services.environment.ParameterConfigurer;
import de.iip_ecosphere.platform.services.environment.ServiceKind;
import de.iip_ecosphere.platform.services.environment.ServiceState;
import de.iip_ecosphere.platform.services.environment.YamlProcess;
import de.iip_ecosphere.platform.services.environment.YamlService;
import de.iip_ecosphere.platform.support.FileUtils;
import de.iip_ecosphere.platform.support.ServerAddress;
import de.iip_ecosphere.platform.support.StringUtils;
import de.iip_ecosphere.platform.support.json.Json;
import de.iip_ecosphere.platform.support.logging.Logger;
import de.iip_ecosphere.platform.support.logging.LoggerFactory;
import de.iip_ecosphere.platform.support.setup.InstalledDependenciesSetup;
import de.iip_ecosphere.platform.transport.Transport;
import de.iip_ecosphere.platform.transport.connectors.ReceptionCallback;
import de.iip_ecosphere.platform.transport.connectors.TransportConnector;
import de.iip_ecosphere.platform.transport.serialization.TypeTranslator;
import de.iip_ecosphere.platform.transport.serialization.TypeTranslators;
import de.oktoflow.platform.tools.lib.PythonUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public abstract class AbstractPythonProcessService
extends AbstractRunnablesService
implements GenericMultiTypeService {
    public static final char TYPE_SEPARATOR_CHAR = '|';
    private File home;
    private List<String> pythonArgs;
    private String locationKey;
    private String transportChannel;
    private GenericMultiTypeServiceImpl impl = new GenericMultiTypeServiceImpl();
    private Map<String, ParameterConfigurer<?>> paramConfigurers = new HashMap();
    private Map<String, ReceptionCallback<?>> callbacks = new HashMap();
    private String averageResponseTime = "";
    private boolean enableFileDeletion;

    public AbstractPythonProcessService(String serviceId, InputStream ymlFile) {
        super(serviceId, ymlFile);
    }

    public AbstractPythonProcessService(YamlService yaml) {
        super(yaml);
    }

    @Override
    protected void initializeFrom(YamlService yaml) {
        YamlProcess pSpec = yaml.getProcess();
        this.pythonArgs = new ArrayList<String>();
        if (pSpec != null) {
            this.locationKey = pSpec.getLocationKey();
            this.home = pSpec.getHomePath();
            this.pythonArgs.addAll(pSpec.getExecArg());
            this.pythonArgs.add(AbstractPythonProcessService.getPythonModule(pSpec.getExecutable(), yaml, this.home));
            List<String> cmdArg = pSpec.getSubstCmdArg();
            if (null != cmdArg) {
                this.pythonArgs.addAll(cmdArg);
            }
        } else {
            this.pythonArgs.add(AbstractPythonProcessService.getPythonModule(null, yaml, null));
        }
        if (null == this.home) {
            AbstractPythonProcessService.getLogger().warn("No home path given for service " + yaml.getId() + ". Falling back to temporary folder");
            this.home = FileUtils.createTmpFolder((String)FileUtils.sanitizeFileName((String)yaml.getId(), (boolean)true));
        }
        this.transportChannel = yaml.getTransportChannel();
        this.customizePythonArgs(this.pythonArgs);
    }

    protected boolean unbuffer() {
        return true;
    }

    protected void customizePythonArgs(List<String> pythonArgs) {
    }

    public void enableFileDeletion(boolean enableFileDeletion) {
        this.enableFileDeletion = enableFileDeletion;
    }

    public boolean isFileDeletionEnabled() {
        return this.enableFileDeletion;
    }

    protected File getHome() {
        return this.home;
    }

    private static String getPythonModule(String module, YamlService yaml, File homePath) {
        String result = module;
        if ((null == result || result.length() == 0) && homePath != null) {
            result = "ServiceEnvironment.py";
            File f = new File(homePath, "iip/ServiceEnvironment.py");
            if (f.exists()) {
                result = "iip/ServiceEnvironment.py";
            }
        }
        return result;
    }

    protected boolean startExecutableByName() {
        return false;
    }

    protected String scanInputStream(Process proc, InputHandler handler) {
        String result = null;
        Scanner sc = new Scanner(proc.getInputStream());
        while (sc.hasNextLine()) {
            String line = sc.nextLine();
            int pos = line.indexOf(124);
            if (pos > 0 && pos < line.length()) {
                String typeName = line.substring(0, pos);
                String data = line.substring(pos + 1);
                pos = data.indexOf(124);
                this.averageResponseTime = data.substring(0, pos);
                data = data.substring(pos + 1);
                try {
                    if (!handler.handle(typeName, data)) continue;
                    break;
                }
                catch (IOException e) {
                    AbstractPythonProcessService.getLogger().error("Error processing {}: {}", (Object)line, (Object)e.getMessage());
                    continue;
                }
            }
            AbstractPythonProcessService.getLogger().error("No type name in result {}", (Object)line);
        }
        sc.close();
        return result;
    }

    protected Thread createScanInputThread(final Process proc, final InputHandler handler) {
        Thread result = new Thread(new Runnable(){

            @Override
            public void run() {
                AbstractPythonProcessService.this.scanInputStream(proc, handler);
            }
        });
        return result;
    }

    protected <O> void handleResult(Class<O> cls, String data, String typeName) {
        try {
            GenericMultiTypeServiceImpl.OutTypeInfo<?> info = this.getOutTypeInfo(typeName);
            if (null != info) {
                TypeTranslator<String, ?> outT = info.getOutTranslator();
                if (outT != null) {
                    Object tmp = outT.to((Object)data);
                    DataIngestor<?> ingestor = info.validateAndGetIngestor(typeName);
                    ingestor.ingest(tmp);
                } else {
                    AbstractPythonProcessService.getLogger().error("No result type translator registered for: {}", (Object)typeName);
                }
            }
        }
        catch (IOException e) {
            AbstractPythonProcessService.getLogger().error("Receiving result: {}", (Object)e.getMessage());
        }
    }

    protected String toJson(Map<String, String> reconfValues) {
        try {
            return Json.writeValueAsStringDflt(reconfValues);
        }
        catch (IOException e) {
            AbstractPythonProcessService.getLogger().error("Translating " + String.valueOf(reconfValues) + " to JSON failed: " + e.getMessage());
            return "{}";
        }
    }

    private static String toPythonDataCls(Class<?> cls) {
        return cls.getSimpleName();
    }

    protected String getTypeSubstitutionsJson() {
        try {
            Map<String, String> tmp = this.getTypeSubstitutions().entrySet().stream().collect(Collectors.toMap(e -> AbstractPythonProcessService.toPythonDataCls((Class)e.getKey()), e -> AbstractPythonProcessService.toPythonDataCls((Class)e.getValue())));
            return Json.writeValueAsStringDflt(tmp);
        }
        catch (IOException e2) {
            AbstractPythonProcessService.getLogger().error("Translating type mappings to JSON failed: " + e2.getMessage());
            return "{}";
        }
    }

    protected Process createAndCustomizeProcess(String data, Map<String, String> reconfValues) throws ExecutionException {
        try {
            ServerAddress netMgtKeyAdr;
            ArrayList<String> args = new ArrayList<String>();
            if (null != this.pythonArgs) {
                args.addAll(this.pythonArgs);
            }
            if (null != (netMgtKeyAdr = this.getNetMgtKeyAddress())) {
                args.add("--netMgtKeyAddress");
                args.add(netMgtKeyAdr.getHost() + ":" + netMgtKeyAdr.getPort());
            }
            if (null != reconfValues && reconfValues.size() > 0) {
                args.add("--configure");
                args.add(this.toJson(reconfValues));
            }
            if (null != data) {
                args.add("--data");
                args.add(StringUtils.escapeJava((String)data));
            }
            args.add("--subst");
            args.add(this.getTypeSubstitutionsJson());
            File pyExec = this.getPythonExecutable();
            int unbufferPos = 0;
            if ("conda".equals(pyExec.getName())) {
                File conda = InstalledDependenciesSetup.getInstance().getLocation("CONDA");
                if (null == conda) {
                    conda = new File("conda");
                }
                boolean foundRun = false;
                int envNameIndex = -1;
                for (int i = 0; i < args.size(); ++i) {
                    String a = (String)args.get(i);
                    foundRun |= a.equals("run");
                    if (!a.equals("-n") || i + 1 >= args.size()) continue;
                    envNameIndex = i + 1;
                }
                if (foundRun && envNameIndex > 0) {
                    String env = (String)args.get(envNameIndex);
                    args.set(envNameIndex, InstalledDependenciesSetup.getInstance().getEnvironmentMapping(this.getLocationKey(), env));
                    args.add(envNameIndex + 1, "python");
                    unbufferPos = envNameIndex + 2;
                } else {
                    unbufferPos = 1;
                }
            }
            if (this.unbuffer()) {
                args.add(unbufferPos, "-u");
            }
            Process proc = AbstractProcessService.createProcess(pyExec, this.startExecutableByName(), this.home, args);
            this.handleErrorStream(proc.getErrorStream());
            return proc;
        }
        catch (IOException e) {
            throw new ExecutionException(e);
        }
    }

    protected String getLocationKey() {
        return this.locationKey;
    }

    protected File getPythonExecutable() {
        File result = null;
        String key = this.getLocationKey();
        if (null != key) {
            result = InstalledDependenciesSetup.getInstance().getLocation(key);
        }
        if (null == result) {
            result = PythonUtils.getPythonExecutable();
        }
        return result;
    }

    protected void handleErrorStream(InputStream err) {
        this.register(AbstractProcessService.redirectIO(err, System.err, l -> {
            boolean done = false;
            if (!l.contains("[Error]")) {
                System.out.println((String)l);
                done = true;
            }
            return done;
        }));
    }

    protected static Logger getLogger() {
        return LoggerFactory.getLogger(AbstractPythonProcessService.class);
    }

    @Override
    public <I> void registerInputTypeTranslator(Class<I> inCls, String inTypeName, TypeTranslator<I, String> inTrans) {
        this.impl.registerInputTypeTranslator(inCls, inTypeName, inTrans);
    }

    public GenericMultiTypeServiceImpl.InTypeInfo<?> getInTypeInfo(String inTypeName) {
        return this.impl.getInTypeInfo(inTypeName);
    }

    @Override
    public void registerInOutRelation(String inTypeName, String outTypeName) {
        this.impl.registerInOutRelation(inTypeName, outTypeName);
    }

    @Override
    public String getOutTypeName(String inTypeName) {
        return this.impl.getOutTypeName(inTypeName);
    }

    public GenericMultiTypeServiceImpl.OutTypeInfo<?> getOutTypeInfo(String outTypeName) {
        return this.impl.getOutTypeInfo(outTypeName);
    }

    @Override
    public <O> void registerOutputTypeTranslator(Class<O> outCls, String outTypeName, TypeTranslator<String, O> outTrans) {
        this.impl.registerOutputTypeTranslator(outCls, outTypeName, outTrans);
    }

    @Override
    public <O> void attachIngestor(Class<O> outCls, String outTypeName, DataIngestor<O> ingestor) {
        this.impl.attachIngestor(outCls, outTypeName, ingestor);
    }

    protected static String compose(String typeName, String data) {
        return typeName + "|" + data;
    }

    @Override
    public ParameterConfigurer<?> getParameterConfigurer(String paramName) {
        return this.paramConfigurers.get(paramName);
    }

    @Override
    public Set<String> getParameterNames() {
        return this.paramConfigurers.keySet();
    }

    public void addParameterConfigurer(Consumer<Map<String, ParameterConfigurer<?>>> paramConsumer) {
        paramConsumer.accept(this.paramConfigurers);
    }

    @Override
    protected ServiceState start() throws ExecutionException {
        if (null != this.transportChannel && this.transportChannel.length() > 0) {
            try {
                AbstractPythonProcessService.getLogger().info("Establishing clientserver channel for {}, {}: {} ", new Object[]{this.getId(), this.getKind(), this.transportChannel});
                if (ServiceKind.SERVER == this.getKind()) {
                    this.establishServerListener("*SERVER", this.transportChannel);
                } else {
                    this.establishClientListener("*SERVER", this.transportChannel);
                }
            }
            catch (IOException e) {
                AbstractPythonProcessService.getLogger().error("While establishing client-server channel for {}, {}: {} ", new Object[]{this.getId(), this.getKind(), e.getMessage()});
            }
        }
        return super.start();
    }

    @Override
    protected ServiceState stop() {
        if (!this.callbacks.isEmpty()) {
            TransportConnector conn = Transport.getConnector();
            if (null != conn) {
                for (Map.Entry<String, ReceptionCallback<?>> c : this.callbacks.entrySet()) {
                    try {
                        conn.detachReceptionCallback(c.getKey(), c.getValue());
                    }
                    catch (IOException iOException) {}
                }
            }
            this.callbacks.clear();
        }
        return super.stop();
    }

    private void establishServerListener(final String typeName, String serverChannel) throws IOException {
        final TransportConnector conn = Transport.createConnector();
        if (null == conn) {
            return;
        }
        this.registerInputTypeTranslator(byte[].class, typeName, TypeTranslators.BYTEARRAY_TO_BASE64);
        this.registerOutputTypeTranslator(byte[].class, typeName, TypeTranslators.BASE64_TO_BYTEARRAY);
        conn.setReceptionCallback(serverChannel, (ReceptionCallback)new ByteArrayReceptionCallback(serverChannel, this.callbacks){

            public void received(byte[] data) {
                String cChannel = new String(data);
                final String cSChannel = cChannel + "_" + System.currentTimeMillis();
                AbstractPythonProcessService.this.attachIngestor(byte[].class, typeName, d -> {
                    try {
                        conn.asyncSend(cChannel, (Object)data);
                    }
                    catch (IOException e) {
                        AbstractPythonProcessService.getLogger().error("While receiving from Python and passing on to {}", (Object)cChannel, (Object)e.getMessage());
                    }
                });
                try {
                    conn.setReceptionCallback(cSChannel, (ReceptionCallback)new ByteArrayReceptionCallback(cSChannel, AbstractPythonProcessService.this.callbacks){

                        public void received(byte[] data) {
                            try {
                                AbstractPythonProcessService.this.process(typeName, data);
                            }
                            catch (ExecutionException e) {
                                AbstractPythonProcessService.getLogger().error("While receiving on {} and passing on to Python: {}", (Object)cSChannel, (Object)e.getMessage());
                            }
                        }
                    });
                    conn.asyncSend(cChannel, (Object)cSChannel);
                }
                catch (IOException e) {
                    AbstractPythonProcessService.getLogger().error("While setting up server-client-connection {}-{}", new Object[]{cChannel, cSChannel, e.getMessage()});
                }
            }
        });
    }

    private void establishClientListener(final String typeName, String serverChannel) throws IOException {
        final TransportConnector conn = Transport.createConnector();
        if (null == conn) {
            return;
        }
        this.registerInputTypeTranslator(byte[].class, typeName, TypeTranslators.BYTEARRAY_TO_BASE64);
        this.registerOutputTypeTranslator(byte[].class, typeName, TypeTranslators.BASE64_TO_BYTEARRAY);
        final String clientChannel = this.getId() + "_client_" + System.currentTimeMillis();
        conn.setReceptionCallback(clientChannel, (ReceptionCallback)new ByteArrayReceptionCallback(clientChannel, this.callbacks){
            private boolean firstReception;
            {
                super(channel, callbacks);
                this.firstReception = true;
            }

            public void received(byte[] data) {
                if (this.firstReception) {
                    this.firstReception = false;
                    String serverChannel = new String(data);
                    AbstractPythonProcessService.this.attachIngestor(byte[].class, typeName, d -> {
                        try {
                            conn.asyncSend(serverChannel, d);
                        }
                        catch (IOException e) {
                            AbstractPythonProcessService.getLogger().error("While receiving from Python passing on to {}", (Object)serverChannel, (Object)e.getMessage());
                        }
                    });
                } else {
                    try {
                        AbstractPythonProcessService.this.process(typeName, data);
                    }
                    catch (ExecutionException e) {
                        AbstractPythonProcessService.getLogger().error("While receiving on {} and passing on to Python: {}", (Object)clientChannel, (Object)e.getMessage());
                    }
                }
            }
        });
        conn.asyncSend(serverChannel, (Object)clientChannel);
    }

    public long getAvgResponseTime() {
        try {
            return Long.parseLong(this.averageResponseTime);
        }
        catch (NumberFormatException e) {
            return 0L;
        }
    }

    protected static interface InputHandler {
        public boolean handle(String var1, String var2) throws IOException;
    }

    private static abstract class ByteArrayReceptionCallback
    implements ReceptionCallback<byte[]> {
        private ByteArrayReceptionCallback(String channel, Map<String, ReceptionCallback<?>> callbacks) {
            callbacks.put(channel, this);
        }

        public Class<byte[]> getType() {
            return byte[].class;
        }
    }

    protected static class SyncDataIngestor<D>
    implements DataIngestor<D> {
        private BlockingQueue<D> received = new LinkedBlockingQueue<D>();

        protected SyncDataIngestor() {
        }

        @Override
        public void ingest(D data) {
            this.received.offer(data);
        }

        @Override
        public D waitForResult() throws ExecutionException {
            try {
                return this.received.take();
            }
            catch (InterruptedException e) {
                throw new ExecutionException(e);
            }
        }
    }
}

