package storm.trident.planner;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.jgrapht.DirectedGraph;
import org.jgrapht.graph.DirectedSubgraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import storm.trident.planner.processor.TridentContext;
import storm.trident.topology.BatchInfo;
import storm.trident.topology.ITridentBatchBolt;
import storm.trident.tuple.TridentTuple;
import storm.trident.tuple.TridentTupleView;
import storm.trident.util.TridentUtils;

/* loaded from: input_file:storm/trident/planner/SubtopologyBolt.class */
/**
 * Batch bolt that hosts a set of planner {@link Node}s (a sub-graph of a larger planner graph)
 * and wires the {@link TridentProcessor}s of its {@link ProcessorNode}s together.
 *
 * <p>During {@link #prepare} the member nodes are walked in topological order. Each processor is
 * handed a {@link TridentContext} describing its parents' tuple factories, its parents' stream
 * ids, and the receivers it must forward to. Parents that are not members of this bolt are
 * represented by an {@link InitialReceiver} keyed by stream id; children that are not members are
 * reached through a single {@link BridgeReceiver} built on the bolt's output collector.
 */
public class SubtopologyBolt implements ITridentBatchBolt {
    /** Planner graph that contains (at least) the nodes in {@link #_nodes}. */
    DirectedGraph _graph;
    /** Nodes hosted by this bolt. */
    Set<Node> _nodes;
    /** Entry points for tuples arriving from outside this bolt, keyed by source stream id. */
    Map<String, InitialReceiver> _roots = new HashMap<String, InitialReceiver>();
    /** Output tuple factory of every processor node prepared so far. */
    Map<Node, TridentTuple.Factory> _outputFactories = new HashMap<Node, TridentTuple.Factory>();
    /** Processors of this bolt per batch group, in topological order. */
    Map<String, List<TridentProcessor>> _myTopologicallyOrdered = new HashMap<String, List<TridentProcessor>>();
    /** Batch group assigned to each node. */
    Map<Node, String> _batchGroups;

    /**
     * Receives raw tuples of one incoming stream, converts them to {@link TridentTuple}s and fans
     * them out to every processor registered for that stream.
     */
    protected class InitialReceiver {
        List<TridentProcessor> _receivers = new ArrayList<TridentProcessor>();
        TridentTupleView.RootFactory _factory;
        TridentTupleView.ProjectionFactory _project;
        String _stream;

        /**
         * @param str    id of the incoming stream
         * @param fields all fields of the incoming stream; the first one is dropped from the
         *               projected view (presumably the batch id - confirm against the emitter)
         */
        public InitialReceiver(String str, Fields fields) {
            List<String> projectedNames = new ArrayList<String>(fields.toList());
            // Everything except the leading field is exposed to the processors.
            projectedNames.remove(0);

            this._stream = str;
            this._factory = new TridentTupleView.RootFactory(fields);
            this._project = new TridentTupleView.ProjectionFactory(this._factory, new Fields(projectedNames));
        }

        /**
         * Wraps {@code tuple} with the root factory, projects it, and passes the result to each
         * registered processor in registration order.
         */
        public void receive(ProcessorContext processorContext, Tuple tuple) {
            TridentTuple projected = this._project.create(this._factory.create(tuple));
            for (TridentProcessor receiver : this._receivers) {
                receiver.execute(processorContext, this._stream, projected);
            }
        }

        /** Registers another processor to be fed from this stream. */
        public void addReceiver(TridentProcessor tridentProcessor) {
            this._receivers.add(tridentProcessor);
        }

        /** @return the projection factory describing the tuples handed to the receivers */
        public TridentTuple.Factory getOutputFactory() {
            return this._project;
        }
    }

    /**
     * @param directedGraph planner graph containing the hosted nodes
     * @param set           nodes hosted by this bolt
     * @param map           batch group of each node
     */
    public SubtopologyBolt(DirectedGraph directedGraph, Set<Node> set, Map<Node, String> map) {
        this._graph = directedGraph;
        this._nodes = set;
        this._batchGroups = map;
    }

    /**
     * Creates the state objects declared by member nodes, then prepares every processor node in
     * topological order.
     *
     * <p>The state index passed to each processor is the node's position in the topological
     * iteration over all member nodes (non-processor nodes consume an index too).
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector) {
        String componentId = topologyContext.getThisComponentId();
        int numTasks = topologyContext.getComponentTasks(componentId).size();

        // Publish the state of every stateful node as task data under the state's id.
        for (Node member : this._nodes) {
            if (member.stateInfo == null) {
                continue;
            }
            Object state = member.stateInfo.spec.stateFactory.makeState(
                    map, topologyContext, topologyContext.getThisTaskIndex(), numTasks);
            topologyContext.setTaskData(member.stateInfo.id, state);
        }

        TopologicalOrderIterator ordered =
                new TopologicalOrderIterator(new DirectedSubgraph(this._graph, this._nodes, null));
        for (int stateIndex = 0; ordered.hasNext(); stateIndex++) {
            Node current = (Node) ordered.next();
            if (!(current instanceof ProcessorNode)) {
                continue;
            }
            ProcessorNode procNode = (ProcessorNode) current;
            TridentProcessor processor = procNode.processor;

            // Record the processor under its batch group, preserving topological order.
            String batchGroup = this._batchGroups.get(current);
            List<TridentProcessor> groupProcessors = this._myTopologicallyOrdered.get(batchGroup);
            if (groupProcessors == null) {
                groupProcessors = new ArrayList<TridentProcessor>();
                this._myTopologicallyOrdered.put(batchGroup, groupProcessors);
            }
            groupProcessors.add(processor);

            // Collect, per parent, its stream id and the factory of the tuples it produces.
            List<String> parentStreams = new ArrayList<String>();
            List<TridentTuple.Factory> parentFactories = new ArrayList<TridentTuple.Factory>();
            for (Node parent : TridentUtils.getParents(this._graph, current)) {
                String parentStream = parent.streamId;
                parentStreams.add(parentStream);

                if (this._nodes.contains(parent)) {
                    // Parent lives in this bolt and, by topological order, is already prepared.
                    parentFactories.add(this._outputFactories.get(parent));
                    continue;
                }

                // Parent lives elsewhere: its tuples arrive through an InitialReceiver.
                InitialReceiver root = this._roots.get(parentStream);
                if (root == null) {
                    root = new InitialReceiver(parentStream, getSourceOutputFields(topologyContext, parentStream));
                    this._roots.put(parentStream, root);
                }
                root.addReceiver(processor);
                parentFactories.add(root.getOutputFactory());
            }

            // Kept raw on purpose: it holds both child processors and a BridgeReceiver.
            List receivers = new ArrayList();
            boolean hasExternalChild = false;
            for (Node child : TridentUtils.getChildren(this._graph, current)) {
                if (!this._nodes.contains(child)) {
                    hasExternalChild = true;
                    continue;
                }
                receivers.add(((ProcessorNode) child).processor);
            }
            if (hasExternalChild) {
                // One bridge is enough no matter how many children live outside this bolt.
                receivers.add(new BridgeReceiver(batchOutputCollector));
            }

            TridentContext tridentContext = new TridentContext(
                    procNode.selfOutFields,
                    parentFactories,
                    parentStreams,
                    receivers,
                    procNode.streamId,
                    stateIndex,
                    batchOutputCollector);
            processor.prepare(map, topologyContext, tridentContext);
            this._outputFactories.put(current, processor.getOutputFactory());
        }
    }

    /**
     * Looks up the declared output fields of the first source of this component whose stream id
     * equals {@code str}.
     *
     * @throws RuntimeException if no source of this component uses that stream id
     */
    private Fields getSourceOutputFields(TopologyContext topologyContext, String str) {
        for (GlobalStreamId source : topologyContext.getThisSources().keySet()) {
            boolean sameStream = source.get_streamId().equals(str);
            if (sameStream) {
                return topologyContext.getComponentOutputFields(source);
            }
        }
        throw new RuntimeException("Could not find fields for source stream " + str);
    }

    /**
     * Routes an incoming tuple to the {@link InitialReceiver} registered for its source stream.
     *
     * @throws RuntimeException if no receiver is registered for the tuple's source stream
     */
    @Override
    public void execute(BatchInfo batchInfo, Tuple tuple) {
        String sourceStream = tuple.getSourceStreamId();
        InitialReceiver root = this._roots.get(sourceStream);
        if (root != null) {
            root.receive((ProcessorContext) batchInfo.state, tuple);
            return;
        }
        throw new RuntimeException("Received unexpected tuple " + tuple.toString());
    }

    /** Calls {@code finishBatch} on every processor of the batch's group, in topological order. */
    @Override
    public void finishBatch(BatchInfo batchInfo) {
        List<TridentProcessor> groupProcessors = this._myTopologicallyOrdered.get(batchInfo.batchGroup);
        for (TridentProcessor processor : groupProcessors) {
            processor.finishBatch((ProcessorContext) batchInfo.state);
        }
    }

    /**
     * Builds the per-batch {@link ProcessorContext} (one state slot per member node) and calls
     * {@code startBatch} on every processor of batch group {@code str}.
     *
     * @return the newly created {@link ProcessorContext}
     */
    @Override
    public Object initBatchState(String str, Object obj) {
        Object[] stateSlots = new Object[this._nodes.size()];
        ProcessorContext batchContext = new ProcessorContext(obj, stateSlots);
        List<TridentProcessor> groupProcessors = this._myTopologicallyOrdered.get(str);
        for (TridentProcessor processor : groupProcessors) {
            processor.startBatch(batchContext);
        }
        return batchContext;
    }

    /** Calls {@code cleanup} on every processor of every batch group. */
    @Override
    public void cleanup() {
        for (List<TridentProcessor> groupProcessors : this._myTopologicallyOrdered.values()) {
            for (TridentProcessor processor : groupProcessors) {
                processor.cleanup();
            }
        }
    }

    /**
     * Declares one output stream per member node: the node's stream id with the field
     * {@code "$batchId"} followed by the node's {@code allOutputFields}.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        for (Node member : this._nodes) {
            Fields streamFields = TridentUtils.fieldsConcat(new Fields("$batchId"), member.allOutputFields);
            outputFieldsDeclarer.declareStream(member.streamId, streamFields);
        }
    }

    /** @return {@code null}; this bolt contributes no component-specific configuration */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
