package eu.qualimaster.algorithms.MutualInformationHardware;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import eu.qualimaster.base.algorithm.HardwareTransmitter;
import eu.qualimaster.base.serializer.Parameters;
import eu.qualimaster.common.signal.BaseSignalBolt;
import eu.qualimaster.common.signal.ParameterChange;
import eu.qualimaster.common.signal.ParameterChangeSignal;
import eu.qualimaster.common.signal.ShutdownSignal;
import eu.qualimaster.common.signal.ThreadMonitor;
import eu.qualimaster.common.signal.ValueFormatException;
import eu.qualimaster.common.switching.SynchronizedQueue;
import eu.qualimaster.dataManagement.serialization.SerializerRegistry;
import eu.qualimaster.families.imp.FCorrelationFinancial;
import eu.qualimaster.families.inf.IFCorrelationFinancial;
import eu.qualimaster.observables.ResourceUsage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.Logger;

/* loaded from: input_file:eu/qualimaster/algorithms/MutualInformationHardware/HardwareConnectionBolt.class */
public class HardwareConnectionBolt extends BaseSignalBolt {
    static final Logger logger = Logger.getLogger(HardwareConnectionBolt.class);
    private transient OutputCollector collector;
    private String ip;
    private int port;
    private boolean ready;
    private transient Queue<Tuple> queueTuple;
    private transient HardwareTransmitter hardwareConnection;
    private transient ByteArrayOutputStream output;
    private transient ByteArrayOutputStream paraOutput;
    private String separator;
    private transient IFCorrelationFinancial.IIFCorrelationFinancialPreprocessedStreamInput inputPreprocessedStream;
    private transient IFCorrelationFinancial.IIFCorrelationFinancialSymbolListInput inputSymbolList;
    private int size;
    private transient DataAcker dataAcker;
    private transient Queue<Tuple> queue;
    private transient SynchronizedQueue<Tuple> syn;

    /* loaded from: input_file:eu/qualimaster/algorithms/MutualInformationHardware/HardwareConnectionBolt$DataAcker.class */
    public class DataAcker implements Runnable {
        private boolean cont;

        public DataAcker() {
            this.cont = false;
            this.cont = true;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (this.cont) {
                HardwareConnectionBolt.this.collector.ack((Tuple) HardwareConnectionBolt.this.syn.consume());
            }
        }

        public void stop() {
            this.cont = false;
        }
    }

    public HardwareConnectionBolt(String str, String str2) {
        super(str, str2, true);
        this.port = 0;
        this.ready = false;
        this.queueTuple = new ConcurrentLinkedQueue();
        this.output = null;
        this.paraOutput = null;
        this.inputPreprocessedStream = null;
        this.inputSymbolList = null;
        this.queue = new ConcurrentLinkedQueue();
        this.syn = null;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        this.collector = outputCollector;
        this.ip = "147.27.39.12";
        this.ready = false;
        this.queueTuple = new ConcurrentLinkedQueue();
        this.output = new ByteArrayOutputStream();
        this.paraOutput = new ByteArrayOutputStream();
        new FCorrelationFinancial.IFCorrelationFinancialPreprocessedStreamInput();
        new FCorrelationFinancial.IFCorrelationFinancialSymbolListInput();
        this.queue = new ConcurrentLinkedQueue();
        this.size = 50;
        this.syn = new SynchronizedQueue<>(this.queue, this.size);
        try {
            this.dataAcker = new DataAcker();
        } catch (Exception e) {
            logger.error("", e);
        }
        new Thread(this.dataAcker).start();
        initMonitor();
        logger.info("The end of the prepare method.");
    }

    protected boolean initMonitorDuringPrepare() {
        return false;
    }

    public void execute(Tuple tuple) {
        this.queueTuple.offer(tuple);
        if (this.ready) {
            sendTuple();
        }
    }

    protected void sendTuple() {
        ThreadMonitor createThreadMonitor = createThreadMonitor();
        Tuple poll = this.queueTuple.poll();
        if (poll != null) {
            if ((poll.getValue(0) instanceof IFCorrelationFinancial.IIFCorrelationFinancialPreprocessedStreamInput) && !(poll.getValue(0) instanceof IFCorrelationFinancial.IIFCorrelationFinancialSymbolListInput)) {
                this.separator = "da";
                this.inputPreprocessedStream = (IFCorrelationFinancial.IIFCorrelationFinancialPreprocessedStreamInput) poll.getValue(0);
                try {
                    this.output.reset();
                    SerializerRegistry.getSerializerSafe("IFCorrelationFinancialPreprocessedStreamInput", IFCorrelationFinancial.IIFCorrelationFinancialPreprocessedStreamInput.class).serializeTo(this.inputPreprocessedStream, this.output);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if ((poll.getValue(0) instanceof IFCorrelationFinancial.IIFCorrelationFinancialSymbolListInput) && !(poll.getValue(0) instanceof IFCorrelationFinancial.IIFCorrelationFinancialSymbolListInput)) {
                this.separator = "db";
                this.inputSymbolList = (IFCorrelationFinancial.IIFCorrelationFinancialSymbolListInput) poll.getValue(0);
                try {
                    this.output.reset();
                    SerializerRegistry.getSerializerSafe("IFCorrelationFinancialSymbolListInput", IFCorrelationFinancial.IIFCorrelationFinancialSymbolListInput.class).serializeTo(this.inputSymbolList, this.output);
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
            }
            this.syn.produce(poll);
            if (!connect() || this.output == null || this.separator == null) {
                return;
            }
            createThreadMonitor.startMonitoring();
            this.hardwareConnection.sendSeparator(this.separator);
            this.hardwareConnection.sendData(this.output.toByteArray());
            createThreadMonitor.endMonitoring(poll);
        }
    }

    private boolean connect() {
        if (null == this.hardwareConnection) {
            try {
                this.hardwareConnection = new HardwareTransmitter(this.ip, this.port);
                logger.info("Created a transmitter connection.");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return null != this.hardwareConnection;
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:7:0x0031. Please report as an issue. */
    public void notifyParameterChange(ParameterChangeSignal parameterChangeSignal) {
        logger.info("Received the parameter change signal!");
        for (int i = 0; i < parameterChangeSignal.getChangeCount(); i++) {
            try {
                ParameterChange change = parameterChangeSignal.getChange(i);
                this.paraOutput.reset();
                String name = change.getName();
                boolean z = -1;
                switch (name.hashCode()) {
                    case 3414921:
                        if (name.equals("omit")) {
                            z = 2;
                            break;
                        }
                        break;
                    case 3446913:
                        if (name.equals("port")) {
                            z = true;
                            break;
                        }
                        break;
                    case 97532676:
                        if (name.equals("flush")) {
                            z = 3;
                            break;
                        }
                        break;
                    case 1862514705:
                        if (name.equals("windowSize")) {
                            z = false;
                            break;
                        }
                        break;
                }
                switch (z) {
                    case false:
                        logger.info("Received parameter changing signal windowSize");
                        Parameters.IntegerParameter integerParameter = new Parameters.IntegerParameter();
                        integerParameter.setName("windowSize");
                        integerParameter.setValue(Integer.valueOf(change.getIntValue()));
                        SerializerRegistry.getSerializerSafe("IntegerParameter", Parameters.IntegerParameter.class).serializeTo(integerParameter, this.paraOutput);
                        if (connect() && this.paraOutput != null && "pa" != 0) {
                            this.hardwareConnection.sendSeparator("pa");
                            this.hardwareConnection.sendData(this.paraOutput.toByteArray());
                            break;
                        }
                        break;
                    case true:
                        logger.info("Received signal port!");
                        this.port = change.getIntValue();
                        this.ready = true;
                        logger.info("Received signal port!" + this.port);
                        recordOnce(ResourceUsage.USED_CPUS, 1.0d);
                        recordOnce(ResourceUsage.USED_DFES, 1.0d);
                        break;
                    case true:
                        logger.info("Received signal omit, sending the omit message to hardware!!");
                        if (connect()) {
                            this.hardwareConnection.sendSeparator("cc");
                            break;
                        } else {
                            break;
                        }
                    case true:
                        logger.info("Received signal flush, sending the flush message to hardware!!");
                        if (connect()) {
                            this.hardwareConnection.sendFlushMessage();
                            break;
                        } else {
                            break;
                        }
                }
            } catch (IOException e) {
                e.printStackTrace();
                return;
            } catch (ValueFormatException e2) {
                e2.printStackTrace();
                return;
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    protected void prepareShutdown(ShutdownSignal shutdownSignal) {
        super.prepareShutdown(shutdownSignal);
        this.dataAcker.stop();
        try {
            if (this.hardwareConnection != null) {
                this.hardwareConnection.sendCloseMessage();
                this.hardwareConnection.close();
            }
            if (this.output != null) {
                this.output.close();
            }
            if (this.paraOutput != null) {
                this.paraOutput.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        recordOnce(ResourceUsage.USED_CPUS, 0.0d);
        recordOnce(ResourceUsage.USED_DFES, 0.0d);
    }

    public void cleanup() {
        super.cleanup();
        try {
            if (this.hardwareConnection != null) {
                this.hardwareConnection.sendCloseMessage();
                this.hardwareConnection.close();
            }
            if (this.output != null) {
                this.output.close();
            }
            if (this.paraOutput != null) {
                this.paraOutput.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
