package backtype.storm.drpc;

import backtype.storm.coordination.CoordinatedBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicBoltExecutor;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.KeyedRoundRobinQueue;
import java.util.HashMap;
import java.util.Map;

/* loaded from: input_file:libs/storm-core-0.9.5.jar:backtype/storm/drpc/KeyedFairBolt.class */
/**
 * Bolt wrapper that decouples tuple intake from tuple processing.
 *
 * <p>Incoming tuples are placed on a {@link KeyedRoundRobinQueue}, keyed by the tuple's first
 * value ({@code tuple.getValue(0)}). A single daemon thread started in {@link #prepare} takes
 * tuples off that queue one at a time and hands them to the wrapped delegate bolt.
 *
 * <p>NOTE(review): judging by its name, the queue presumably serves keys in round-robin order so
 * that one busy key cannot starve the others; confirm against {@code KeyedRoundRobinQueue}.
 *
 * <p>{@link CoordinatedBolt.FinishedCallback} notifications are forwarded to the delegate when the
 * delegate itself implements that interface, and are dropped otherwise.
 */
public class KeyedFairBolt implements IRichBolt, CoordinatedBolt.FinishedCallback {
    /** The wrapped bolt that actually processes tuples. */
    IRichBolt _delegate;
    /** Pending tuples, keyed by their first value; created in {@link #prepare}. */
    KeyedRoundRobinQueue<Tuple> _rrQueue;
    /** Worker thread draining {@link #_rrQueue}; {@code null} until {@link #prepare} runs. */
    Thread _executor;
    /** The delegate viewed as a finished-callback, or {@code null} if it is not one. */
    CoordinatedBolt.FinishedCallback _callback;

    /**
     * Wraps a rich bolt.
     *
     * @param iRichBolt the bolt that every queued tuple is eventually passed to
     */
    public KeyedFairBolt(IRichBolt iRichBolt) {
        this._delegate = iRichBolt;
    }

    /**
     * Wraps a basic bolt by first adapting it with a {@link BasicBoltExecutor}.
     *
     * @param iBasicBolt the basic bolt to adapt and wrap
     */
    public KeyedFairBolt(IBasicBolt iBasicBolt) {
        this(new BasicBoltExecutor(iBasicBolt));
    }

    /**
     * Prepares the delegate, creates the queue and starts the daemon worker thread.
     *
     * @param map the configuration map, passed through to the delegate unchanged
     * @param topologyContext the topology context, passed through to the delegate unchanged
     * @param outputCollector the collector, passed through to the delegate unchanged
     */
    @Override // backtype.storm.task.IBolt
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        if (this._delegate instanceof CoordinatedBolt.FinishedCallback) {
            this._callback = (CoordinatedBolt.FinishedCallback) this._delegate;
        }
        this._delegate.prepare(map, topologyContext, outputCollector);
        this._rrQueue = new KeyedRoundRobinQueue<>();
        this._executor = new Thread(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        KeyedFairBolt.this._delegate.execute(KeyedFairBolt.this._rrQueue.take());
                    } catch (InterruptedException e) {
                        // Interruption is the shutdown signal sent by cleanup(). Restore the
                        // interrupt status before exiting so it is not silently lost.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        // Daemon, so a lingering worker never keeps the JVM alive on its own.
        this._executor.setDaemon(true);
        this._executor.start();
    }

    /**
     * Enqueues the tuple under the key {@code tuple.getValue(0)}; the delegate is invoked later
     * on the worker thread rather than on the calling thread.
     *
     * @param tuple the tuple to enqueue
     */
    @Override // backtype.storm.task.IBolt
    public void execute(Tuple tuple) {
        this._rrQueue.add(tuple.getValue(0), tuple);
    }

    /**
     * Interrupts the worker thread, if one was started, and then cleans up the delegate.
     */
    @Override // backtype.storm.task.IBolt
    public void cleanup() {
        // prepare() may never have run; still let the delegate clean up in that case.
        if (this._executor != null) {
            this._executor.interrupt();
        }
        this._delegate.cleanup();
    }

    /**
     * Declares exactly the output fields that the delegate declares.
     *
     * @param outputFieldsDeclarer the declarer handed to the delegate
     */
    @Override // backtype.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        this._delegate.declareOutputFields(outputFieldsDeclarer);
    }

    /**
     * Forwards the notification to the delegate if it implements
     * {@link CoordinatedBolt.FinishedCallback}; otherwise does nothing.
     *
     * @param obj the finished id, forwarded unchanged
     */
    @Override // backtype.storm.coordination.CoordinatedBolt.FinishedCallback
    public void finishedId(Object obj) {
        if (this._callback != null) {
            this._callback.finishedId(obj);
        }
    }

    /**
     * Returns a new, empty, mutable configuration map.
     *
     * <p>NOTE(review): the delegate's own component configuration is not consulted here; confirm
     * that this is intended.
     *
     * @return a fresh empty map on every call
     */
    @Override // backtype.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        return new HashMap<>();
    }
}
