package bolts;

import Messages.ExternalRequest;
import Messages.ExternalRequestType;
import Messages.ExternalResponse;
import Messages.Query;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import gr.tuc.softnet.ap0n.utils.Interval;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import utils.IOutputCollectorAdapter;
import utils.OutputCollectorAdapter;
import utils.QueryExecutor;
import utils.QueryType;
import utils.Str;

/* loaded from: input_file:bolts/QueryExecutionBolt.class */
public class QueryExecutionBolt extends BaseRichBolt {
    private Logger logger = LoggerFactory.getLogger(QueryExecutionBolt.class);
    private List<Integer> indexBoltTasks;
    private IOutputCollectorAdapter collector;
    private Map<UUID, ExternalRequest> snapshotRequests;
    private Map<UUID, ExternalResponse> snapshotPartialResponses;
    private Map<UUID, QueryExecutor> pathExecutors;
    private int myTaskId;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = new OutputCollectorAdapter(outputCollector);
        this.indexBoltTasks = topologyContext.getComponentTasks(Str.N_INDEX_BOLT);
        this.myTaskId = topologyContext.getThisTaskId();
        this.logger.info("Found " + this.indexBoltTasks.size() + " index parts");
        this.snapshotRequests = new HashMap();
        this.snapshotPartialResponses = new HashMap();
        this.pathExecutors = new HashMap();
    }

    public void execute(Tuple tuple) {
        if (tuple.getSourceStreamId().equals(Str.S_QUERIES)) {
            handleQuery(tuple);
        } else if (tuple.getSourceStreamId().equals(Str.S_EXTERNAL_RESPONSES)) {
            handleResponse(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(Str.S_REQUEST_UNICAST, new Fields(new String[]{Str.F_EXTERNAL_REQUEST}));
        outputFieldsDeclarer.declareStream(Str.S_REQUEST_BROADCAST, new Fields(new String[]{Str.F_EXTERNAL_REQUEST}));
    }

    private void handleQuery(Tuple tuple) {
        Query query = (Query) tuple.getValueByField(Str.F_QUERY);
        if (query.getQueryType() == QueryType.SNAPSHOT) {
            handleSnapshotQuery(query.getInterval());
        } else if (query.getQueryType() == QueryType.PATH) {
            handlePathQuery(query);
        } else {
            this.logger.error("Unknown Query received. Ignoring.");
        }
    }

    private void handleSnapshotQuery(Interval interval) {
        UUID randomUUID = UUID.randomUUID();
        ExternalRequest externalRequest = new ExternalRequest(ExternalRequestType.SNAPSHOT, interval, randomUUID, this.myTaskId, this.indexBoltTasks.size());
        this.snapshotRequests.put(randomUUID, externalRequest);
        this.collector.emit(Str.S_REQUEST_BROADCAST, new Values(new Object[]{externalRequest}));
    }

    private void handlePathQuery(Query query) {
        UUID randomUUID = UUID.randomUUID();
        QueryExecutor queryExecutor = new QueryExecutor(randomUUID, this.collector, this.indexBoltTasks, this.myTaskId, query.getInterval(), query.getVertexA(), query.getVertexB(), query.getPathQueryType());
        this.pathExecutors.put(randomUUID, queryExecutor);
        queryExecutor.execute();
    }

    private void handleResponse(Tuple tuple) {
        ExternalResponse externalResponse = (ExternalResponse) tuple.getValueByField(Str.F_EXTERNAL_RESPONSE);
        if (externalResponse.getExternalRequestType() == ExternalRequestType.SNAPSHOT) {
            handleSnapshotResponse(externalResponse);
        } else {
            handlePathRelatedResonse(externalResponse);
        }
    }

    private void handlePathRelatedResonse(ExternalResponse externalResponse) {
        this.logger.info("Got Path related request");
        QueryExecutor queryExecutor = this.pathExecutors.get(externalResponse.getRequestId());
        if (queryExecutor == null) {
            throw new RuntimeException("No executor found for response");
        }
        queryExecutor.processResponse(externalResponse);
    }

    private void handleSnapshotResponse(ExternalResponse externalResponse) {
        this.logger.info("Got partial Response");
        ExternalRequest externalRequest = this.snapshotRequests.get(externalResponse.getRequestId());
        if (externalRequest == null) {
            throw new RuntimeException("Got response for not trucked request!");
        }
        ExternalResponse externalResponse2 = this.snapshotPartialResponses.get(externalRequest.getRequestId());
        if (externalResponse2 == null) {
            externalResponse2 = externalResponse;
            this.snapshotPartialResponses.put(externalRequest.getRequestId(), externalResponse2);
        } else {
            externalResponse2.getSnapshot().mergeSnapshot(externalResponse.getSnapshot());
        }
        externalRequest.decrementExpectedResponsesCount();
        this.logger.info("Waiting for " + externalRequest.getExpectedResponsesCount() + " more responses");
        if (externalRequest.getExpectedResponsesCount() == 0) {
            this.logger.info("Got all partial responses. Logging results.");
            this.logger.info(externalResponse2.getSnapshot().toString());
            this.snapshotPartialResponses.remove(externalRequest.getRequestId());
            this.snapshotRequests.remove(externalRequest.getRequestId());
        }
    }
}
