package eu.qualimaster.dataManagement.sinks.replay;

import eu.qualimaster.dataManagement.DataManager;
import eu.qualimaster.dataManagement.common.replay.ReplayUtils;
import eu.qualimaster.dataManagement.common.replay.Tuple;
import eu.qualimaster.dataManagement.serialization.ISerializer;
import eu.qualimaster.dataManagement.serialization.SerializerRegistry;
import eu.qualimaster.dataManagement.strategies.IStorageStrategyDescriptor;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.Result;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/qualimaster/dataManagement/sinks/replay/ReplayStreamer.class */
/**
 * Streams replayed items of type {@code T} from a {@link ReplayDataInput} into a bounded
 * in-memory buffer, from which callers pull them with {@link #getData()}.
 *
 * <p>A single background task ({@link DataFetcher}) deserializes items from the result wrapper
 * and puts them into the buffer. The setters pause that task via a flag, clear the buffer and
 * (except for {@link #setSpeed(float)}) push the new parameters to the result wrapper.
 *
 * <p>Call {@link #close()} when the streamer is no longer needed; it stops the background task.
 *
 * @param <T> type of the replayed data items
 */
public class ReplayStreamer<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ReplayStreamer.class);

    /** Maximum number of 1 ms waits {@link #getData()} performs while the fetcher is paused. */
    private static final int TIMEOUT = 50;

    private Date startDate;
    private Date endDate;
    private String query;
    private String prefix;
    private ISerializer<T> serializer;
    private final ReplayDataInput resultWrapper;
    private final ReplayAggregator aggregator;

    /** {@code -1} marks the speed factor as "not yet set" (checked in {@link #updateQuery()}). */
    private float speedFactor = -1.0f;

    /** Set while parameters are being changed; the fetcher does not read while it is {@code true}. */
    private volatile boolean stopFetchingDataThread = false;

    /** Set once by {@link #close()} so that the fetcher task ends instead of looping forever. */
    private volatile boolean closed = false;

    private final LinkedBlockingQueue<T> buffer = new LinkedBlockingQueue<>(100);
    private final ExecutorService fetcherThread = Executors.newSingleThreadExecutor();

    /**
     * Background task that moves deserialized items from the result wrapper into the buffer.
     * It runs until the streamer is closed or the executing thread is interrupted.
     */
    private final class DataFetcher implements Runnable {
        private DataFetcher() {
        }

        @Override
        public void run() {
            LOG.info("Start Runnable of DataFetcher");
            try {
                while (!closed) {
                    if (stopFetchingDataThread) {
                        // Parameters are being changed: do not read until the setter is done.
                        LOG.info("The parameters are probably being updated. Wait 10 ms");
                        Thread.sleep(10L);
                        continue;
                    }
                    fetchAvailableItems();
                    if (resultWrapper.isEOD()) {
                        Thread.sleep(100L);
                    } else {
                        LOG.info("Wait 10 misecs before the next request to the result wrapper");
                        Thread.sleep(10L);
                    }
                }
            } catch (InterruptedException e) {
                LOG.warn("The deading-data thread is interrupted or closed");
                // Keep the interrupt visible to the executor's worker thread.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Deserializes items and puts them into the buffer until the result wrapper reports
         * end-of-data, the streamer is paused or closed, or the thread is interrupted.
         *
         * @throws InterruptedException if interrupted while waiting for space in the buffer
         */
        private void fetchAvailableItems() throws InterruptedException {
            while (!closed && !stopFetchingDataThread
                    && !Thread.currentThread().isInterrupted()
                    && !resultWrapper.isEOD()) {
                try {
                    LOG.info("Received 1 data item. Will be serialized using " + serializer);
                    T item = serializer.deserializeFrom(resultWrapper);
                    LOG.info("Serializer passed");
                    if (item != null) {
                        LOG.info("Serialized 1 data item: " + item.toString());
                        // Blocks while the bounded buffer is full.
                        buffer.put(item);
                    } else {
                        LOG.info("Cannot serialize data");
                    }
                } catch (IOException e) {
                    LOG.error("Error getting data from HBase for the query " + query, e);
                } catch (InterruptedException e) {
                    // Must not be swallowed by the generic handler below.
                    throw e;
                } catch (Exception e) {
                    LOG.error("ERROR: ", e);
                }
            }
        }
    }

    /**
     * Aggregator that lets a result through when it is the first one seen, or when the
     * timestamp difference to the previously seen result, divided by 1000, is at least the
     * current speed factor. All other results are dropped (reported as {@code null}).
     */
    private class LastItemAggregator implements ReplayAggregator {
        /** Timestamp of the previously seen result; {@code 0} means none was seen yet. */
        private long prevTimestamp;

        private LastItemAggregator() {
            this.prevTimestamp = 0L;
        }

        /**
         * @param strArr key parts from which {@link ReplayUtils#getTimestampFromResult} extracts
         *               the timestamp
         * @param result the candidate result
         * @return {@code result} if it passes the filter, otherwise {@code null}
         */
        @Override
        public Result aggregate(String[] strArr, Result result) {
            Result accepted = null;
            long timestamp = ReplayUtils.getTimestampFromResult(strArr);
            if (this.prevTimestamp == 0) {
                accepted = result;
            } else {
                // NOTE(review): presumably milliseconds converted to seconds - confirm.
                long elapsed = (timestamp - this.prevTimestamp) / 1000;
                if (elapsed < 0) {
                    LOG.warn("The message are not come in order: " + timestamp + " < " + this.prevTimestamp);
                }
                if (((float) elapsed) >= ReplayStreamer.this.speedFactor) {
                    accepted = result;
                }
            }
            this.prevTimestamp = timestamp;
            return accepted;
        }
    }

    /**
     * Creates the streamer and starts the background fetcher task.
     *
     * @param cls                        class of the data items; its simple name is used to look
     *                                   up the serializer in the {@link SerializerRegistry}
     * @param tuple                      tuple description passed to the {@link ReplayDataInput}
     * @param str                        first argument of the replay storage table lookup
     * @param iStorageStrategyDescriptor storage strategy used for the table lookup
     */
    public ReplayStreamer(Class<T> cls, Tuple tuple, String str, IStorageStrategyDescriptor iStorageStrategyDescriptor) {
        this.resultWrapper = new ReplayDataInput(tuple, DataManager.REPLAY_STORAGE_MANAGER.getTable(str, tuple.getName(), iStorageStrategyDescriptor).getStorageSupport());
        this.serializer = SerializerRegistry.getSerializer(cls.getSimpleName(), cls);
        this.aggregator = new LastItemAggregator();
        // Start the background task last, once every field it may observe is initialised.
        this.fetcherThread.submit(new DataFetcher());
    }

    /**
     * Sets the speed factor used by the aggregator and clears the buffer.
     *
     * <p>NOTE(review): unlike the other setters this does not call {@code updateQuery()};
     * confirm whether callers rely on setting the speed before the other parameters.
     *
     * @param f the new speed factor
     */
    public void setSpeed(float f) {
        this.stopFetchingDataThread = true;
        try {
            this.buffer.clear();
            this.speedFactor = f;
        } finally {
            this.stopFetchingDataThread = false;
        }
    }

    /**
     * Sets the start date, clears the buffer and updates the query if all parameters are set.
     *
     * @param date the new start date
     */
    public void setStart(Date date) {
        this.stopFetchingDataThread = true;
        try {
            this.buffer.clear();
            this.startDate = date;
            updateQuery();
        } finally {
            // Always resume the fetcher, even if updating the query failed.
            this.stopFetchingDataThread = false;
        }
    }

    /**
     * Sets the end date, clears the buffer and updates the query if all parameters are set.
     *
     * @param date the new end date
     */
    public void setEnd(Date date) {
        this.stopFetchingDataThread = true;
        try {
            this.buffer.clear();
            this.endDate = date;
            updateQuery();
        } finally {
            // Always resume the fetcher, even if updating the query failed.
            this.stopFetchingDataThread = false;
        }
    }

    /**
     * Sets the query string, clears the buffer and updates the query if all parameters are set.
     *
     * @param str the new query
     */
    public void setQuery(String str) {
        this.stopFetchingDataThread = true;
        try {
            this.buffer.clear();
            this.query = str;
            updateQuery();
        } finally {
            // Always resume the fetcher, even if updating the query failed.
            this.stopFetchingDataThread = false;
        }
    }

    /**
     * Replaces the serializer used by the background fetcher.
     *
     * @param iSerializer the serializer to use from now on
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw parameter kept for caller compatibility
    public void setSerializer(ISerializer iSerializer) {
        this.serializer = iSerializer;
    }

    /**
     * Returns the next buffered item.
     *
     * <p>While the fetcher is paused, this waits in 1 ms steps, at most {@link #TIMEOUT} times.
     * If the pause lasts that long, {@code null} is returned. Otherwise the buffer is polled
     * for up to 10 seconds.
     *
     * @return the next item, or {@code null} on timeout, when no item became available, or when
     *         the calling thread was interrupted
     */
    public T getData() {
        // Per-call counter: waiting done by earlier calls must not cause a timeout here.
        int waited = 0;
        try {
            while (this.stopFetchingDataThread && waited < TIMEOUT) {
                waited++;
                Thread.sleep(1L);
            }
            if (waited >= TIMEOUT) {
                LOG.info("Timeout querying database.. Reset clock");
                return null;
            }
            LOG.info("Connect buffer to check for available data");
            T item = this.buffer.poll(10L, TimeUnit.SECONDS);
            if (item != null) {
                LOG.info("Get available data from the buffer: " + item.getClass().getName());
            } else {
                LOG.info("No available data from the buffer");
            }
            return item;
        } catch (InterruptedException e) {
            LOG.error("Reading-data thread is interrupted while fetching from buffer (query " + this.query + "). Return null");
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Pushes query, date range and aggregator to the result wrapper, but only once start date,
     * end date, a non-empty query and a speed factor have all been set.
     */
    private void updateQuery() {
        boolean ready = this.startDate != null
                && this.endDate != null
                && this.query != null
                && !this.query.isEmpty()
                && this.speedFactor != -1.0f;
        if (!ready) {
            LOG.warn("The streamer is not ready. Set the parameters first !!");
            return;
        }
        this.resultWrapper.updateQuery(this.query, this.startDate, this.endDate, this.aggregator);
    }

    /**
     * Reports whether the buffer is empty or the result wrapper is at end-of-data.
     *
     * <p>NOTE(review): this is {@code true} as soon as either condition holds, so it can be
     * {@code true} while items are still buffered or still being fetched - confirm that
     * {@code ||} (rather than {@code &&}) is what callers expect.
     *
     * @return {@code true} if the buffer is empty or the result wrapper reports end-of-data
     */
    public boolean isEOD() {
        return this.buffer.isEmpty() || this.resultWrapper.isEOD();
    }

    /**
     * Closes the result wrapper and stops the background fetcher.
     *
     * @throws IOException if closing the result wrapper fails; the fetcher is stopped regardless
     */
    public void close() throws IOException {
        this.stopFetchingDataThread = true;
        this.closed = true;
        try {
            this.resultWrapper.close();
        } finally {
            shutdownFetcher();
        }
    }

    /** Shuts the fetcher executor down, waiting up to 5 seconds before cancelling its task. */
    private void shutdownFetcher() {
        try {
            LOG.info("attempt to shutdown the data fetching");
            this.fetcherThread.shutdown();
            this.fetcherThread.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.error("tasks interrupted", e);
            Thread.currentThread().interrupt();
        } finally {
            if (!this.fetcherThread.isTerminated()) {
                LOG.warn("cancel non-finished tasks");
            }
            this.fetcherThread.shutdownNow();
            LOG.info("shutdown finished");
        }
    }
}
