/*
 * Decompiled with CFR 0.152.
 */
package de.iip_ecosphere.platform.connectors.influxv3;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.query.QueryOptions;
import de.iip_ecosphere.platform.connectors.AbstractPluginConnectorDescriptor;
import de.iip_ecosphere.platform.connectors.AbstractThreadedConnector;
import de.iip_ecosphere.platform.connectors.AdapterSelector;
import de.iip_ecosphere.platform.connectors.Connector;
import de.iip_ecosphere.platform.connectors.ConnectorParameter;
import de.iip_ecosphere.platform.connectors.MachineConnector;
import de.iip_ecosphere.platform.connectors.MachineConnectorSupportedQueries;
import de.iip_ecosphere.platform.connectors.events.ConnectorTriggerQuery;
import de.iip_ecosphere.platform.connectors.events.SimpleTimeseriesQuery;
import de.iip_ecosphere.platform.connectors.events.StringTriggerQuery;
import de.iip_ecosphere.platform.connectors.influxv3.InfluxModelAccess;
import de.iip_ecosphere.platform.connectors.types.ProtocolAdapter;
import de.iip_ecosphere.platform.connectors.types.RecordCompletePredicate;
import de.iip_ecosphere.platform.support.TimeUtils;
import de.iip_ecosphere.platform.support.identities.IdentityToken;
import de.iip_ecosphere.platform.support.logging.Logger;
import de.iip_ecosphere.platform.support.logging.LoggerFactory;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Stream;

@MachineConnector(hasModel=false, supportsModelStructs=false, supportsEvents=false, specificSettings={"DATABASE", "MEASUREMENT", "TAGS", "BATCH", "BASETIME"})
@MachineConnectorSupportedQueries(value={StringTriggerQuery.class, SimpleTimeseriesQuery.class})
public class InfluxConnector<CO, CI>
extends AbstractThreadedConnector<Object, Object, CO, CI, InfluxModelAccess> {
    public static final String NAME = "INFLUX-v3";
    private static final Logger LOGGER = LoggerFactory.getLogger(InfluxConnector.class);
    private static final Object DUMMY = new Object();
    private InfluxDBClient client;
    private ConnectorParameter params;
    private String database;
    private String measurement;
    private Set<String> tags = new HashSet<String>();
    private int batchSize = 1;
    private RecordCompletePredicate recordComplete = RecordCompletePredicate.DEFAULT;
    private long baseTime = -1L;

    @SafeVarargs
    public InfluxConnector(ProtocolAdapter<Object, Object, CO, CI> ... adapter) {
        this((AdapterSelector<Object, Object, CO, CI>)null, adapter);
    }

    @SafeVarargs
    public InfluxConnector(AdapterSelector<Object, Object, CO, CI> selector, ProtocolAdapter<Object, Object, CO, CI> ... adapter) {
        super(selector, adapter);
        this.setModelAccessSupplier(() -> new InfluxModelAccess(this));
    }

    public String getName() {
        return NAME;
    }

    protected void connectImpl(ConnectorParameter params) throws IOException {
        if (this.client == null) {
            String tmp;
            this.params = params;
            String url = String.valueOf(params.getSchema()) + "://" + params.getHost();
            if (params.getPort() >= 0) {
                url = url + ":" + params.getPort();
            }
            if (params.getEndpointPath() != null) {
                url = url + "/" + params.getEndpointPath();
            }
            this.database = params.getSpecificStringSetting("DATABASE");
            this.measurement = params.getSpecificStringSetting("MEASUREMENT");
            Long longTmp = params.getSpecificLongSetting("BASETIME");
            if (longTmp != null) {
                this.baseTime = longTmp;
                if (this.baseTime == 0L) {
                    this.baseTime = System.currentTimeMillis();
                }
            }
            if (null != (tmp = params.getSpecificStringSetting("TAGS"))) {
                Arrays.stream(tmp.split(",")).forEach(t -> this.tags.add((String)t));
            }
            if (null != (tmp = params.getSpecificStringSetting("BATCH"))) {
                try {
                    this.batchSize = Integer.parseInt(tmp.toString());
                }
                catch (NumberFormatException e) {
                    LOGGER.info("Specific setting BATCH ignored: {}", (Object)e.getMessage());
                }
            }
            LOGGER.info("INFLUX connecting to " + url);
            try {
                IdentityToken tok = params.getIdentityToken("");
                if (null != tok) {
                    if (tok.getType() == IdentityToken.TokenType.ISSUED) {
                        this.client = InfluxDBClient.getInstance((String)url, (char[])tok.getTokenDataAsCharArray(), (String)this.database);
                        LOGGER.info("INFLUX connected to {} by token", (Object)url);
                    } else {
                        LOGGER.error("INFLUX connector cannot handle identity token type {}!", (Object)tok.getType());
                    }
                }
                if (null == this.client) {
                    LOGGER.error("INFLUX not connected!");
                }
            }
            catch (Exception e) {
                LOGGER.error("INFLUX connection failed: {}", (Object)e.getMessage());
                LOGGER.debug("INFLUX connection failed", (Throwable)e);
            }
        }
    }

    protected void disconnectImpl() throws IOException {
        if (null != this.client) {
            try {
                this.client.close();
            }
            catch (Exception e) {
                LOGGER.error("Closing INFLUX connector: {} ", (Object)e.getMessage());
            }
            this.client = null;
        }
    }

    protected void installPollTask() {
    }

    protected void error(String message, Throwable th) {
        LOGGER.error(message, th);
    }

    protected Object read() throws IOException {
        return DUMMY;
    }

    private String toTimePart(int time, SimpleTimeseriesQuery.TimeKind kind) {
        Object result = "";
        if (kind != SimpleTimeseriesQuery.TimeKind.UNSPECIFIED) {
            switch (kind) {
                case RELATIVE_WEEKS: {
                    result = (String)result + this.relativeTimePart(time, "w");
                    break;
                }
                case RELATIVE_DAYS: {
                    result = (String)result + this.relativeTimePart(time, "d");
                    break;
                }
                case RELATIVE_HOURS: {
                    result = (String)result + this.relativeTimePart(time, "h");
                    break;
                }
                case RELATIVE_MINUTES: {
                    result = (String)result + this.relativeTimePart(time, "m");
                    break;
                }
                case RELATIVE_SECONDS: {
                    result = (String)result + this.relativeTimePart(time, "s");
                    break;
                }
                case RELATIVE_MILLISECONDS: {
                    result = (String)result + this.relativeTimePart(time, "ms");
                    break;
                }
                case RELATIVE_MICROSECONDS: {
                    result = (String)result + this.relativeTimePart(time, "u");
                    break;
                }
                case ABSOLUTE: {
                    result = (String)result + time;
                    break;
                }
            }
        }
        result = (String)result + " ";
        return result;
    }

    private String relativeTimePart(int time, String unit) {
        return "now() - " + time + unit;
    }

    private QueryOptions toQueryOptions(String type) {
        QueryOptions result = null;
        if (type != null && type.equalsIgnoreCase("sql")) {
            result = QueryOptions.defaultQueryOptions();
        }
        if (result == null) {
            result = QueryOptions.defaultInfluxQlQueryOptions();
        }
        return result;
    }

    public void trigger(ConnectorTriggerQuery query) {
        int qDelay;
        Object qString = null;
        String qType = null;
        int n = qDelay = null != query ? query.delay() : 0;
        if (query instanceof StringTriggerQuery) {
            q = (StringTriggerQuery)query;
            qString = q.getQuery();
            qType = q.getType();
        } else if (query instanceof SimpleTimeseriesQuery) {
            q = (SimpleTimeseriesQuery)query;
            qString = "SELECT * ";
            qString = (String)qString + "FROM \"" + this.getDatabase() + "\".\"" + this.getMeasurement() + "\" ";
            SimpleTimeseriesQuery.TimeKind startKind = q.getStartKind();
            SimpleTimeseriesQuery.TimeKind endKind = q.getEndKind();
            if (startKind != SimpleTimeseriesQuery.TimeKind.UNSPECIFIED || endKind != SimpleTimeseriesQuery.TimeKind.UNSPECIFIED) {
                qString = (String)qString + "WHERE ";
                if (startKind != SimpleTimeseriesQuery.TimeKind.UNSPECIFIED) {
                    qString = (String)qString + "time >= " + this.toTimePart(q.getStart(), startKind);
                }
                if (startKind != SimpleTimeseriesQuery.TimeKind.UNSPECIFIED && endKind != SimpleTimeseriesQuery.TimeKind.UNSPECIFIED) {
                    qString = (String)qString + "AND ";
                }
                if (endKind != SimpleTimeseriesQuery.TimeKind.UNSPECIFIED) {
                    qString = (String)qString + "time <= " + this.toTimePart(q.getStart(), endKind);
                }
            }
            qString = (String)qString + "ORDER BY time ASC";
        }
        if (qString != null) {
            String queryString = qString;
            String queryType = qType;
            Runnable run = () -> {
                Stream stream = this.client.queryPoints(queryString, this.toQueryOptions(queryType));
                InfluxModelAccess acc = (InfluxModelAccess)this.getModelAccess();
                if (null != acc) {
                    long lastTime = -1L;
                    Instant lastRecordTime = null;
                    HashMap<String, Object> values = new HashMap<String, Object>();
                    stream.forEach(point -> {
                        Number time = point.getTimestamp();
                        values.put("*T*", time);
                        for (String field : point.getFieldNames()) {
                            if (lastRecordTime != null && !lastRecordTime.equals(time) || this.recordComplete.isComplete(values, field)) {
                                this.flush(acc, values);
                            }
                            values.put(field, point.getField(field));
                            long thisTime = time.longValue();
                            int delay = 0;
                            if (qDelay > 0) {
                                delay = qDelay;
                            } else if (lastTime > 0L) {
                                delay = (int)(thisTime - lastTime);
                            }
                            TimeUtils.sleep((int)Math.max(1, delay));
                        }
                    });
                    if (!values.isEmpty()) {
                        this.flush(acc, values);
                    }
                }
            };
            new Thread(run).start();
        }
    }

    private void flush(InfluxModelAccess acc, Map<String, Object> values) {
        try {
            acc.setReadData(values);
            this.received("", DUMMY, true);
            acc.readCompleted();
        }
        catch (IOException e) {
            LoggerFactory.getLogger(((Object)((Object)this)).getClass()).error("Cannot trigger connector {}: {}", (Object)this.getName(), (Object)e.getMessage());
        }
        values.clear();
    }

    protected void writeImpl(Object data) throws IOException {
        InfluxModelAccess acc = (InfluxModelAccess)this.getModelAccess();
        if (null != acc) {
            acc.writeCompleted();
        }
    }

    public String supportedEncryption() {
        return null;
    }

    public String enabledEncryption() {
        return null;
    }

    InfluxDBClient getClient() {
        return this.client;
    }

    ConnectorParameter getParameter() {
        return this.params;
    }

    String getDatabase() {
        return this.database;
    }

    String getMeasurement() {
        return this.measurement;
    }

    Set<String> getTags() {
        return this.tags;
    }

    int getBatchSize() {
        return this.batchSize;
    }

    long toTimestamp(float value) {
        long result = (long)value;
        if (this.baseTime > 0L) {
            result += this.baseTime;
        }
        return result;
    }

    float fromTimestamp(long value) {
        long result = value;
        if (this.baseTime > 0L) {
            result -= this.baseTime;
        }
        return result;
    }

    public static class Descriptor
    extends AbstractPluginConnectorDescriptor<Object, Object> {
        public String getName() {
            return InfluxConnector.NAME;
        }

        public Class<?> getConnectorType() {
            return InfluxConnector.class;
        }

        protected <O, I, CO, CI, S extends AdapterSelector<Object, Object, CO, CI>, A extends ProtocolAdapter<Object, Object, CO, CI>> Connector<Object, Object, CO, CI> createConnectorImpl(S selector, Supplier<ConnectorParameter> params, A ... adapter) {
            return new InfluxConnector(selector, (ProtocolAdapter<Object, Object, CO, CI>[])adapter);
        }

        protected String initId(String id) {
            return "connector-influx-v3";
        }
    }
}

