package eu.qualimaster.dataManagement.sources;

import eu.qualimaster.dataManagement.sources.replay.DateTimeTimestampParser;
import eu.qualimaster.dataManagement.sources.replay.FileSource;
import eu.qualimaster.dataManagement.sources.replay.HdfsSource;
import eu.qualimaster.dataManagement.sources.replay.IDataManipulator;
import eu.qualimaster.dataManagement.sources.replay.IReplaySource;
import eu.qualimaster.dataManagement.sources.replay.ITimestampParser;
import eu.qualimaster.dataManagement.strategies.IStorageStrategyDescriptor;
import eu.qualimaster.observables.IObservable;
import eu.qualimaster.pipeline.DefaultModeException;
import java.io.BufferedReader;
import java.io.IOException;
import java.text.ParseException;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;

/* loaded from: input_file:DataManagementLayer.jar:eu/qualimaster/dataManagement/sources/ReplayMechanism.class */
public class ReplayMechanism implements IDataSource {
    /** Logger of this instance; created in the three-argument constructor. */
    private Logger logger;
    /** Storage strategy descriptor as stored by setStrategy and returned by getStrategy. */
    private IStorageStrategyDescriptor strategy;
    /** Character found directly behind the timestamp (and optional blanks) of the last parsed line. */
    private char separator;
    /**
     * Guards the separator detection in newlineWithDateToNow.
     * NOTE(review): initialized to false and never set to true in this class, so the
     * separator is re-detected on every parsed line - confirm whether that is intended.
     */
    private boolean separatorDefined;
    /** Replay source that connect() opens; may be null until setSource is called. */
    private IReplaySource source;
    /** Parser for the leading timestamp of a line; defaults to DateTimeTimestampParser.INSTANCE. */
    private ITimestampParser timestampParser;
    /** Optional (may be null) manipulator applied to lines in newlineWithDateToNow. */
    private IDataManipulator manipulator;
    /** Reader obtained from the source in connect(); null while not connected. */
    private BufferedReader brForData;
    /** Set to true when reading hit the end of the data (or no data could be read, see getNext). */
    private boolean endOfData;
    /** Offset in milliseconds that is added to data timestamps to shift them to the current time. */
    private long offsetInMillis;
    /** Shifted timestamp (ms) of the last line parsed during replay; getNext waits until this time. */
    private long prevTimeStampNow;
    /** Set by connect() when no source is defined yet; cleared by disconnect(). */
    private boolean shallConnect;
    /** Initially true; cleared by forceAutoconnect(). Read by setSource and getNext. */
    private boolean selfConnect;
    /** Start (ms) of the current throughput measurement window in monitorMe; 0 means not started. */
    private long monitoringTimestamp;
    /** Number of lines counted by monitorMe in the current measurement window. */
    private long throughput;
    /** Length of the throughput measurement window in seconds (set to 60 in the constructor). */
    private int measurementDuration;
    /** Wall-clock time (ms) at which the current pacing interval of getNextItem started. */
    private long start;
    /** Shifted timestamp of the group of items getNextItem is currently delivering. */
    private long record;
    /** Whether getNextItem already performed its first-call initialization. */
    private boolean init;
    /** Pacing interval of getNextItem in milliseconds (set to 1000 in the constructor). */
    private int timeInterval;

    /* loaded from: input_file:DataManagementLayer.jar:eu/qualimaster/dataManagement/sources/ReplayMechanism$ProfilingQueueItem.class */
    /**
     * Simple holder pairing a queued item with the timestamp it belongs to.
     *
     * @param <T> the type of the carried item
     */
    public static class ProfilingQueueItem<T> {
        long timestamp;
        T item;

        /**
         * Creates a queue entry.
         *
         * @param timestamp the timestamp associated with the item
         * @param item the carried item (stored as given, may be {@code null})
         */
        public ProfilingQueueItem(long timestamp, T item) {
            this.timestamp = timestamp;
            this.item = item;
        }

        /**
         * @return the timestamp passed to the constructor
         */
        public long getTimestamp() {
            return timestamp;
        }

        /**
         * @return the item passed to the constructor
         */
        public T getItem() {
            return item;
        }
    }

    /**
     * Creates a replay mechanism without source, manipulator or explicit timestamp parser.
     * Delegates to the three-argument constructor with all arguments {@code null}.
     */
    public ReplayMechanism() {
        this(null, null, null);
    }

    /**
     * Creates a replay mechanism for the given source, without manipulator and
     * without explicit timestamp parser.
     *
     * @param iReplaySource the replay source (passed on as is, may be {@code null})
     */
    public ReplayMechanism(IReplaySource iReplaySource) {
        this(iReplaySource, null, null);
    }

    /**
     * Creates a replay mechanism with the given timestamp parser, without source
     * and without manipulator.
     *
     * @param iTimestampParser the timestamp parser (passed on as is, may be {@code null})
     */
    public ReplayMechanism(ITimestampParser iTimestampParser) {
        this(null, null, iTimestampParser);
    }

    /**
     * Creates a replay mechanism for the given source and timestamp parser, without manipulator.
     *
     * @param iReplaySource the replay source (passed on as is, may be {@code null})
     * @param iTimestampParser the timestamp parser (passed on as is, may be {@code null})
     */
    public ReplayMechanism(IReplaySource iReplaySource, ITimestampParser iTimestampParser) {
        this(iReplaySource, null, iTimestampParser);
    }

    /**
     * Creates a replay mechanism and initializes all replay, pacing and monitoring state.
     *
     * @param iReplaySource the replay source to read from (may be {@code null}, see setSource)
     * @param iDataManipulator optional manipulator for the replayed lines (may be {@code null})
     * @param iTimestampParser the timestamp parser; if {@code null},
     *        {@code DateTimeTimestampParser.INSTANCE} is used
     */
    public ReplayMechanism(IReplaySource iReplaySource, IDataManipulator iDataManipulator, ITimestampParser iTimestampParser) {
        this.logger = Logger.getLogger(ReplayMechanism.class);

        // parsing / connection state
        this.separatorDefined = false;
        this.timestampParser = DateTimeTimestampParser.INSTANCE;
        this.endOfData = false;
        this.selfConnect = true;

        // pacing state used by getNextItem
        this.init = false;
        this.start = 0L;
        this.record = 0L;
        this.timeInterval = 1000;

        // throughput monitoring state used by monitorMe
        this.monitoringTimestamp = 0L;
        this.throughput = 0L;
        this.measurementDuration = 60;

        // collaborators handed in by the caller
        this.source = iReplaySource;
        this.manipulator = iDataManipulator;
        if (iTimestampParser != null) {
            // an explicit parser replaces the default one
            this.timestampParser = iTimestampParser;
        }
    }

    /**
     * Defines the replay source. A {@code null} argument is ignored. If a connect was
     * requested before a source was known and self-connect is still enabled, the
     * connection is established right away.
     *
     * @param iReplaySource the new replay source (ignored if {@code null})
     */
    public void setSource(IReplaySource iReplaySource) {
        if (iReplaySource == null) {
            return;
        }
        this.source = iReplaySource;
        this.logger.info("Defined source " + iReplaySource.getClass());
        boolean connectNow = this.shallConnect && this.selfConnect;
        if (connectNow) {
            this.logger.info("Self-connect from set source");
            connect();
        }
    }

    /**
     * Clears the self-connect flag. With the flag cleared, setSource does not call
     * connect() on its own anymore and getNext flags the end of data only once a
     * reader exists.
     * NOTE(review): the method name suggests enabling something while the code
     * disables self-connect - confirm the intended naming against the callers.
     */
    public void forceAutoconnect() {
        this.selfConnect = false;
    }

    /**
     * Skips blanks in {@code str} starting at position {@code i}. Only the space
     * character is skipped, no tabs or other whitespace.
     *
     * @param str the string to scan
     * @param i the position to start at
     * @return the position of the first non-blank character at or after {@code i},
     *         the length of {@code str} if only blanks follow, or {@code i} itself
     *         if it is already at or beyond the end of {@code str}
     */
    private static int consumeWhitespace(String str, int i) {
        int pos = i;
        final int end = str.length();
        for (; pos < end; pos++) {
            if (str.charAt(pos) != ' ') {
                break;
            }
        }
        return pos;
    }

    /**
     * Shifts a data timestamp by the current replay offset.
     *
     * @param j the original timestamp in milliseconds
     * @return {@code j} plus the current offset in milliseconds
     */
    private long newTimestamp(long j) {
        return j + this.offsetInMillis;
    }

    /**
     * Sets the replay offset to the difference between the current system time and
     * the given timestamp, so that {@code newTimestamp(j)} maps to "now".
     *
     * @param j the data timestamp in milliseconds that shall correspond to the current time
     */
    private void updateOffset(long j) {
        this.offsetInMillis = System.currentTimeMillis() - j;
    }

    /**
     * Parses the leading timestamp of a replayed line, updates the timing state and
     * extracts the payload that follows the timestamp and the separator.
     * <p>
     * If a manipulator is set, the line is first passed through its
     * {@code changeInput}. For the first line ({@code z == true}) the replay offset is
     * set so that the parsed timestamp maps to the current time; for all other lines
     * the shifted timestamp is stored as the time at which the line is due.
     * The character behind the timestamp (after optional blanks) is taken as separator.
     * <p>
     * NOTE(review): {@code separatorDefined} is never set to true in this class, so the
     * separator is re-detected for every line. Also, {@code composeData} receives the
     * complete line rather than the extracted payload - confirm both against the
     * manipulator contract.
     *
     * @param str the line as read from the source
     * @param z {@code true} for the first line of the data (defines the offset),
     *        {@code false} for subsequent lines
     * @return the payload (or the result of the manipulator's {@code composeData}), or
     *         {@code null} if no timestamp was recognized, no payload follows the
     *         separator or the timestamp could not be parsed
     */
    private String newlineWithDateToNow(String str, boolean z) {
        if (null != this.manipulator) {
            str = this.manipulator.changeInput(str, z);
        }
        int timestampEnd = this.timestampParser.consumeTimestamp(str);
        if (timestampEnd <= 0) {
            // no leading timestamp recognized, nothing to replay from this line
            return null;
        }
        String result = null;
        try {
            long parsed = this.timestampParser.parseTimestamp(str.substring(0, timestampEnd));
            if (z) {
                this.offsetInMillis = new DateTime().getMillis() - parsed;
            } else {
                this.prevTimeStampNow = parsed + this.offsetInMillis;
            }
            // layout: <timestamp><blanks><separator><blanks><payload>
            int separatorPos = consumeWhitespace(str, timestampEnd);
            int payloadPos = consumeWhitespace(str, separatorPos + 1);
            if (separatorPos < payloadPos && payloadPos < str.length()) {
                if (!this.separatorDefined) {
                    this.separator = str.charAt(separatorPos);
                }
                if (null != this.manipulator) {
                    result = this.manipulator.composeData(this.prevTimeStampNow, str);
                } else {
                    result = str.substring(payloadPos);
                }
            }
        } catch (ParseException e) {
            // hand the exception to the logger so that the stack trace is not lost
            this.logger.error("Simulator Error : " + e.getMessage(), e);
        }
        return result;
    }

    /**
     * Returns the separator character detected while parsing lines.
     *
     * @return the current separator (the {@code char} default value if no line has
     *         been parsed successfully so far)
     */
    public char getSeparator() {
        return this.separator;
    }

    /**
     * Returns whether the end of the data has been flagged by connect() or getNext().
     *
     * @return {@code true} if the end-of-data flag is set
     */
    public boolean isEOD() {
        return this.endOfData;
    }

    /**
     * Reads up to {@code i} raw lines from the source and adds the parsed items to
     * the data queue whose id matches the id of the line.
     * <p>
     * Reading stops early when the end of the data is reached. Lines for which no id
     * can be determined or for which no queue descriptor is registered are skipped
     * with a warning instead of failing with a {@code NullPointerException}; they
     * still count towards {@code i}.
     *
     * @param genericMultiSourceHandler the handler used to extract id, timestamp and item from a line
     * @param i the maximum number of read attempts
     * @param list the available queue descriptors
     * @throws IOException declared for callers; not thrown by the visible code itself
     */
    public void readProfilingData(GenericMultiSourceHandler genericMultiSourceHandler, int i, List<DataQueueDescriptor<?>> list) throws IOException {
        for (int i2 = 0; !isEOD() && i2 < i; i2++) {
            String next = getNext(false);
            if (null == next) {
                continue;
            }
            char separator = getSeparator();
            String nextId = genericMultiSourceHandler.nextId(next, separator, false);
            DataQueueDescriptor<?> queueDescriptor = (null == nextId) ? null : getQueueDescriptor(nextId, list);
            if (null == queueDescriptor) {
                // unknown or missing id: there is no queue to put the item into
                this.logger.warn("No data queue registered for id '" + nextId + "', skipping line");
                continue;
            }
            queueDescriptor.add(genericMultiSourceHandler.nextTimestamp(next, separator, false), genericMultiSourceHandler.next(nextId, queueDescriptor.getCls(), next, separator, false, true));
        }
    }

    /**
     * Searches the given descriptors for the first one whose id equals {@code str}.
     * {@code null} entries in the list are skipped.
     *
     * @param str the id to look for (must not be {@code null})
     * @param list the descriptors to search
     * @return the first matching descriptor or {@code null} if there is none
     */
    public DataQueueDescriptor<?> getQueueDescriptor(String str, List<DataQueueDescriptor<?>> list) {
        for (DataQueueDescriptor<?> candidate : list) {
            if (candidate == null) {
                continue;
            }
            if (str.equals(candidate.getId())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Takes the next item from the queue and paces its delivery according to the
     * pacing interval ({@code timeInterval} milliseconds).
     * <p>
     * The first call initializes the pacing state and the replay offset. After that,
     * for the polled head of the queue:
     * <ul>
     *   <li>if its shifted timestamp differs from the one currently being delivered,
     *       the call blocks until the current interval has elapsed, then returns the
     *       item and starts a new interval;</li>
     *   <li>if it has the same shifted timestamp and the interval has not elapsed,
     *       the item is returned immediately;</li>
     *   <li>if it has the same shifted timestamp but the interval has elapsed, this
     *       item and all following items with that timestamp are discarded until an
     *       item with a different timestamp is found, which is then returned.</li>
     * </ul>
     * While discarding, the loop now sleeps for a millisecond whenever the queue has
     * nothing to offer instead of spinning at full CPU load; it still waits until an
     * item with a different timestamp arrives.
     *
     * @param <T> the item type
     * @param queue the queue to take items from
     * @return the next item to deliver, or {@code null} if the queue is empty
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public <T> T getNextItem(Queue<ProfilingQueueItem<T>> queue) throws InterruptedException {
        if (queue.isEmpty()) {
            return null;
        }
        ProfilingQueueItem<T> head = queue.poll();
        if (null == head) {
            return null;
        }
        long timestamp = head.getTimestamp();
        long now = System.currentTimeMillis();
        if (!this.init) {
            this.init = true;
            this.record = 0L;
            this.start = now;
            updateOffset(timestamp);
        }
        long replayTimestamp = newTimestamp(timestamp);
        T result = null;
        if (this.record != replayTimestamp) {
            // new timestamp: let the current interval finish before delivering
            while (now - this.start <= this.timeInterval) {
                Thread.sleep(1L);
                now = System.currentTimeMillis();
            }
            result = head.getItem();
            this.record = replayTimestamp;
            this.start = now;
        } else if (now - this.start <= this.timeInterval) {
            // same timestamp, still within the interval: deliver immediately
            result = head.getItem();
        } else {
            // same timestamp but interval exceeded: drop items until the timestamp changes
            while (replayTimestamp == this.record) {
                ProfilingQueueItem<T> following = queue.isEmpty() ? null : queue.poll();
                if (null != following) {
                    result = following.getItem();
                    replayTimestamp = newTimestamp(following.getTimestamp());
                } else {
                    // nothing available right now, do not busy-spin
                    Thread.sleep(1L);
                }
                now = System.currentTimeMillis();
            }
            this.start = now;
            this.record = replayTimestamp;
        }
        return result;
    }

    /**
     * Reads the next line from the connected source.
     * <p>
     * If there is no reader or the reader is exhausted, {@code null} is returned and
     * the end-of-data flag is set (unless self-connect is disabled and no reader
     * exists yet). With {@code z == false} the raw line is returned without any
     * parsing or pacing. With {@code z == true} the line is parsed, the call blocks
     * until the shifted timestamp of the line is reached, and the line is counted
     * for throughput monitoring.
     * <p>
     * If the thread is interrupted while waiting, the wait is aborted, the error is
     * logged and the interrupt status of the thread is restored so that callers can
     * react to the interruption; the line is still returned.
     *
     * @param z {@code true} to parse and pace the line, {@code false} to return the raw line
     * @return the raw line, the parsed payload, or {@code null} if there is no data,
     *         the parser asks to skip the line, or the line could not be parsed
     * @throws DefaultModeException if reading from the source fails
     */
    public String getNext(boolean z) throws DefaultModeException {
        try {
            String readLine = (null == this.brForData) ? null : this.brForData.readLine();
            if (null == readLine) {
                if (this.selfConnect || null != this.brForData) {
                    this.endOfData = true;
                }
                return null;
            }
            if (!z) {
                return readLine;
            }
            if (this.timestampParser.skipParsing(readLine)) {
                return null;
            }
            String result = newlineWithDateToNow(readLine, false);
            try {
                // wait until the shifted timestamp of the line is due
                while (this.prevTimeStampNow > new DateTime().getMillis()) {
                    Thread.sleep(1L);
                }
            } catch (InterruptedException e) {
                this.logger.error("Simulator Error : " + e.getMessage(), e);
                // do not swallow the interruption, keep it visible for the caller
                Thread.currentThread().interrupt();
            }
            monitorMe();
            return result;
        } catch (IOException e2) {
            this.logger.error("Simulator Error : " + e2.getMessage(), e2);
            throw new DefaultModeException("Simulator Error : " + e2.getMessage());
        }
    }

    /**
     * Defines a file as replay source. {@code null} and empty paths are ignored.
     *
     * @param str the path of the data file
     */
    public void setParameterDataFile(String str) {
        if (str != null && str.length() > 0) {
            this.logger.info("Received Data file path " + str);
            setSource(new FileSource(str));
        }
    }

    /**
     * Defines an HDFS file as replay source. {@code null} and empty paths are ignored.
     *
     * @param str the path of the data file
     */
    public void setParameterHdfsDataFile(String str) {
        if (str != null && str.length() > 0) {
            this.logger.info("Received HDFS Data file path " + str);
            setSource(new HdfsSource(str));
        }
    }

    /**
     * Accepts a replay speed parameter. The body is empty, i.e., the value is
     * currently ignored and has no effect on the replay.
     *
     * @param i the replay speed (ignored)
     */
    public void setParameterReplaySpeed(int i) {
    }

    /**
     * Connects to the replay source. If no source is defined yet, the request is only
     * recorded so that setSource may connect later. Otherwise the source is opened
     * and the first line is consumed to initialize the replay offset; an empty source
     * sets the end-of-data flag.
     *
     * @throws DefaultModeException if opening the source or reading the first line fails
     */
    @Override // eu.qualimaster.dataManagement.common.IDataElement
    public void connect() throws DefaultModeException {
        if (this.source == null) {
            this.shallConnect = true;
            this.logger.info("Switching to shall connect " + this.shallConnect);
            return;
        }
        try {
            this.brForData = this.source.open();
            String firstLine = this.brForData.readLine();
            if (firstLine == null) {
                this.endOfData = true;
            } else {
                // the first line defines the offset between data time and current time
                newlineWithDateToNow(firstLine, true);
            }
            this.logger.info("Connected.");
        } catch (IOException e) {
            String message = "Simulator Error : " + e.getMessage();
            this.logger.error(message);
            throw new DefaultModeException(message);
        }
    }

    /**
     * Disconnects from the source: closes the reader if there is one (close errors
     * are only logged), forgets the reader and withdraws a pending connect request.
     */
    @Override // eu.qualimaster.dataManagement.common.IDataElement
    public void disconnect() {
        final BufferedReader reader = this.brForData;
        this.logger.info("Trying to disconnect " + reader);
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                this.logger.error("Simulator Error : " + e.getMessage());
            }
        }
        this.brForData = null;
        this.shallConnect = false;
        this.logger.info("Disconnected");
    }

    /**
     * Stores the given storage strategy descriptor. The visible code only keeps the
     * reference; it is returned by getStrategy().
     *
     * @param iStorageStrategyDescriptor the strategy descriptor to store
     */
    @Override // eu.qualimaster.dataManagement.common.IDataElement
    public void setStrategy(IStorageStrategyDescriptor iStorageStrategyDescriptor) {
        this.strategy = iStorageStrategyDescriptor;
    }

    /**
     * Returns the storage strategy descriptor set via setStrategy.
     *
     * @return the stored strategy descriptor ({@code null} if none was set)
     */
    @Override // eu.qualimaster.dataManagement.common.IDataElement
    public IStorageStrategyDescriptor getStrategy() {
        return this.strategy;
    }

    /**
     * Returns the measurement for the given observable. This implementation does not
     * provide any measurement.
     *
     * @param iObservable the observable to return the measurement for (ignored)
     * @return always {@code null}
     */
    @Override // eu.qualimaster.observables.IMeasurable
    public Double getMeasurement(IObservable iObservable) {
        return null;
    }

    /**
     * Counts one replayed line and logs the input throughput once per measurement
     * window ({@code measurementDuration} seconds).
     * <p>
     * The first call opens the measurement window. The window is only closed when a
     * line arrives after it has elapsed, so it may be longer than the nominal
     * duration. The rate is therefore computed from the time that actually elapsed
     * rather than from the nominal window length, which would overstate the rate.
     * The line closing a window is counted as the first line of the next window.
     */
    private void monitorMe() {
        long now = System.currentTimeMillis();
        if (this.monitoringTimestamp == 0) {
            // first line: open the measurement window
            this.monitoringTimestamp = now;
            this.throughput++;
            return;
        }
        long elapsedMillis = now - this.monitoringTimestamp;
        if (elapsedMillis < this.measurementDuration * 1000L) {
            this.throughput++;
            return;
        }
        // guard against a non-positive window so that the division cannot fail
        long tuplesPerSecond = elapsedMillis > 0 ? this.throughput * 1000L / elapsedMillis : this.throughput;
        this.logger.info("Pipeline input throughput: " + tuplesPerSecond + " tuples/sec");
        this.monitoringTimestamp = now;
        this.throughput = 1L;
    }

    /**
     * Returns the historical data provider of this source. This implementation does
     * not provide one.
     *
     * @return always {@code null}
     */
    @Override // eu.qualimaster.dataManagement.sources.IDataSource
    public IHistoricalDataProvider getHistoricalDataProvider() {
        return null;
    }

    /**
     * Returns the id-to-name mapping of this source. This implementation does not
     * provide one.
     *
     * @return always {@code null} (not an empty map, callers must check)
     */
    @Override // eu.qualimaster.dataManagement.sources.IDataSource
    public Map<String, String> getIdsNamesMap() {
        return null;
    }

    /**
     * Accepts a data source listener. The body is empty, i.e., the listener is
     * neither stored nor notified by this implementation.
     *
     * @param iDataSourceListener the listener (ignored)
     */
    @Override // eu.qualimaster.dataManagement.sources.IDataSource
    public void setDataSourceListener(IDataSourceListener iDataSourceListener) {
    }
}
