package eu.qualimaster.common.signal;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import eu.qualimaster.dataManagement.common.replay.Tuple;
import eu.qualimaster.dataManagement.sinks.replay.ReplayRecorder;
import eu.qualimaster.dataManagement.sinks.replay.ReplayStreamer;
import eu.qualimaster.dataManagement.strategies.IStorageStrategyDescriptor;
import eu.qualimaster.events.EventManager;
import eu.qualimaster.monitoring.events.ReplayChangedMonitoringEvent;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:eu/qualimaster/common/signal/AbstractReplaySink.class */
public abstract class AbstractReplaySink extends BaseSignalBolt implements IReplayListener {
    private static final long serialVersionUID = 2348634834739948474L;
    private transient Map<Class<?>, TupleHandler<?>> handlers;
    private transient ReplayRunnable replayRunnable;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:eu/qualimaster/common/signal/AbstractReplaySink$ITupleEmitter.class */
    /**
     * Callback through which replayed data items are handed back to the concrete sink.
     * One emitter is attached to each tuple handler when the handler is registered.
     *
     * @param <T> the type of the data items being emitted
     */
    public interface ITupleEmitter<T> {
        /**
         * Emits one replayed data item.
         *
         * @param i the key under which the originating streamer is registered; within this file
         *          that key is the ticket of the replay signal that created the streamer
         * @param t the data item read from the streamer (the caller in this file only passes
         *          non-null items)
         */
        void emit(int i, T t);
    }

    /* loaded from: input_file:eu/qualimaster/common/signal/AbstractReplaySink$ReplayRunnable.class */
    /**
     * Background task that repeatedly asks every registered tuple handler to stream
     * pending replay data, pausing for one millisecond between passes, until
     * {@link #end()} is called or the executing thread is interrupted.
     */
    private class ReplayRunnable implements Runnable {
        // Written by the thread calling end() and read by the thread executing run();
        // volatile guarantees that the stop request becomes visible to the loop.
        private volatile boolean running;

        private ReplayRunnable() {
            this.running = true;
        }

        /**
         * Runs the polling loop. Each pass calls {@code stream()} on all handlers of the
         * enclosing sink (if any are registered) and then sleeps for one millisecond.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (this.running) {
                Map<Class<?>, TupleHandler<?>> registered = AbstractReplaySink.this.handlers;
                if (null != registered) {
                    for (TupleHandler<?> handler : registered.values()) {
                        handler.stream();
                    }
                }
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    // Restore the interrupt status for whoever owns the thread and stop polling;
                    // continuing would make every following sleep fail immediately.
                    Thread.currentThread().interrupt();
                    this.running = false;
                }
            }
        }

        /**
         * Requests the polling loop to terminate after the current pass.
         */
        public void end() {
            this.running = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:eu/qualimaster/common/signal/AbstractReplaySink$TupleHandler.class */
    /**
     * Bundles the recording and replaying of tuples of one type: a single recorder that
     * stores incoming tuples and one streamer per active replay ticket that reads them back.
     *
     * <p>The streamer map is a {@link Collections#synchronizedMap(Map) synchronized map};
     * every compound operation on it (iteration, check-then-act) is additionally guarded by
     * a {@code synchronized} block on the map itself.
     *
     * @param <T> the tuple type handled
     */
    public static class TupleHandler<T> {
        private Class<T> tupleClass;
        private Tuple schema;
        // ticket -> streamer of the replay started for that ticket
        private Map<Integer, ReplayStreamer<T>> streamers;
        private ReplayRecorder<T> recorder;
        private String location;
        private IStorageStrategyDescriptor strategy;
        private AbstractReplaySink sink;
        private ITupleEmitter<T> emitter;

        /**
         * Creates the handler and immediately opens a recorder for the tuple type.
         *
         * @param cls the tuple class; used to cast stored objects and to create recorder/streamers
         * @param tuple the schema passed on to the recorder and to every streamer
         * @param str the storage location passed on to the recorder and to every streamer
         * @param iStorageStrategyDescriptor the storage strategy passed on to recorder/streamers
         * @param abstractReplaySink the owning sink, queried for pipeline and name when
         *        monitoring events are sent
         */
        private TupleHandler(Class<T> cls, Tuple tuple, String str, IStorageStrategyDescriptor iStorageStrategyDescriptor, AbstractReplaySink abstractReplaySink) {
            // Explicit type arguments: a diamond in argument position is not inferred on older compilers.
            this.streamers = Collections.synchronizedMap(new HashMap<Integer, ReplayStreamer<T>>());
            this.tupleClass = cls;
            this.schema = tuple;
            this.location = str;
            this.strategy = iStorageStrategyDescriptor;
            this.sink = abstractReplaySink;
            this.recorder = new ReplayRecorder<>(cls, tuple, str, iStorageStrategyDescriptor);
        }

        /**
         * Defines the emitter that receives replayed data items.
         *
         * @param iTupleEmitter the emitter to call from {@link #stream()}
         */
        public void setEmitter(ITupleEmitter<T> iTupleEmitter) {
            this.emitter = iTupleEmitter;
        }

        /**
         * Records one tuple. Does nothing but log if the recorder has already been closed.
         * I/O failures of the recorder are logged, not propagated.
         *
         * @param obj the tuple to record; must be an instance of the handled tuple class
         * @throws ClassCastException if {@code obj} is not an instance of the handled tuple class
         */
        public void store(Object obj) {
            if (null == this.recorder) {
                getLogger().info("recorder is null");
                return;
            }
            try {
                this.recorder.store(this.tupleClass.cast(obj));
            } catch (IOException e) {
                getLogger().error(e.getMessage(), e);
            }
        }

        /**
         * Closes the recorder and all streamers. Close failures are logged, not propagated.
         * Calling this method again afterwards has no further effect.
         *
         * @param shutdownSignal the signal that triggered the shutdown (not evaluated here)
         */
        public void prepareShutdown(ShutdownSignal shutdownSignal) {
            if (null != this.recorder) {
                try {
                    this.recorder.close();
                } catch (IOException e) {
                    getLogger().error(e.getMessage(), e);
                }
                this.recorder = null;
            }
            // Iterating a view of a synchronized map requires holding the map's lock;
            // stream() and notifyReplay() modify the map under the same lock.
            synchronized (this.streamers) {
                for (ReplayStreamer<T> streamer : this.streamers.values()) {
                    close(streamer);
                }
                this.streamers.clear();
            }
        }

        /**
         * Closes a streamer, logging (not propagating) I/O failures.
         *
         * @param replayStreamer the streamer to close, may be {@code null}
         */
        private void close(ReplayStreamer<T> replayStreamer) {
            if (null != replayStreamer) {
                try {
                    replayStreamer.close();
                } catch (IOException e) {
                    getLogger().error(e.getMessage(), e);
                }
            }
        }

        private Logger getLogger() {
            return LogManager.getLogger(getClass());
        }

        /**
         * Applies a replay signal: on start, creates the streamer for the signal's ticket if
         * there is none yet and (re)configures start, end, speed and query; otherwise removes
         * and closes the streamer for that ticket. In both cases a
         * {@code ReplayChangedMonitoringEvent} is sent.
         *
         * @param replaySignal the signal to apply
         * @return the number of streamers registered in this handler after applying the signal
         */
        public int notifyReplay(ReplaySignal replaySignal) {
            int size;
            synchronized (this.streamers) {
                Integer ticket = Integer.valueOf(replaySignal.getTicket());
                ReplayStreamer<T> replayStreamer = this.streamers.get(ticket);
                if (replaySignal.getStartReplay()) {
                    if (null == replayStreamer) {
                        replayStreamer = new ReplayStreamer<>(this.tupleClass, this.schema, this.location, this.strategy);
                        this.streamers.put(ticket, replayStreamer);
                    }
                    replayStreamer.setStart(replaySignal.getStart());
                    replayStreamer.setEnd(replaySignal.getEnd());
                    replayStreamer.setSpeed(replaySignal.getSpeed());
                    replayStreamer.setQuery(replaySignal.getQuery());
                } else {
                    this.streamers.remove(ticket);
                    if (null != replayStreamer) {
                        close(replayStreamer);
                    }
                }
                EventManager.send(new ReplayChangedMonitoringEvent(this.sink.getPipeline(), this.sink.getName(), replaySignal.getTicket(), replaySignal.getStartReplay(), replaySignal.getCauseMessageId()));
                size = this.streamers.size();
            }
            return size;
        }

        /**
         * Performs one streaming pass over all registered streamers: depending on
         * {@code isEOD()}, either fetches one data item and emits it (if non-null) under the
         * streamer's ticket, or removes and closes the streamer.
         */
        public void stream() {
            synchronized (this.streamers) {
                Iterator<Map.Entry<Integer, ReplayStreamer<T>>> it = this.streamers.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Integer, ReplayStreamer<T>> next = it.next();
                    ReplayStreamer<T> value = next.getValue();
                    // NOTE(review): data is read while isEOD() is true and the streamer is dropped
                    // once it is false. If isEOD() means "end of data reached" this condition is
                    // inverted - confirm against ReplayStreamer before changing it.
                    if (value.isEOD()) {
                        T data = value.getData();
                        if (null != data) {
                            this.emitter.emit(next.getKey().intValue(), data);
                        }
                    } else {
                        it.remove();
                        close(value);
                    }
                }
            }
        }
    }

    /**
     * Creates the sink and an initially empty handler map. The handler map is a transient
     * field, so {@code addTupleHandler} re-creates it on demand when it is {@code null}.
     *
     * @param str forwarded unchanged to the {@code BaseSignalBolt} constructor
     *        (presumably the element name - confirm against BaseSignalBolt)
     * @param str2 forwarded unchanged to the {@code BaseSignalBolt} constructor
     *        (presumably the pipeline/namespace - confirm against BaseSignalBolt)
     * @param z forwarded unchanged to the {@code BaseSignalBolt} constructor
     */
    protected AbstractReplaySink(String str, String str2, boolean z) {
        super(str, str2, z);
        this.handlers = new HashMap<>();
    }

    /**
     * Registers a handler for one tuple type, replacing any handler previously registered
     * for the same class. Creating the handler also opens its recorder.
     *
     * @param <T> the tuple type
     * @param cls the tuple class; also the key under which the handler is registered
     * @param tuple the schema handed to the handler
     * @param str the storage location handed to the handler
     * @param iStorageStrategyDescriptor the storage strategy handed to the handler
     * @param iTupleEmitter the emitter that receives replayed items of this tuple type
     */
    protected <T> void addTupleHandler(Class<T> cls, Tuple tuple, String str, IStorageStrategyDescriptor iStorageStrategyDescriptor, ITupleEmitter<T> iTupleEmitter) {
        // Typed as TupleHandler<T> (not a wildcard) so that the emitter's type matches setEmitter.
        TupleHandler<T> handler = new TupleHandler<>(cls, tuple, str, iStorageStrategyDescriptor, this);
        handler.setEmitter(iTupleEmitter);
        if (null == this.handlers) {
            // The field is transient and may be null, e.g. after deserialization.
            this.handlers = new HashMap<>();
        }
        this.handlers.put(cls, handler);
    }

    /**
     * Hook invoked from {@code prepare} right after the superclass has been prepared.
     * Presumably implementations register their tuple handlers here via
     * {@code addTupleHandler} - the name suggests so, nothing in this class enforces it.
     *
     * @param map the configuration map that was passed to {@code prepare}
     * @param topologyContext the topology context that was passed to {@code prepare}
     */
    protected abstract void registerHandlers(Map map, TopologyContext topologyContext);

    /**
     * Prepares the bolt via the superclass and then lets the subclass register its
     * tuple handlers.
     *
     * @param map the configuration map, passed on to the superclass and to {@code registerHandlers}
     * @param topologyContext the topology context, passed on to the superclass and to
     *        {@code registerHandlers}
     * @param outputCollector the output collector, passed on to the superclass
     */
    @Override // eu.qualimaster.common.signal.BaseSignalBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        // Registration happens only after the superclass has finished its own preparation.
        registerHandlers(map, topologyContext);
    }

    /**
     * Passes a replay signal to every registered handler and keeps the background replay
     * task in line with the result: the task is started when none exists and the handlers
     * report streamers, and it is ended when one exists and the handlers report none.
     *
     * @param replaySignal the replay signal received
     */
    @Override // eu.qualimaster.common.signal.IReplayListener
    public void notifyReplay(ReplaySignal replaySignal) {
        getLogger().info("notifying with:" + replaySignal);

        // Sum of the streamer counts reported by the handlers after applying the signal.
        int activeStreamers = 0;
        if (null != this.handlers) {
            for (TupleHandler<?> handler : this.handlers.values()) {
                activeStreamers += handler.notifyReplay(replaySignal);
            }
        }

        if (null == this.replayRunnable) {
            if (activeStreamers > 0) {
                this.replayRunnable = new ReplayRunnable();
                new Thread(this.replayRunnable).start();
            }
        } else if (0 == activeStreamers) {
            this.replayRunnable.end();
            this.replayRunnable = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Prepares the sink for shutdown: delegates to the superclass, stops the background
     * replay task (if one is running) and then lets every handler close its recorder and
     * streamers.
     *
     * @param shutdownSignal the shutdown signal, passed on to the superclass and the handlers
     */
    @Override // eu.qualimaster.common.signal.BaseSignalBolt
    public void prepareShutdown(ShutdownSignal shutdownSignal) {
        super.prepareShutdown(shutdownSignal);
        // Stop polling first: otherwise the replay thread keeps running after shutdown
        // because nothing else ends it once the streamers have been closed.
        if (null != this.replayRunnable) {
            this.replayRunnable.end();
            this.replayRunnable = null;
        }
        if (null != this.handlers) {
            for (TupleHandler<?> handler : this.handlers.values()) {
                handler.prepareShutdown(shutdownSignal);
            }
        }
    }

    /**
     * Handlers that {@link #store(Object)} resolved through the type hierarchy, keyed by the
     * concrete class of the stored object. Kept apart from {@code handlers} on purpose:
     * writing resolved entries into {@code handlers} would modify that map while the replay
     * thread may be iterating it, and would list the same handler several times so that it
     * gets notified and streamed more than once per pass. Created lazily; only accessed from
     * {@code store} (assumes {@code store} is called from a single thread - confirm against
     * the callers).
     */
    private transient Map<Class<?>, TupleHandler<?>> resolvedHandlers;

    /**
     * Records an object through the handler responsible for its type. The handler registered
     * for the object's exact class wins; otherwise the handler is looked up along the
     * superclasses and interfaces of that class and remembered for subsequent calls. If no
     * handler is found, this is logged and the object is dropped.
     *
     * @param obj the object to record; ignored if {@code null} or if no handlers exist
     */
    protected void store(Object obj) {
        if (null == obj || null == this.handlers) {
            return;
        }
        Class<?> cls = obj.getClass();
        TupleHandler<?> handler = this.handlers.get(cls);
        if (null == handler && null != this.resolvedHandlers) {
            handler = this.resolvedHandlers.get(cls);
        }
        if (null == handler) {
            handler = checkClass(cls);
            if (null != handler) {
                if (null == this.resolvedHandlers) {
                    this.resolvedHandlers = new HashMap<>();
                }
                this.resolvedHandlers.put(cls, handler);
            }
        }
        if (null != handler) {
            handler.store(obj);
        } else {
            LogManager.getLogger(getClass()).info("no handler for " + cls.getName());
        }
    }

    /**
     * Finds the handler responsible for a type. Lookup order: the type itself, then
     * (for non-interface types) recursively its superclass, then recursively each of its
     * directly declared interfaces in declaration order.
     *
     * @param cls the type to find a handler for
     * @return the first handler found, or {@code null} if there is none
     */
    private TupleHandler<?> checkClass(Class<?> cls) {
        TupleHandler<?> direct = this.handlers.get(cls);
        if (null != direct) {
            return direct;
        }

        Class<?> parent = cls.isInterface() ? null : cls.getSuperclass();
        if (null != parent) {
            TupleHandler<?> inherited = checkClass(parent);
            if (null != inherited) {
                return inherited;
            }
        }

        for (Class<?> declaredInterface : cls.getInterfaces()) {
            TupleHandler<?> viaInterface = checkClass(declaredInterface);
            if (null != viaInterface) {
                return viaInterface;
            }
        }
        return null;
    }

    /**
     * Receives a raw signal. The payload is first offered to {@code ReplaySignal.notify}
     * with this sink as listener; only if that call returns {@code false} is the payload
     * handed to the superclass.
     *
     * @param bArr the raw signal payload
     */
    @Override // eu.qualimaster.common.signal.BaseSignalBolt, eu.qualimaster.common.signal.SignalListener
    public void onSignal(byte[] bArr) {
        boolean consumedAsReplay = ReplaySignal.notify(bArr, getPipeline(), getName(), this);
        if (!consumedAsReplay) {
            super.onSignal(bArr);
        }
    }
}
