package backtype.storm.transactional.partitioned;

import backtype.storm.Config;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout;
import backtype.storm.transactional.state.RotatingTransactionalState;
import backtype.storm.transactional.state.TransactionalState;
import ch.qos.logback.core.CoreConstants;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.class */
/**
 * Adapts an {@link IPartitionedTransactionalSpout} to the {@link ITransactionalSpout}
 * interface.
 *
 * <p>The coordinator metadata for every transaction is the partition count reported by
 * the wrapped spout's coordinator. Each emitter task then handles the partitions
 * {@code taskIndex, taskIndex + numTasks, taskIndex + 2 * numTasks, ...} below that count,
 * keeping one {@link RotatingTransactionalState} per partition.
 */
public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpout<Integer> {
    IPartitionedTransactionalSpout _spout;

    /**
     * Coordinator that delegates to the wrapped spout's coordinator and publishes the
     * current partition count as the transaction metadata.
     */
    class Coordinator implements ITransactionalSpout.Coordinator<Integer> {
        private final IPartitionedTransactionalSpout.Coordinator _coordinator;

        /**
         * Obtains the delegate coordinator from the wrapped spout.
         *
         * @param map             configuration map handed to the wrapped spout
         * @param topologyContext context handed to the wrapped spout
         */
        public Coordinator(Map map, TopologyContext topologyContext) {
            this._coordinator = PartitionedTransactionalSpoutExecutor.this._spout.getCoordinator(map, topologyContext);
        }

        /**
         * Returns the delegate's current partition count as the transaction metadata.
         *
         * @param bigInteger transaction id (not used here)
         * @param num        metadata passed in by the caller (not used here)
         * @return the number of partitions reported by the delegate coordinator
         */
        @Override
        public Integer initializeTransaction(BigInteger bigInteger, Integer num) {
            return Integer.valueOf(this._coordinator.numPartitions());
        }

        /**
         * @return whatever the delegate coordinator reports from {@code isReady()}
         */
        @Override
        public boolean isReady() {
            return this._coordinator.isReady();
        }

        /** Closes the delegate coordinator. */
        @Override
        public void close() {
            this._coordinator.close();
        }
    }

    /**
     * Emitter that walks this task's share of the partitions and delegates the actual
     * emission to the wrapped spout's emitter.
     */
    class Emitter implements ITransactionalSpout.Emitter<Integer> {
        private final IPartitionedTransactionalSpout.Emitter _emitter;
        private final TransactionalState _state;
        /** Per-partition rotating state, created the first time a partition is visited. */
        private final Map<Integer, RotatingTransactionalState> _partitionStates =
                new HashMap<Integer, RotatingTransactionalState>();
        /** Index of this task among the tasks of its component. */
        private final int _index;
        /** Number of tasks of this component; used as the partition stride. */
        private final int _numTasks;

        /**
         * Obtains the delegate emitter, opens the user state and records this task's
         * index and the component's task count.
         *
         * @param map             configuration map; must contain
         *                        {@link Config#TOPOLOGY_TRANSACTIONAL_ID} as a {@code String}
         * @param topologyContext context used to look up the task index and task count
         */
        public Emitter(Map map, TopologyContext topologyContext) {
            this._emitter = PartitionedTransactionalSpoutExecutor.this._spout.getEmitter(map, topologyContext);
            this._state = TransactionalState.newUserState(
                    map,
                    (String) map.get(Config.TOPOLOGY_TRANSACTIONAL_ID),
                    PartitionedTransactionalSpoutExecutor.this.getComponentConfiguration());
            this._index = topologyContext.getThisTaskIndex();
            this._numTasks = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
        }

        /**
         * Emits the batch for every partition assigned to this task.
         *
         * <p>For each partition the per-partition state is asked for the value stored
         * under the transaction id; its initializer calls
         * {@code emitPartitionBatchNew}. When a non-null value comes back,
         * {@code emitPartitionBatch} is called with it.
         *
         * @param transactionAttempt   the transaction attempt being emitted
         * @param num                  partition count produced by the coordinator
         * @param batchOutputCollector collector passed through to the delegate emitter
         */
        @Override
        public void emitBatch(final TransactionAttempt transactionAttempt, Integer num, final BatchOutputCollector batchOutputCollector) {
            final int partitionCount = num.intValue();
            for (int i = this._index; i < partitionCount; i += this._numTasks) {
                // The anonymous initializer below needs an effectively constant copy.
                final int partition = i;
                final Integer key = Integer.valueOf(partition);

                RotatingTransactionalState partitionState = this._partitionStates.get(key);
                if (partitionState == null) {
                    // The partition number, as a decimal string, names the state's sub-directory.
                    partitionState = new RotatingTransactionalState(this._state, Integer.toString(partition));
                    this._partitionStates.put(key, partitionState);
                }

                Object meta = partitionState.getStateOrCreate(
                        transactionAttempt.getTransactionId(),
                        new RotatingTransactionalState.StateInitializer() {
                            @Override
                            public Object init(BigInteger bigInteger, Object obj) {
                                return Emitter.this._emitter.emitPartitionBatchNew(
                                        transactionAttempt, batchOutputCollector, partition, obj);
                            }
                        });

                // NOTE(review): a null result presumably means the batch was just emitted by
                // the initializer or should be skipped; confirm against RotatingTransactionalState.
                if (meta != null) {
                    this._emitter.emitPartitionBatch(transactionAttempt, batchOutputCollector, partition, meta);
                }
            }
        }

        /**
         * Forwards the cleanup request to every partition state created so far.
         *
         * @param bigInteger transaction id passed to each state's {@code cleanupBefore}
         */
        @Override
        public void cleanupBefore(BigInteger bigInteger) {
            for (RotatingTransactionalState partitionState : this._partitionStates.values()) {
                partitionState.cleanupBefore(bigInteger);
            }
        }

        /**
         * Closes the user state and then the delegate emitter. The emitter is closed even
         * when closing the state throws, so its resources are not leaked.
         */
        @Override
        public void close() {
            try {
                this._state.close();
            } finally {
                this._emitter.close();
            }
        }
    }

    /**
     * @param iPartitionedTransactionalSpout the partitioned spout to wrap
     */
    public PartitionedTransactionalSpoutExecutor(IPartitionedTransactionalSpout iPartitionedTransactionalSpout) {
        this._spout = iPartitionedTransactionalSpout;
    }

    /**
     * @return the wrapped partitioned spout
     */
    public IPartitionedTransactionalSpout getPartitionedSpout() {
        return this._spout;
    }

    /**
     * @return a new {@link Coordinator} bound to this executor's spout
     */
    @Override
    public ITransactionalSpout.Coordinator<Integer> getCoordinator(Map map, TopologyContext topologyContext) {
        return new Coordinator(map, topologyContext);
    }

    /**
     * @return a new {@link Emitter} bound to this executor's spout
     */
    @Override
    public ITransactionalSpout.Emitter<Integer> getEmitter(Map map, TopologyContext topologyContext) {
        return new Emitter(map, topologyContext);
    }

    /** Delegates output field declaration to the wrapped spout. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        this._spout.declareOutputFields(outputFieldsDeclarer);
    }

    /**
     * @return the wrapped spout's component configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this._spout.getComponentConfiguration();
    }
}
