package tests.eu.qualimaster.storm;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import eu.qualimaster.common.signal.AbstractMonitor;
import eu.qualimaster.common.signal.BaseSignalBolt;
import eu.qualimaster.common.signal.ShutdownSignal;
import eu.qualimaster.common.signal.SignalException;
import eu.qualimaster.events.EventManager;
import eu.qualimaster.monitoring.events.AlgorithmChangedMonitoringEvent;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/* loaded from: input_file:tests/eu/qualimaster/storm/SendingBolt.class */
/**
 * Test bolt that forwards the first (integer) field of every received tuple
 * over a plain TCP socket to a peer listening on the local host.
 *
 * <p>Tuples are acknowledged immediately in {@link #execute(Tuple)} and their
 * values are buffered in an in-memory queue; a background {@link Sender}
 * thread drains that queue and writes the values to the socket.
 *
 * <p>Threading: {@code execute}/{@code prepare}/{@code prepareShutdown} are
 * called by the framework, while {@link Sender#run()} runs on its own thread.
 * The two sides communicate through the thread-safe queue and the volatile
 * {@code running} flag.
 */
public class SendingBolt extends BaseSignalBolt {
    /** Name of the single output field declared by this bolt. */
    public static final String STREAM_NAME = "number";

    /** Whether {@link #execute(Tuple)} closes its monitoring interval. */
    private boolean sendMonitoringEvents;
    /** TCP port on the local host to connect to. */
    private int port;
    /** Values waiting to be written to the socket by the sender thread. */
    private transient BlockingQueue<Integer> toSend;
    private transient OutputCollector collector;
    /** Non-null once a connection was established; stays set after {@link #closeNet()}. */
    private transient Socket spoutSocket;
    private transient ObjectOutputStream out;
    /**
     * Loop condition of the sender thread. Volatile because it is written by
     * the executor/shutdown thread and read by the sender thread.
     */
    private transient volatile boolean running;

    /**
     * Background task that takes queued values and writes them to the socket,
     * pausing 10 ms between iterations.
     */
    public class Sender implements Runnable {
        private final AbstractMonitor monitor;

        private Sender() {
            this.monitor = SendingBolt.this.createThreadMonitor();
        }

        /**
         * Sends queued values until the bolt stops running, a write fails, or
         * this thread is interrupted.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (SendingBolt.this.running) {
                // Non-blocking poll: returns null when nothing is queued.
                Integer value = SendingBolt.this.toSend.poll();
                if (null != value) {
                    try {
                        this.monitor.startMonitoring();
                        SendingBolt.this.out.writeInt(value.intValue());
                        SendingBolt.this.out.flush();
                        this.monitor.endMonitoring(value);
                    } catch (IOException e) {
                        System.err.println(SendingBolt.this.getName() + " " + e.getMessage());
                        // Clears the running flag, which ends this loop.
                        SendingBolt.this.closeNet();
                    }
                }
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for whoever owns this thread and
                    // stop instead of spinning without the pause.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Creates the bolt.
     *
     * @param str   first argument passed to the {@code BaseSignalBolt} constructor
     *              (presumably the element name; confirm against the superclass)
     * @param str2  second argument passed to the {@code BaseSignalBolt} constructor
     *              (presumably the pipeline/namespace; confirm against the superclass)
     * @param z     whether {@link #execute(Tuple)} calls {@code endMonitoring()}
     * @param z2    third argument passed to the {@code BaseSignalBolt} constructor
     * @param i     local TCP port to connect to
     */
    public SendingBolt(String str, String str2, boolean z, boolean z2, int i) {
        super(str, str2, z2);
        this.sendMonitoringEvents = z;
        this.port = i;
    }

    /**
     * Initializes the transient state, optionally announces the algorithm via
     * an {@link AlgorithmChangedMonitoringEvent}, and attempts to connect.
     */
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        super.prepare(map, topologyContext, outputCollector);
        this.collector = outputCollector;
        this.toSend = new LinkedBlockingQueue<Integer>();
        if (sendChangedEvent()) {
            EventManager.send(new AlgorithmChangedMonitoringEvent(getPipeline(), getName(), getName()));
        }
        connect();
    }

    /**
     * Returns whether {@link #prepare} shall send an algorithm changed event.
     *
     * @return {@code true} in this implementation; subclasses may override
     */
    protected boolean sendChangedEvent() {
        return true;
    }

    /**
     * Opens the socket and starts the sender thread if no socket exists yet.
     * On failure all partially created resources are released and reset so
     * that the next call (one per tuple, see {@link #execute(Tuple)}) retries.
     */
    private void connect() {
        if (null == this.spoutSocket) {
            try {
                this.spoutSocket = new Socket(InetAddress.getLocalHost(), this.port);
                this.out = new ObjectOutputStream(this.spoutSocket.getOutputStream());
                this.out.flush();
                this.running = true;
                System.err.println(getName() + " server created on " + this.port);
                new Thread(new Sender()).start();
            } catch (IOException e) {
                System.err.println(getName() + " " + e.getMessage());
                // All IOExceptions above occur before running is set, so
                // closeNet() would be a no-op here; clean up explicitly to
                // avoid leaking a half-initialized socket and to allow a retry.
                close(this.out);
                close(this.spoutSocket);
                this.out = null;
                this.spoutSocket = null;
            }
        }
    }

    /**
     * Stops the sender thread and closes stream and socket if a connection is
     * active. The socket reference is intentionally kept so that
     * {@link #connect()} does not reconnect after a shutdown or send failure.
     */
    private void closeNet() {
        if (this.running) {
            this.running = false;
            close(this.out);
            close(this.spoutSocket);
        }
    }

    /**
     * Closes {@code closeable} if it is not {@code null}, logging (not
     * propagating) any {@link IOException}.
     */
    private void close(Closeable closeable) {
        if (null != closeable) {
            try {
                closeable.close();
            } catch (IOException e) {
                System.err.println(getName() + " " + e.getMessage());
            }
        }
    }

    /** Closes the network connection when a shutdown signal is received. */
    protected void prepareShutdown(ShutdownSignal shutdownSignal) {
        closeNet();
    }

    /**
     * Queues the integer in field 0 of {@code tuple} for sending and acks the
     * tuple. Monitoring is always started but only ended when this bolt was
     * created with monitoring events enabled.
     */
    public void execute(Tuple tuple) {
        startMonitoring();
        // Retries the connection if an earlier attempt failed.
        connect();
        this.toSend.add(tuple.getInteger(0));
        this.collector.ack(tuple);
        if (this.sendMonitoringEvents) {
            endMonitoring();
        }
    }

    /** Declares the single output field {@link #STREAM_NAME}. */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{STREAM_NAME}));
    }

    /**
     * Sends {@code shutdownSignal}, printing the message of a
     * {@link SignalException} to standard output instead of propagating it.
     */
    protected static void send(ShutdownSignal shutdownSignal) {
        try {
            shutdownSignal.sendSignal();
        } catch (SignalException e) {
            System.out.println(e.getMessage());
        }
    }
}
