package eu.qualimaster.algorithms.imp.correlation.softwaresubtopology;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import eu.qualimaster.families.inf.IFCorrelationFinancial;
import eu.qualimaster.families.inf.IFCorrelationTwitter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.math3.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/qualimaster/algorithms/imp/correlation/softwaresubtopology/MapperBolt.class */
public class MapperBolt extends BaseRichBolt {
    OutputCollector collector;
    List<Integer> taskIds;
    String subTopologyPrefix;
    boolean hasInitialized;
    Map<String, Set<Integer>> streamTaskMapping = new HashMap();
    Map<Integer, List<Pair<String, String>>> taskStreamPairsMapping = new HashMap();
    Logger logger = LoggerFactory.getLogger(MapperBolt.class);

    public MapperBolt(String str) {
        this.subTopologyPrefix = str;
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.taskIds = topologyContext.getComponentTasks(this.subTopologyPrefix + "HayashiYoshidaBolt");
        this.hasInitialized = false;
        System.out.println("taskIds = " + this.taskIds);
    }

    public void execute(Tuple tuple) {
        String symbolId;
        long timestamp;
        double value;
        if ((tuple.getValue(0) instanceof IFCorrelationFinancial.IIFCorrelationFinancialSymbolListInput) || (tuple.getValue(0) instanceof IFCorrelationTwitter.IIFCorrelationTwitterSymbolListInput)) {
            EmitMappingConfiguration(tuple.getValue(0) instanceof IFCorrelationFinancial.IIFCorrelationFinancialSymbolListInput ? ((IFCorrelationFinancial.IIFCorrelationFinancialSymbolListInput) tuple.getValue(0)).getAllSymbols() : ((IFCorrelationTwitter.IIFCorrelationTwitterSymbolListInput) tuple.getValue(0)).getAllSymbols());
        } else if ((tuple.getValue(0) instanceof IFCorrelationFinancial.IIFCorrelationFinancialPreprocessedStreamInput) || (tuple.getValue(0) instanceof IFCorrelationTwitter.IIFCorrelationTwitterAnalyzedStreamInput)) {
            if (this.hasInitialized) {
                if (tuple.getValue(0) instanceof IFCorrelationFinancial.IIFCorrelationFinancialPreprocessedStreamInput) {
                    IFCorrelationFinancial.IIFCorrelationFinancialPreprocessedStreamInput iIFCorrelationFinancialPreprocessedStreamInput = (IFCorrelationFinancial.IIFCorrelationFinancialPreprocessedStreamInput) tuple.getValue(0);
                    symbolId = iIFCorrelationFinancialPreprocessedStreamInput.getSymbolId();
                    timestamp = iIFCorrelationFinancialPreprocessedStreamInput.getTimestamp();
                    value = iIFCorrelationFinancialPreprocessedStreamInput.getValue();
                } else {
                    IFCorrelationTwitter.IIFCorrelationTwitterAnalyzedStreamInput iIFCorrelationTwitterAnalyzedStreamInput = (IFCorrelationTwitter.IIFCorrelationTwitterAnalyzedStreamInput) tuple.getValue(0);
                    symbolId = iIFCorrelationTwitterAnalyzedStreamInput.getSymbolId();
                    timestamp = iIFCorrelationTwitterAnalyzedStreamInput.getTimestamp();
                    value = iIFCorrelationTwitterAnalyzedStreamInput.getValue();
                }
                ForwardSymbol(symbolId, timestamp, value);
            } else {
                this.logger.error("Not initialized yet!");
            }
        }
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("symbolsStream", true, new Fields(new String[]{"streamId", "timestamp", "value"}));
        outputFieldsDeclarer.declareStream("configurationStream", true, new Fields(new String[]{"streamId0", "streamId1"}));
    }

    private void mapStreamsToTasks() {
        String[] strArr = new String[this.streamTaskMapping.size()];
        this.streamTaskMapping.keySet().toArray(strArr);
        int i = 0;
        int size = this.taskIds.size();
        int length = strArr.length;
        int ceil = (int) Math.ceil(length / size);
        int ceil2 = (int) Math.ceil(length / ceil);
        for (int i2 = 0; i2 < ceil2; i2++) {
            int i3 = 0;
            while (i3 < ceil2 - i2) {
                int i4 = i3 + i2;
                for (int i5 = i3 * ceil; i5 < length && i5 < (i3 + 1) * ceil; i5++) {
                    for (int i6 = i4 == i3 ? i5 + 1 : (i4 * ceil) + 1; i6 < length && i6 < ((i4 + 1) * ceil) + 1; i6++) {
                        this.streamTaskMapping.get(strArr[i5]).add(this.taskIds.get(i));
                        this.streamTaskMapping.get(strArr[i6]).add(this.taskIds.get(i));
                        if (!this.taskStreamPairsMapping.containsKey(this.taskIds.get(i))) {
                            this.taskStreamPairsMapping.put(this.taskIds.get(i), new ArrayList());
                        }
                        this.taskStreamPairsMapping.get(this.taskIds.get(i)).add(new Pair<>(strArr[i5], strArr[i6]));
                    }
                }
                i++;
                if (i == this.taskIds.size()) {
                    i = 0;
                }
                i3++;
            }
        }
    }

    private void ForwardSymbol(String str, long j, double d) {
        Set<Integer> set = this.streamTaskMapping.get(str);
        if (set == null || set.size() == 0) {
            this.logger.error("No target tasks!");
            return;
        }
        Iterator<Integer> it = set.iterator();
        while (it.hasNext()) {
            this.collector.emitDirect(it.next().intValue(), "symbolsStream", new Values(new Object[]{str, Long.valueOf(j), Double.valueOf(d)}));
        }
    }

    private void EmitMappingConfiguration(List<String> list) {
        this.streamTaskMapping = new HashMap();
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            this.streamTaskMapping.put(it.next(), new HashSet());
        }
        mapStreamsToTasks();
        for (Map.Entry<Integer, List<Pair<String, String>>> entry : this.taskStreamPairsMapping.entrySet()) {
            Integer key = entry.getKey();
            for (Pair<String, String> pair : entry.getValue()) {
                this.collector.emitDirect(key.intValue(), "configurationStream", new Values(new Object[]{(String) pair.getKey(), (String) pair.getValue()}));
            }
            this.logger.info("Task " + key + "will calculate " + entry.getValue().size() + " pairs");
        }
        this.hasInitialized = true;
    }
}
