package de.iip_ecosphere.platform.services.environment;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.iip_ecosphere.platform.services.environment.GenericMultiTypeServiceImpl;
import de.iip_ecosphere.platform.support.FileUtils;
import de.iip_ecosphere.platform.support.PythonUtils;
import de.iip_ecosphere.platform.support.ServerAddress;
import de.iip_ecosphere.platform.support.setup.InstalledDependenciesSetup;
import de.iip_ecosphere.platform.transport.Transport;
import de.iip_ecosphere.platform.transport.connectors.ReceptionCallback;
import de.iip_ecosphere.platform.transport.connectors.TransportConnector;
import de.iip_ecosphere.platform.transport.serialization.TypeTranslator;
import de.iip_ecosphere.platform.transport.serialization.TypeTranslators;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.apache.commons.text.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:de/iip_ecosphere/platform/services/environment/AbstractPythonProcessService.class */
public abstract class AbstractPythonProcessService extends AbstractRunnablesService implements GenericMultiTypeService {
    /** Separator between the fields of a text line exchanged with the Python process (see scanInputStream/compose). */
    public static final char TYPE_SEPARATOR_CHAR = '|';
    // Working directory handed to the process creation; taken from the YAML process entry or a temporary folder.
    private File home;
    // Command line arguments collected in initializeFrom and prepended to every created process.
    private List<String> pythonArgs;
    // Key used to look up the Python executable in InstalledDependenciesSetup; may be null.
    private String locationKey;
    // Transport channel read from the YAML service; when non-empty, start() sets up a client/server listener on it.
    private String transportChannel;
    // Delegate holding the input/output type translators and ingestors.
    private GenericMultiTypeServiceImpl impl;
    // Parameter configurers by parameter name.
    private Map<String, ParameterConfigurer<?>> paramConfigurers;
    // Reception callbacks by channel name; detached and cleared in stop().
    private Map<String, ReceptionCallback<?>> callbacks;
    // Raw text of the second field of the last scanned output line; parsed on demand by getAvgResponseTime().
    private String averageResponseTime;
    // Flag exposed through enableFileDeletion/isFileDeletionEnabled; not evaluated elsewhere in this class.
    private boolean enableFileDeletion;

    /* loaded from: input_file:de/iip_ecosphere/platform/services/environment/AbstractPythonProcessService$ByteArrayReceptionCallback.class */
    /**
     * Base class for reception callbacks on {@code byte[]} payloads. Every instance records itself
     * in the given map under its channel name at construction time.
     */
    private static abstract class ByteArrayReceptionCallback implements ReceptionCallback<byte[]> {

        /**
         * Creates the callback and stores it in {@code registry}.
         *
         * @param channel the key to store this callback under
         * @param registry the map receiving the {@code channel -> this} entry
         */
        private ByteArrayReceptionCallback(String channel, Map<String, ReceptionCallback<?>> registry) {
            registry.put(channel, this);
        }

        /**
         * Returns the payload type handled by this callback.
         *
         * @return {@code byte[].class}
         */
        public Class<byte[]> getType() {
            final Class<byte[]> payloadType = byte[].class;
            return payloadType;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:de/iip_ecosphere/platform/services/environment/AbstractPythonProcessService$InputHandler.class */
    /**
     * Handles one parsed output line of the Python process.
     */
    public interface InputHandler {

        /**
         * Called by the scanning loop for each well-formed line.
         *
         * @param typeName the text before the first separator of the line
         * @param data the text following the second separator of the line
         * @return {@code true} to make the scanning loop stop, {@code false} to continue with the next line
         * @throws IOException if the data cannot be processed; the scanning loop logs it and continues
         */
        boolean handle(String typeName, String data) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:de/iip_ecosphere/platform/services/environment/AbstractPythonProcessService$SyncDataIngestor.class */
    /**
     * Data ingestor that buffers ingested items in an unbounded queue so that a caller can
     * block in {@link #waitForResult()} until the next item is available.
     *
     * @param <D> the type of the ingested data
     */
    public static class SyncDataIngestor<D> implements DataIngestor<D> {
        private final BlockingQueue<D> received = new LinkedBlockingQueue<>();

        /**
         * Appends the given item to the internal queue.
         *
         * @param d the item to buffer
         */
        @Override
        public void ingest(D d) {
            received.offer(d);
        }

        /**
         * Blocks until an item is available and removes it from the queue.
         *
         * @return the oldest buffered item
         * @throws ExecutionException if the waiting thread is interrupted; the interrupt status
         *     of the thread is restored before throwing
         */
        @Override
        public D waitForResult() throws ExecutionException {
            try {
                return received.take();
            } catch (InterruptedException e) {
                // keep the interruption visible to callers further up the stack
                Thread.currentThread().interrupt();
                throw new ExecutionException(e);
            }
        }
    }

    /**
     * Creates the service by passing both arguments on to the superclass constructor and
     * setting up the internal registries.
     *
     * @param str first superclass constructor argument (presumably the service id — TODO confirm)
     * @param inputStream second superclass constructor argument (presumably the YAML descriptor — TODO confirm)
     */
    public AbstractPythonProcessService(String str, InputStream inputStream) {
        super(str, inputStream);
        averageResponseTime = "";
        callbacks = new HashMap<>();
        paramConfigurers = new HashMap<>();
        impl = new GenericMultiTypeServiceImpl();
    }

    /**
     * Creates the service from a YAML service description, which is passed on to the superclass,
     * and sets up the internal registries.
     *
     * @param yamlService the service description
     */
    public AbstractPythonProcessService(YamlService yamlService) {
        super(yamlService);
        averageResponseTime = "";
        callbacks = new HashMap<>();
        paramConfigurers = new HashMap<>();
        impl = new GenericMultiTypeServiceImpl();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads the process settings from the YAML service description: location key, home directory,
     * command line arguments and transport channel. If no home directory results, a temporary
     * folder named after the sanitized service id is created. Finally the collected arguments are
     * passed to {@link #customizePythonArgs(List)}.
     *
     * @param yamlService the service description to read from
     */
    @Override
    public void initializeFrom(YamlService yamlService) {
        final YamlProcess processSpec = yamlService.getProcess();
        pythonArgs = new ArrayList<>();
        if (null == processSpec) {
            // no process entry: neither an executable name nor a home directory is known
            pythonArgs.add(getPythonModule(null, yamlService, null));
        } else {
            locationKey = processSpec.getLocationKey();
            home = processSpec.getHomePath();
            pythonArgs.addAll(processSpec.getExecArg());
            pythonArgs.add(getPythonModule(processSpec.getExecutable(), yamlService, home));
            final List<String> extraArgs = processSpec.getSubstCmdArg();
            if (extraArgs != null) {
                pythonArgs.addAll(extraArgs);
            }
        }
        if (home == null) {
            getLogger().warn("No home path given for service " + yamlService.getId() + ". Falling back to temporary folder");
            home = FileUtils.createTmpFolder(FileUtils.sanitizeFileName(yamlService.getId(), true));
        }
        transportChannel = yamlService.getTransportChannel();
        customizePythonArgs(pythonArgs);
    }

    /**
     * Hook called at the end of {@code initializeFrom} with the collected Python arguments.
     * This implementation does nothing.
     *
     * @param list the argument list collected so far
     */
    protected void customizePythonArgs(List<String> list) {
        // intentionally empty
    }

    /**
     * Stores the file deletion flag.
     *
     * @param z the new value of the flag
     */
    public void enableFileDeletion(boolean z) {
        enableFileDeletion = z;
    }

    /**
     * Returns the file deletion flag.
     *
     * @return the value last set through {@link #enableFileDeletion(boolean)}, {@code false} if never set
     */
    public boolean isFileDeletionEnabled() {
        return enableFileDeletion;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the home directory determined in {@code initializeFrom}.
     *
     * @return the home directory
     */
    public File getHome() {
        return home;
    }

    /**
     * Determines the Python module to start. A non-empty {@code str} is returned as is. Otherwise,
     * if a home directory is given, the default {@code ServiceEnvironment.py} is used, preferring
     * the variant in the {@code iip} sub-folder of the home directory when that file exists.
     *
     * @param str the configured module name, may be {@code null} or empty
     * @param yamlService the service description (not used by this implementation)
     * @param file the home directory, may be {@code null}
     * @return the module name; {@code str} unchanged (possibly {@code null} or empty) when no
     *     home directory is given
     */
    private static String getPythonModule(String str, YamlService yamlService, File file) {
        final boolean hasModule = str != null && str.length() > 0;
        if (hasModule || file == null) {
            return str;
        }
        final boolean nestedExists = new File(file, "iip/ServiceEnvironment.py").exists();
        return nestedExists ? "iip/ServiceEnvironment.py" : "ServiceEnvironment.py";
    }

    /**
     * Returns the flag passed as second argument to {@code AbstractProcessService.createProcess}.
     *
     * @return always {@code false} in this implementation
     */
    protected boolean startExecutableByName() {
        final boolean byName = false;
        return byName;
    }

    /**
     * Reads the standard output of the given process line by line. Each line is expected to have
     * the form {@code <type>|<responseTime>|<data>}. The response time field is stored for
     * {@link #getAvgResponseTime()} and type and data are passed to the handler. Lines lacking
     * the type or the response time field are logged and skipped. The scanner (and thereby the
     * process stream) is closed when scanning ends, also if the handler fails unexpectedly.
     *
     * @param process the process whose output shall be scanned
     * @param inputHandler the handler receiving type name and data; returning {@code true} ends scanning
     * @return always {@code null}
     */
    protected String scanInputStream(Process process, InputHandler inputHandler) {
        try (Scanner scanner = new Scanner(process.getInputStream())) {
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                int typeEnd = line.indexOf(TYPE_SEPARATOR_CHAR);
                if (typeEnd <= 0) {
                    getLogger().error("No type name in result {}", line);
                    continue;
                }
                String typeName = line.substring(0, typeEnd);
                String remainder = line.substring(typeEnd + 1);
                int timeEnd = remainder.indexOf(TYPE_SEPARATOR_CHAR);
                if (timeEnd < 0) {
                    // without this guard substring(0, -1) would throw and end the scanning thread
                    getLogger().error("No response time in result {}", line);
                    continue;
                }
                this.averageResponseTime = remainder.substring(0, timeEnd);
                try {
                    if (inputHandler.handle(typeName, remainder.substring(timeEnd + 1))) {
                        break;
                    }
                } catch (IOException e) {
                    getLogger().error("Error processing {}: {}", line, e.getMessage());
                }
            }
        }
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates (but does not start) a thread that runs {@code scanInputStream} on the given
     * process with the given handler.
     *
     * @param process the process whose output shall be scanned
     * @param inputHandler the handler for the scanned lines
     * @return the new, unstarted thread
     */
    public Thread createScanInputThread(final Process process, final InputHandler inputHandler) {
        final Runnable scanTask = () -> scanInputStream(process, inputHandler);
        return new Thread(scanTask);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Translates a textual result with the output translator registered for the given type name
     * and hands the translated value to the ingestor of that type. Nothing happens if no output
     * type information exists for the type name; a missing translator and I/O failures are logged.
     *
     * @param <O> the output type
     * @param cls the output class (not used by this implementation)
     * @param str the textual result to translate
     * @param str2 the type name selecting translator and ingestor
     */
    public <O> void handleResult(Class<O> cls, String str, String str2) {
        try {
            final GenericMultiTypeServiceImpl.OutTypeInfo<?> typeInfo = getOutTypeInfo(str2);
            if (typeInfo == null) {
                return;
            }
            final TypeTranslator<String, ?> translator = typeInfo.getOutTranslator();
            if (translator == null) {
                getLogger().error("No result type translator registered for: {}", str2);
                return;
            }
            typeInfo.validateAndGetIngestor(str2).ingest(translator.to(str));
        } catch (IOException e) {
            getLogger().error("Receiving result: {}", e.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Serializes the given map to JSON using a fresh Jackson {@code ObjectMapper}.
     *
     * @param map the map to serialize
     * @return the JSON text, or <code>"{}"</code> if serialization fails (the failure is logged)
     */
    public String toJson(Map<String, String> map) {
        String json;
        try {
            json = new ObjectMapper().writeValueAsString(map);
        } catch (JsonProcessingException e) {
            getLogger().error("Translating " + map + " to JSON failed: " + e.getMessage());
            json = "{}";
        }
        return json;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Assembles the command line and creates the Python process. The command line consists of the
     * arguments collected during initialization, followed by {@code --netMgtKeyAddress host:port}
     * if such an address is available, {@code --configure <json>} if {@code map} is non-empty and
     * {@code --data <escaped>} if {@code str} is given. If the executable is named {@code conda}
     * and the arguments contain {@code run} as well as {@code -n <env>}, the environment name is
     * replaced by its mapping from {@code InstalledDependenciesSetup}. The error stream of the
     * created process is passed to {@code handleErrorStream}.
     *
     * @param str optional data argument, Java-escaped before use; may be {@code null}
     * @param map optional configuration values passed as JSON; may be {@code null} or empty
     * @return the created process
     * @throws ExecutionException if an {@code IOException} occurs while creating the process
     */
    public Process createAndCustomizeProcess(String str, Map<String, String> map) throws ExecutionException {
        try {
            final List<String> cmdLine = new ArrayList<>();
            if (pythonArgs != null) {
                cmdLine.addAll(pythonArgs);
            }
            final ServerAddress netMgtKey = getNetMgtKeyAddress();
            if (netMgtKey != null) {
                cmdLine.add("--netMgtKeyAddress");
                cmdLine.add(netMgtKey.getHost() + ":" + netMgtKey.getPort());
            }
            if (map != null && map.size() > 0) {
                cmdLine.add("--configure");
                cmdLine.add(toJson(map));
            }
            if (str != null) {
                cmdLine.add("--data");
                cmdLine.add(StringEscapeUtils.escapeJava(str));
            }
            final File executable = getPythonExecutable();
            if ("conda".equals(executable.getName())) {
                boolean sawRun = false;
                int envNamePos = -1;
                final int count = cmdLine.size();
                for (int pos = 0; pos < count; pos++) {
                    final String arg = cmdLine.get(pos);
                    if (arg.equals("run")) {
                        sawRun = true;
                    }
                    // remember the argument following the last "-n" that actually has a successor
                    if (arg.equals("-n") && pos + 1 < count) {
                        envNamePos = pos + 1;
                    }
                }
                if (sawRun && envNamePos > 0) {
                    final String envName = cmdLine.get(envNamePos);
                    cmdLine.set(envNamePos,
                        InstalledDependenciesSetup.getInstance().getEnvironmentMapping(envName, envName));
                }
            }
            final Process started = AbstractProcessService.createProcess(executable, startExecutableByName(),
                home, cmdLine);
            handleErrorStream(started.getErrorStream());
            return started;
        } catch (IOException e) {
            throw new ExecutionException(e);
        }
    }

    /**
     * Returns the location key read from the YAML process entry.
     *
     * @return the location key, {@code null} if none was read
     */
    protected String getLocationKey() {
        return locationKey;
    }

    /**
     * Determines the Python executable. If a location key is available, the location registered
     * for it in {@code InstalledDependenciesSetup} is used; if that yields nothing, the result of
     * {@code PythonUtils.getPythonExecutable()} is returned.
     *
     * @return the executable to start
     */
    protected File getPythonExecutable() {
        final String key = getLocationKey();
        final File configured = (key == null) ? null : InstalledDependenciesSetup.getInstance().getLocation(key);
        return (configured != null) ? configured : PythonUtils.getPythonExecutable();
    }

    /**
     * Redirects the given stream to {@code System.err} through
     * {@code AbstractProcessService.redirectIO} and registers the returned object with this service.
     *
     * @param inputStream the stream to redirect
     */
    protected void handleErrorStream(InputStream inputStream) {
        register(
            AbstractProcessService.redirectIO(inputStream, System.err)
        );
    }

    /**
     * Returns the SLF4J logger of this class.
     *
     * @return the logger obtained from {@code LoggerFactory} for {@code AbstractPythonProcessService}
     */
    protected static Logger getLogger() {
        final Class<?> owner = AbstractPythonProcessService.class;
        return LoggerFactory.getLogger(owner);
    }

    /**
     * Registers an input type translator by delegating to the internal
     * {@code GenericMultiTypeServiceImpl}.
     *
     * @param <I> the input type
     * @param cls the input class
     * @param str the type name
     * @param typeTranslator the translator from {@code I} to text
     */
    @Override
    public <I> void registerInputTypeTranslator(Class<I> cls, String str, TypeTranslator<I, String> typeTranslator) {
        impl.registerInputTypeTranslator(cls, str, typeTranslator);
    }

    /**
     * Looks up the input type information for a type name in the internal
     * {@code GenericMultiTypeServiceImpl}.
     *
     * @param str the type name
     * @return whatever the delegate returns for {@code str}
     */
    public GenericMultiTypeServiceImpl.InTypeInfo<?> getInTypeInfo(String str) {
        return impl.getInTypeInfo(str);
    }

    /**
     * Looks up the output type information for a type name in the internal
     * {@code GenericMultiTypeServiceImpl}.
     *
     * @param str the type name
     * @return whatever the delegate returns for {@code str}; {@code handleResult} treats
     *     {@code null} as "nothing to do"
     */
    public GenericMultiTypeServiceImpl.OutTypeInfo<?> getOutTypeInfo(String str) {
        return impl.getOutTypeInfo(str);
    }

    /**
     * Registers an output type translator by delegating to the internal
     * {@code GenericMultiTypeServiceImpl}.
     *
     * @param <O> the output type
     * @param cls the output class
     * @param str the type name
     * @param typeTranslator the translator from text to {@code O}
     */
    @Override
    public <O> void registerOutputTypeTranslator(Class<O> cls, String str, TypeTranslator<String, O> typeTranslator) {
        impl.registerOutputTypeTranslator(cls, str, typeTranslator);
    }

    /**
     * Attaches a data ingestor for an output type by delegating to the internal
     * {@code GenericMultiTypeServiceImpl}.
     *
     * @param <O> the output type
     * @param cls the output class
     * @param str the type name
     * @param dataIngestor the ingestor receiving values of that type
     */
    @Override
    public <O> void attachIngestor(Class<O> cls, String str, DataIngestor<O> dataIngestor) {
        impl.attachIngestor(cls, str, dataIngestor);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Joins two texts with the {@code |} separator in between. A {@code null} argument shows up
     * as the text {@code null} in the result.
     *
     * @param str the text before the separator
     * @param str2 the text after the separator
     * @return {@code str}, the separator and {@code str2} concatenated
     */
    public static String compose(String str, String str2) {
        return String.join("|", str, str2);
    }

    /**
     * Returns the parameter configurer registered under the given name.
     *
     * @param str the parameter name
     * @return the configurer, {@code null} if none is registered under that name
     */
    @Override
    public ParameterConfigurer<?> getParameterConfigurer(String str) {
        return paramConfigurers.get(str);
    }

    /**
     * Returns the names of all registered parameter configurers.
     *
     * @return the key set of the internal configurer map (a live view, not a copy)
     */
    @Override
    public Set<String> getParameterNames() {
        return paramConfigurers.keySet();
    }

    /**
     * Hands the internal configurer map to the given consumer so that it can add entries.
     *
     * @param consumer receives the mutable map of parameter configurers by name
     */
    public void addParameterConfigurer(Consumer<Map<String, ParameterConfigurer<?>>> consumer) {
        consumer.accept(paramConfigurers);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the service. If a non-empty transport channel is configured, a server listener (for
     * services of kind {@code SERVER}) or a client listener (otherwise) is set up on it first.
     * A failure to set up the listener is logged and does not prevent the start.
     *
     * @return the state returned by the superclass start
     * @throws ExecutionException if the superclass start fails
     */
    @Override
    public ServiceState start() throws ExecutionException {
        final boolean hasChannel = transportChannel != null && transportChannel.length() > 0;
        if (hasChannel) {
            try {
                getLogger().info("Establishing clientserver channel for {}, {}: {} ",
                    getId(), getKind(), transportChannel);
                if (getKind() == ServiceKind.SERVER) {
                    establishServerListener("*SERVER", transportChannel);
                } else {
                    establishClientListener("*SERVER", transportChannel);
                }
            } catch (IOException e) {
                getLogger().error("While establishing client-server channel for {}, {}: {} ",
                    getId(), getKind(), e.getMessage());
            }
        }
        return super.start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the service. All reception callbacks recorded by the listeners are detached from the
     * connector returned by {@code Transport.getConnector()} (if there is one) and the record is
     * cleared. Detaching is best effort: a failure for one channel is logged and the remaining
     * callbacks are still processed.
     *
     * @return the state returned by the superclass stop
     */
    @Override
    public ServiceState stop() {
        if (!this.callbacks.isEmpty()) {
            TransportConnector connector = Transport.getConnector();
            if (null != connector) {
                for (Map.Entry<String, ReceptionCallback<?>> entry : this.callbacks.entrySet()) {
                    try {
                        connector.detachReceptionCallback(entry.getKey(), entry.getValue());
                    } catch (IOException e) {
                        // do not abort stopping; report the problem and go on with the next callback
                        getLogger().warn("Cannot detach reception callback from {}: {}",
                            entry.getKey(), e.getMessage());
                    }
                }
            }
            this.callbacks.clear();
        }
        return super.stop();
    }

    /**
     * Sets up the server side of the client-server channel. Registers Base64 translators for
     * {@code byte[]} under the given type name and listens on the given channel. Every message on
     * that channel is taken as the name of a client channel: results produced for the type name
     * are forwarded to that client channel, a dedicated server channel
     * ({@code <clientChannel>_<timestamp>}) is created whose messages are passed to
     * {@code process}, and the server channel name is sent to the client. Does nothing if no
     * connector can be created.
     *
     * @param str the type name used for translators, ingestor and processing
     * @param str2 the channel on which clients announce themselves
     * @throws IOException if setting the reception callback on the announcement channel fails
     */
    private void establishServerListener(final String str, String str2) throws IOException {
        final TransportConnector connector = Transport.createConnector();
        if (null == connector) {
            return;
        }
        registerInputTypeTranslator(byte[].class, str, TypeTranslators.BYTEARRAY_TO_BASE64);
        registerOutputTypeTranslator(byte[].class, str, TypeTranslators.BASE64_TO_BYTEARRAY);
        connector.setReceptionCallback(str2, new ByteArrayReceptionCallback(str2, this.callbacks) {
            public void received(byte[] bArr) {
                final String clientChannel = new String(bArr);
                final String serverChannel = clientChannel + "_" + System.currentTimeMillis();
                AbstractPythonProcessService.this.attachIngestor(byte[].class, str, result -> {
                    try {
                        // forward the ingested result, not the announcement message
                        connector.asyncSend(clientChannel, result);
                    } catch (IOException e) {
                        AbstractPythonProcessService.getLogger().error(
                            "While receiving from Python and passing on to {}: {}", clientChannel, e.getMessage());
                    }
                });
                try {
                    connector.setReceptionCallback(serverChannel,
                        new ByteArrayReceptionCallback(serverChannel, AbstractPythonProcessService.this.callbacks) {
                            public void received(byte[] bArr3) {
                                try {
                                    AbstractPythonProcessService.this.process(str, bArr3);
                                } catch (ExecutionException e) {
                                    AbstractPythonProcessService.getLogger().error(
                                        "While receiving on {} and passing on to Python: {}",
                                        serverChannel, e.getMessage());
                                }
                            }
                        });
                    connector.asyncSend(clientChannel, serverChannel);
                } catch (IOException e) {
                    AbstractPythonProcessService.getLogger().error(
                        "While setting up server-client-connection {}-{}: {}",
                        new Object[]{clientChannel, serverChannel, e.getMessage()});
                }
            }
        });
    }

    /**
     * Sets up the client side of the client-server channel. Registers Base64 translators for
     * {@code byte[]} under the given type name, listens on a private channel
     * ({@code <id>_client_<timestamp>}) and sends that channel name to the given channel. The
     * first message received on the private channel is taken as the name of the server channel
     * to which results for the type name are forwarded from then on; all later messages are
     * passed to {@code process}. Does nothing if no connector can be created.
     *
     * @param str the type name used for translators, ingestor and processing
     * @param str2 the channel on which this client announces itself
     * @throws IOException if setting the reception callback or sending the announcement fails
     */
    private void establishClientListener(final String str, String str2) throws IOException {
        final TransportConnector connector = Transport.createConnector();
        if (null == connector) {
            return;
        }
        registerInputTypeTranslator(byte[].class, str, TypeTranslators.BYTEARRAY_TO_BASE64);
        registerOutputTypeTranslator(byte[].class, str, TypeTranslators.BASE64_TO_BYTEARRAY);
        final String clientChannel = getId() + "_client_" + System.currentTimeMillis();
        connector.setReceptionCallback(clientChannel, new ByteArrayReceptionCallback(clientChannel, this.callbacks) {
            private boolean firstReception = true;

            public void received(byte[] bArr) {
                if (!this.firstReception) {
                    try {
                        AbstractPythonProcessService.this.process(str, bArr);
                    } catch (ExecutionException e) {
                        AbstractPythonProcessService.getLogger().error(
                            "While receiving on {} and passing on to Python: {}", clientChannel, e.getMessage());
                    }
                    return;
                }
                // the first message carries the name of the channel to answer on
                this.firstReception = false;
                final String serverChannel = new String(bArr);
                AbstractPythonProcessService.this.attachIngestor(byte[].class, str, result -> {
                    try {
                        connector.asyncSend(serverChannel, result);
                    } catch (IOException e2) {
                        AbstractPythonProcessService.getLogger().error(
                            "While receiving from Python passing on to {}: {}", serverChannel, e2.getMessage());
                    }
                });
            }
        });
        connector.asyncSend(str2, clientChannel);
    }

    /**
     * Returns the response time taken from the most recently scanned output line.
     *
     * @return the stored text parsed as {@code long}, or {@code 0} if it is not a valid number
     *     (e.g. still the initial empty text)
     */
    public long getAvgResponseTime() {
        long parsed;
        try {
            parsed = Long.parseLong(averageResponseTime);
        } catch (NumberFormatException e) {
            parsed = 0L;
        }
        return parsed;
    }
}
