package backtype.storm.coordination;

import backtype.storm.Constants;
import backtype.storm.drpc.PrepareRequest;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.TimeCacheMap;
import backtype.storm.utils.Utils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/coordination/CoordinatedBolt.class */
public class CoordinatedBolt implements IRichBolt {
    // Logger for this bolt; used to report a FailedException raised while finishing a batch.
    public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
    // Per-source coordination settings. The keys are passed to
    // TopologyContext.getComponentTasks in prepare(). Never null after
    // construction: the constructor substitutes an empty map for null.
    private Map<String, SourceArgs> _sourceArgs;
    // Optional. When non-null, tuples arriving on this global stream are classified as ID tuples.
    private IdStreamSpec _idStreamSpec;
    // The wrapped bolt that does the actual tuple processing.
    private IRichBolt _delegate;
    // Number of coordination reports expected per id. Only assigned in prepare()
    // when _sourceArgs is non-empty; stays null otherwise.
    private Integer _numSourceReports;
    // Task ids that receive this bolt's direct emits on the coordinated stream when an id finishes.
    private List<Integer> _countOutTasks;
    // The collector handed to prepare(); used for coordination emits and for acking/failing held tuples.
    private OutputCollector _collector;
    // Tracking state per id (field 0 of each tuple). Also serves as the monitor
    // guarding all TrackingInfo reads and writes in this class.
    private TimeCacheMap<Object, TrackingInfo> _tracked;

    /* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/coordination/CoordinatedBolt$CoordinatedOutputCollector.class */
    /**
     * Collector handed to the wrapped bolt. It forwards every call to the real
     * collector while recording, per tracked id, how many tuples were sent to
     * each task and how many input tuples were acked or failed.
     *
     * The id of a tuple is always its first value. All bookkeeping is done
     * while holding the monitor of the enclosing bolt's tracking map.
     */
    public class CoordinatedOutputCollector implements IOutputCollector {
        IOutputCollector _delegate;

        /**
         * @param iOutputCollector the collector that actually performs emits, acks and fails
         */
        public CoordinatedOutputCollector(IOutputCollector iOutputCollector) {
            _delegate = iOutputCollector;
        }

        /**
         * Emits through the wrapped collector, then counts one tuple for each
         * task the wrapped collector reports having sent to.
         *
         * @return the task ids returned by the wrapped collector
         */
        @Override
        public List<Integer> emit(String str, Collection<Tuple> collection, List<Object> list) {
            final List<Integer> targetTasks = _delegate.emit(str, collection, list);
            final Object id = list.get(0);
            updateTaskCounts(id, targetTasks);
            return targetTasks;
        }

        /**
         * Counts one tuple for task {@code i}, then performs the direct emit
         * through the wrapped collector.
         */
        @Override
        public void emitDirect(int i, String str, Collection<Tuple> collection, List<Object> list) {
            final Object id = list.get(0);
            final List<Integer> onlyTask = Arrays.asList(Integer.valueOf(i));
            updateTaskCounts(id, onlyTask);
            _delegate.emitDirect(i, str, collection, list);
        }

        /**
         * Records the tuple as received for its id, re-evaluates whether the
         * id is finished, and then acks the tuple -- or fails it when the
         * finish check reports the id as failed.
         */
        @Override
        public void ack(Tuple tuple) {
            final Object id = tuple.getValue(0);
            synchronized (_tracked) {
                final TrackingInfo track = _tracked.get(id);
                if (track != null) {
                    track.receivedTuples += 1;
                }
            }
            final boolean idFailed = checkFinishId(tuple, TupleType.REGULAR);
            if (!idFailed) {
                _delegate.ack(tuple);
            } else {
                _delegate.fail(tuple);
            }
        }

        /**
         * Marks the tuple's id as failed (when it is still tracked), lets the
         * finish check fail any held tuples, and fails the tuple itself.
         */
        @Override
        public void fail(Tuple tuple) {
            final Object id = tuple.getValue(0);
            synchronized (_tracked) {
                final TrackingInfo track = _tracked.get(id);
                if (track != null) {
                    track.failed = true;
                }
            }
            checkFinishId(tuple, TupleType.REGULAR);
            _delegate.fail(tuple);
        }

        /** Passes the error straight to the wrapped collector. */
        @Override
        public void reportError(Throwable th) {
            _delegate.reportError(th);
        }

        /**
         * Adds one to the emitted-tuple count of every task in {@code tasks}
         * for the given id. Does nothing when the id is not tracked.
         */
        private void updateTaskCounts(Object id, List<Integer> tasks) {
            synchronized (_tracked) {
                final TrackingInfo track = _tracked.get(id);
                if (track == null) {
                    return;
                }
                final Map<Integer, Integer> emitted = track.taskEmittedTuples;
                for (Integer task : tasks) {
                    final int previous = Utils.get(emitted, task, 0);
                    emitted.put(task, previous + 1);
                }
            }
        }
    }

    /* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/coordination/CoordinatedBolt$FinishedCallback.class */
    /**
     * Optional interface for the wrapped bolt. When the delegate implements
     * it, the bolt calls {@link #finishedId(Object)} once the completion
     * condition for an id is met, before the coordination counts are emitted
     * downstream.
     */
    public interface FinishedCallback {
        /**
         * @param obj the id (first tuple value) whose processing has completed
         */
        void finishedId(Object obj);
    }

    /* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/coordination/CoordinatedBolt$IdStreamSpec.class */
    /**
     * Identifies the stream on which id tuples arrive. Tuples whose source
     * global stream id equals the one held here are treated as ID tuples.
     */
    public static class IdStreamSpec implements Serializable {
        GlobalStreamId _id;

        /**
         * Builds the spec from the two strings that make up a
         * {@link GlobalStreamId} (presumably component id, then stream id --
         * confirm against GlobalStreamId).
         */
        protected IdStreamSpec(String str, String str2) {
            _id = new GlobalStreamId(str, str2);
        }

        /** Factory equivalent of the protected constructor. */
        public static IdStreamSpec makeDetectSpec(String str, String str2) {
            final IdStreamSpec spec = new IdStreamSpec(str, str2);
            return spec;
        }

        /** @return the global stream id this spec was built with */
        public GlobalStreamId getGlobalStreamId() {
            return _id;
        }
    }

    /* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/coordination/CoordinatedBolt$SourceArgs.class */
    /**
     * Describes how many coordination reports a source component contributes:
     * a single report in total, or one report per task of the component.
     */
    public static class SourceArgs implements Serializable {
        /** True when the source contributes exactly one report; false for one report per task. */
        public boolean singleCount;

        /**
         * @param z value for {@link #singleCount}
         */
        protected SourceArgs(boolean z) {
            singleCount = z;
        }

        /** @return args for a source that contributes one report per task */
        public static SourceArgs all() {
            return new SourceArgs(false);
        }

        /** @return args for a source that contributes a single report */
        public static SourceArgs single() {
            return new SourceArgs(true);
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("<Single: ");
            text.append(singleCount);
            text.append(">");
            return text.toString();
        }
    }

    /* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/coordination/CoordinatedBolt$TimeoutCallback.class */
    /**
     * Optional interface for the wrapped bolt. When the delegate implements
     * it, the bolt registers an expiry callback on its tracking map and calls
     * {@link #timeoutId(Object)} for every id that expires without having
     * finished.
     */
    public interface TimeoutCallback {
        /**
         * @param obj the id (first tuple value) whose tracking entry expired
         */
        void timeoutId(Object obj);
    }

    /* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/coordination/CoordinatedBolt$TimeoutItems.class */
    /**
     * Expiry callback for the tracking map. It is only installed when the
     * wrapped bolt implements {@link TimeoutCallback}, which is why the cast
     * in {@link #expire} is unchecked.
     */
    private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
        private TimeoutItems() {
        }

        /**
         * Marks the expired entry as failed and, unless the id had already
         * finished, notifies the wrapped bolt of the timeout. Runs under the
         * tracking map's monitor.
         */
        @Override
        public void expire(Object obj, TrackingInfo trackingInfo) {
            synchronized (_tracked) {
                trackingInfo.failed = true;
                if (trackingInfo.finished) {
                    return;
                }
                final TimeoutCallback listener = (TimeoutCallback) _delegate;
                listener.timeoutId(obj);
            }
        }
    }

    /* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/coordination/CoordinatedBolt$TrackingInfo.class */
    /**
     * Mutable bookkeeping for one id. Instances are read and written by the
     * enclosing bolt under the tracking map's monitor.
     */
    public static class TrackingInfo {
        // Numeric and boolean fields rely on Java's default values (0 / false).

        /** Number of coordination tuples received so far. */
        int reportCount;
        /** Sum of the counts carried by the coordination tuples received so far. */
        int expectedTupleCount;
        /** Number of regular tuples acked by the wrapped bolt. */
        int receivedTuples;
        /** Set when a tuple for this id was failed or the entry expired. */
        boolean failed;
        /** Set once an id tuple arrived, or at creation when no id stream is configured. */
        boolean receivedId;
        /** Set once the completion condition was met and downstream tasks were notified. */
        boolean finished;
        /** Number of tuples emitted to each downstream task id. */
        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
        /** Tuples whose ack/fail is postponed until the id finishes or fails. */
        List<Tuple> ackTuples = new ArrayList<>();

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder();
            text.append("reportCount: ").append(reportCount);
            text.append("\nexpectedTupleCount: ").append(expectedTupleCount);
            text.append("\nreceivedTuples: ").append(receivedTuples);
            text.append("\nfailed: ").append(failed);
            text.append("\n").append(taskEmittedTuples.toString());
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/coordination/CoordinatedBolt$TupleType.class */
    /**
     * Classification of an incoming tuple, as decided by getTupleType.
     */
    public enum TupleType {
        // Anything that is neither an ID nor a COORD tuple; handed to the wrapped bolt.
        REGULAR,
        // Tuple whose source global stream id equals the configured IdStreamSpec.
        ID,
        // Tuple on Constants.COORDINATED_STREAM_ID while source args are configured;
        // its second value is read as an Integer tuple count in execute().
        COORD
    }

    /**
     * Wraps a bolt with no coordinating sources and no id stream.
     *
     * @param iRichBolt the bolt to wrap
     */
    public CoordinatedBolt(IRichBolt iRichBolt) {
        this(iRichBolt, (Map<String, SourceArgs>) null, (IdStreamSpec) null);
    }

    /**
     * Wraps a bolt that coordinates on exactly one source.
     *
     * @param iRichBolt    the bolt to wrap
     * @param str          key under which {@code sourceArgs} is registered (used as a component id in prepare)
     * @param sourceArgs   coordination settings for that source
     * @param idStreamSpec stream carrying id tuples, or null when there is none
     */
    public CoordinatedBolt(IRichBolt iRichBolt, String str, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
        this(iRichBolt, singleSourceArgs(str, sourceArgs), idStreamSpec);
    }

    /**
     * Wraps a bolt that coordinates on an arbitrary set of sources.
     *
     * @param iRichBolt    the bolt to wrap
     * @param map          coordination settings per source; null is treated as "no sources"
     * @param idStreamSpec stream carrying id tuples, or null when there is none
     */
    public CoordinatedBolt(IRichBolt iRichBolt, Map<String, SourceArgs> map, IdStreamSpec idStreamSpec) {
        _delegate = iRichBolt;
        _idStreamSpec = idStreamSpec;
        _countOutTasks = new ArrayList<>();
        if (map != null) {
            _sourceArgs = map;
        } else {
            // Keep the field non-null so later isEmpty() checks are safe.
            _sourceArgs = new HashMap<>();
        }
    }

    /**
     * Sets up tracking state, prepares the wrapped bolt with a coordinating
     * collector, collects the task ids subscribed to this bolt's coordinated
     * stream, and computes how many coordination reports to expect per id.
     *
     * @param map             configuration passed through unchanged to the wrapped bolt
     * @param topologyContext source of the message timeout, downstream targets and component tasks
     * @param outputCollector the real collector; kept for coordination emits and acks
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        final int timeoutSecs = topologyContext.maxTopologyMessageTimeout();
        // An expiry callback is only useful when the wrapped bolt wants timeout notifications.
        TimeCacheMap.ExpiredCallback<Object, TrackingInfo> onExpire = null;
        if (_delegate instanceof TimeoutCallback) {
            onExpire = new TimeoutItems();
        }
        _tracked = new TimeCacheMap<>(timeoutSecs, onExpire);
        _collector = outputCollector;

        final IOutputCollector coordinating = new CoordinatedOutputCollector(outputCollector);
        _delegate.prepare(map, topologyContext, new OutputCollector(coordinating));

        // Every task of every component consuming the coordinated stream gets a count on finish.
        final Map coordinatedTargets = (Map) Utils.get(topologyContext.getThisTargets(), Constants.COORDINATED_STREAM_ID, new HashMap());
        for (Object componentId : coordinatedTargets.keySet()) {
            _countOutTasks.addAll(topologyContext.getComponentTasks((String) componentId));
        }

        if (!_sourceArgs.isEmpty()) {
            int expectedReports = 0;
            for (Map.Entry<String, SourceArgs> source : _sourceArgs.entrySet()) {
                if (source.getValue().singleCount) {
                    expectedReports += 1;
                } else {
                    expectedReports += topologyContext.getComponentTasks(source.getKey()).size();
                }
            }
            _numSourceReports = expectedReports;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Re-evaluates the state of the id carried in the tuple's first value and
     * acts on it: fails held tuples when the id is marked failed, or -- when
     * the completion condition holds -- notifies the delegate, emits the
     * per-task counts on the coordinated stream and acks held tuples.
     * The whole check runs under the tracking map's monitor.
     *
     * @param tuple     the tuple that triggered the check
     * @param tupleType how the tuple was classified
     * @return true when the id was found failed or finishing it raised a
     *         FailedException; false otherwise (including when the id is not tracked)
     */
    public boolean checkFinishId(Tuple tuple, TupleType tupleType) {
        Object value = tuple.getValue(0);
        boolean z = false;
        synchronized (this._tracked) {
            TrackingInfo trackingInfo = this._tracked.get(value);
            try {
                if (trackingInfo != null) {
                    // z2 == true means the tuple's ack/fail is postponed until the id finishes or fails.
                    boolean z2 = false;
                    // The postponed tuple is the COORD tuple when no id stream is configured,
                    // and the ID tuple when one is.
                    if ((this._idStreamSpec == null && tupleType == TupleType.COORD) || (this._idStreamSpec != null && tupleType == TupleType.ID)) {
                        trackingInfo.ackTuples.add(tuple);
                        z2 = true;
                    }
                    if (trackingInfo.failed) {
                        // Failure path: fail everything held for this id and stop tracking it.
                        z = true;
                        Iterator<Tuple> it = trackingInfo.ackTuples.iterator();
                        while (it.hasNext()) {
                            this._collector.fail(it.next());
                        }
                        this._tracked.remove(value);
                    } else if (trackingInfo.receivedId && (this._sourceArgs.isEmpty() || (trackingInfo.reportCount == this._numSourceReports.intValue() && trackingInfo.expectedTupleCount == trackingInfo.receivedTuples))) {
                        // Completion path: the id tuple has arrived and, when sources are configured,
                        // all reports are in and every announced tuple has been received.
                        if (this._delegate instanceof FinishedCallback) {
                            ((FinishedCallback) this._delegate).finishedId(value);
                        }
                        // Note: this sanity check runs after finishedId has already been invoked.
                        if (!this._sourceArgs.isEmpty() && tupleType == TupleType.REGULAR) {
                            throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
                        }
                        // Tell each downstream task how many tuples it was sent for this id
                        // (0 when none were recorded), anchored on the triggering tuple.
                        Iterator<Integer> it2 = this._countOutTasks.iterator();
                        while (it2.hasNext()) {
                            int intValue = it2.next().intValue();
                            this._collector.emitDirect(intValue, Constants.COORDINATED_STREAM_ID, tuple, new Values(value, Integer.valueOf(((Integer) Utils.get(trackingInfo.taskEmittedTuples, Integer.valueOf(intValue), 0)).intValue())));
                        }
                        Iterator<Tuple> it3 = trackingInfo.ackTuples.iterator();
                        while (it3.hasNext()) {
                            this._collector.ack(it3.next());
                        }
                        trackingInfo.finished = true;
                        this._tracked.remove(value);
                    }
                    // Coordination tuples that were not postponed are resolved immediately.
                    if (!z2 && tupleType != TupleType.REGULAR) {
                        if (trackingInfo.failed) {
                            this._collector.fail(tuple);
                        } else {
                            this._collector.ack(tuple);
                        }
                    }
                } else if (tupleType != TupleType.REGULAR) {
                    // Untracked id (e.g. already removed): a coordination tuple cannot be honoured.
                    this._collector.fail(tuple);
                }
            } catch (FailedException e) {
                // NOTE(review): on this path a non-postponed, non-REGULAR tuple (a COORD tuple
                // while an id stream is configured) is neither acked nor failed here, and
                // execute() ignores the return value -- confirm that relying on the topology
                // timeout for that tuple is intended.
                LOG.error("Failed to finish batch", (Throwable) e);
                Iterator<Tuple> it4 = trackingInfo.ackTuples.iterator();
                while (it4.hasNext()) {
                    this._collector.fail(it4.next());
                }
                this._tracked.remove(value);
                z = true;
            }
        }
        return z;
    }

    /**
     * Routes an incoming tuple by type. ID and COORD tuples update the
     * tracking entry for the tuple's id (its first value) and trigger a finish
     * check; any other tuple is passed to the wrapped bolt. A tracking entry
     * is created on first sight of an id regardless of tuple type.
     *
     * @param tuple the incoming tuple
     */
    @Override
    public void execute(Tuple tuple) {
        final Object id = tuple.getValue(0);
        final TupleType type = getTupleType(tuple);

        final TrackingInfo track;
        synchronized (_tracked) {
            final TrackingInfo existing = _tracked.get(id);
            if (existing != null) {
                track = existing;
            } else {
                track = new TrackingInfo();
                // Without an id stream there is nothing to wait for, so the id counts as received.
                if (_idStreamSpec == null) {
                    track.receivedId = true;
                }
                _tracked.put(id, track);
            }
        }

        if (type == TupleType.ID) {
            synchronized (_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);
        } else if (type == TupleType.COORD) {
            // The second value of a coordination tuple is the number of tuples the sender emitted to us.
            final int announced = ((Integer) tuple.getValue(1)).intValue();
            synchronized (_tracked) {
                track.reportCount += 1;
                track.expectedTupleCount += announced;
            }
            checkFinishId(tuple, type);
        } else {
            // The wrapped bolt runs while the tracking monitor is held.
            synchronized (_tracked) {
                _delegate.execute(tuple);
            }
        }
    }

    /**
     * Cleans up the wrapped bolt and then releases the tracking map.
     *
     * The tracking map is released in a {@code finally} block so that an
     * exception thrown by the wrapped bolt's cleanup cannot leave the map's
     * resources behind; the exception still propagates to the caller.
     * The map is skipped when {@code prepare} never ran and it is still null.
     */
    @Override // backtype.storm.task.IBolt
    public void cleanup() {
        try {
            this._delegate.cleanup();
        } finally {
            if (this._tracked != null) {
                this._tracked.cleanup();
            }
        }
    }

    /**
     * Declares the wrapped bolt's streams, then adds the direct coordinated
     * stream carrying two fields: the id and a tuple count.
     *
     * @param outputFieldsDeclarer declarer shared with the wrapped bolt
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        _delegate.declareOutputFields(outputFieldsDeclarer);

        final Fields coordinationFields = new Fields(PrepareRequest.ID_STREAM, "count");
        final boolean direct = true;
        outputFieldsDeclarer.declareStream(Constants.COORDINATED_STREAM_ID, direct, coordinationFields);
    }

    /**
     * @return the component configuration of the wrapped bolt, unchanged
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        final Map<String, Object> delegateConf = _delegate.getComponentConfiguration();
        return delegateConf;
    }

    /**
     * Builds a mutable one-entry map from a source key to its coordination settings.
     *
     * @param sourceKey key for the entry (null is stored as-is)
     * @param args      settings stored under that key
     * @return a new HashMap containing only that entry
     */
    private static Map<String, SourceArgs> singleSourceArgs(String sourceKey, SourceArgs args) {
        final Map<String, SourceArgs> bySource = new HashMap<>();
        bySource.put(sourceKey, args);
        return bySource;
    }

    /**
     * Classifies a tuple by where it came from. The id stream is checked
     * first, so a tuple matching both rules is reported as ID.
     *
     * @param tuple the incoming tuple
     * @return ID when an id stream is configured and the tuple came from it;
     *         COORD when sources are configured and the tuple arrived on the
     *         coordinated stream; REGULAR otherwise
     */
    private TupleType getTupleType(Tuple tuple) {
        if (_idStreamSpec != null && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
            return TupleType.ID;
        }
        if (!_sourceArgs.isEmpty() && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
            return TupleType.COORD;
        }
        return TupleType.REGULAR;
    }
}
