package eu.qualimaster.algorithms.stream.sentiment;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import eu.qualimaster.data.stream.source.LabelledTweet;
import eu.qualimaster.families.imp.FSentimentAnalysis;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import org.apache.log4j.Logger;
import twitter4j.Status;
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;

/* loaded from: input_file:eu/qualimaster/algorithms/stream/sentiment/TimeseriesBolt.class */
public class TimeseriesBolt extends BaseRichBolt {
    /** Storm collector used to emit and ack tuples; assigned in prepare(). */
    private OutputCollector collector;
    /** Emission window length; prepare() sets it to 3600. */
    private int windowSizeInSeconds;
    /** Classifier value that TweetIntoFrameAggregator.add() counts towards its sum. */
    private int sentimentClass;
    /** Value of the "tweet" field of the most recently processed tuple. */
    private LabelledTweet tweetContainer;
    /** Value of the "annotation" field of the most recent tuple that carried a tweet. */
    private String annotation;
    private static final long serialVersionUID = 4896669174997566924L;
    /** Background thread, created on the first execute() call, that repeatedly calls emitWindows(). */
    private Thread pulser;
    /** Id of the stream this bolt declares and emits on. */
    private String streamId;
    /** Written by setTitle(); not read anywhere in this class. */
    private String title;
    /** Tweets buffered per symbol until the next window emission. */
    Hashtable<String, Stack<AnnotatedTweet>> windowlist = new Hashtable<>();
    /** Time of the last emission; null until emitWindows() runs for the first time. */
    private Date lastemmisiondate = null;
    // NOTE(review): first_run and symbols are not referenced anywhere else in this class —
    // confirm whether other classes in the package still use them.
    boolean first_run = true;
    ArrayList<String> symbols = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:eu/qualimaster/algorithms/stream/sentiment/TimeseriesBolt$AnnotatedTweet.class */
    /**
     * Immutable triple of a parsed tweet, its annotation and the classifier
     * value assigned to it. Instances are buffered per symbol until the next
     * window is emitted.
     */
    public class AnnotatedTweet {
        /** Parsed tweet. */
        private final Status tweet;
        /** Annotation that accompanied the tweet. */
        private final String annotation;
        /** Sentiment class assigned by the classifier. */
        private final Integer classifiervalue;

        /**
         * @param tweet           parsed tweet
         * @param annotation      annotation that accompanied the tweet
         * @param classifierValue sentiment class assigned by the classifier
         */
        public AnnotatedTweet(Status tweet, String annotation, Integer classifierValue) {
            this.classifiervalue = classifierValue;
            this.annotation = annotation;
            this.tweet = tweet;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:eu/qualimaster/algorithms/stream/sentiment/TimeseriesBolt$TweetIntoFrameAggregator.class */
    /**
     * Collects the tweets of one symbol for a single emission window and keeps
     * a running count of how many of them carry the bolt's configured
     * sentiment class.
     *
     * <p>The four lists are index-aligned: element {@code i} of each list
     * belongs to the {@code i}-th tweet passed to {@link #add}.
     */
    public class TweetIntoFrameAggregator {
        long duration;
        /** Number of added tweets whose sentiment equals the bolt's sentiment class. */
        private int sum;
        private final List<Status> tweets = new ArrayList<>();
        private final List<String> tweetTexts = new ArrayList<>();
        private final List<String> annotations = new ArrayList<>();
        private final List<Integer> sentiments = new ArrayList<>();
        /** Creation time of this frame; callers may overwrite it. */
        Date frametime = new Date();

        public TweetIntoFrameAggregator() {
        }

        /**
         * Returns the current time as a fresh {@link Date} on every call.
         * NOTE(review): this does not return {@code frametime} — confirm
         * whether that is intended.
         */
        public Date getStartdate() {
            return new Date();
        }

        /** @return the tweets added so far, in insertion order (live list) */
        public List<Status> getTweets() {
            return tweets;
        }

        /** @return the sentiment of each added tweet, in insertion order (live list) */
        public List<Integer> getSentiments() {
            return sentiments;
        }

        /**
         * @return the fraction of added tweets that match the bolt's sentiment
         *         class; {@code NaN} when no tweet has been added, because the
         *         division is then a floating-point 0/0
         */
        public double computeValue() {
            return (double) sum / tweetTexts.size();
        }

        /** @return the number of tweets added so far */
        public int computeVolume() {
            return tweetTexts.size();
        }

        /** @return the text of each added tweet, in insertion order (live list) */
        public List<String> getTweetTexts() {
            return tweetTexts;
        }

        /** @return the annotation of each added tweet, in insertion order (live list) */
        public List<String> getAnnotations() {
            return annotations;
        }

        /**
         * Appends one tweet to the frame and updates the matching-sentiment count.
         *
         * @param status tweet to add; its text is read via {@code getText()}
         * @param num    sentiment class of the tweet; must not be {@code null}
         * @param str    annotation of the tweet
         */
        public void add(Status status, Integer num, String str) {
            tweets.add(status);
            tweetTexts.add(status.getText());
            annotations.add(str);
            sentiments.add(num);
            if (num.intValue() == TimeseriesBolt.this.sentimentClass) {
                sum++;
            }
        }
    }

    /**
     * Stores the collector handed over by Storm and sets the window size to
     * 3600.
     *
     * @param map              topology configuration (unused)
     * @param topologyContext  topology context (unused)
     * @param outputCollector  collector used later for emitting and acking
     */
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.windowSizeInSeconds = 3600;
        this.collector = outputCollector;
    }

    /**
     * Processes one incoming tuple. On the first call a background "pulser"
     * thread is started that calls {@link #emitWindows()} roughly once per
     * second, so windows are flushed even when no tuples arrive.
     *
     * @param tuple incoming tuple, handed on to {@code checkEmmitTuple}
     */
    @Override
    public void execute(Tuple tuple) {
        if (this.pulser == null) {
            this.pulser = createPulser();
            this.pulser.start();
        }
        checkEmmitTuple(tuple);
    }

    /**
     * Builds the pulser thread. It is a daemon so it cannot keep the JVM
     * alive, and it terminates when interrupted.
     */
    private Thread createPulser() {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        TimeseriesBolt.this.emitWindows();
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        // sleep() cleared the flag; restore it so the loop condition ends the thread.
                        Thread.currentThread().interrupt();
                        Logger.getLogger(TimeseriesBolt.class).debug("Pulser thread interrupted, stopping");
                    }
                }
            }
        }, "TimeseriesBolt-pulser");
        thread.setDaemon(true);
        return thread;
    }

    /**
     * Buffers the tweet carried by {@code tuple} under each of its symbols,
     * triggers a window check and acks the tuple.
     *
     * <p>A tuple without a tweet container or tweet payload is not buffered
     * but is still acked. If the payload cannot be parsed, the tuple is failed
     * explicitly so it does not stay pending until Storm's tuple timeout.
     *
     * @param tuple tuple with the fields "tweet", "annotation" and "sentimentClass"
     */
    private synchronized void checkEmmitTuple(Tuple tuple) {
        this.tweetContainer = (LabelledTweet) tuple.getValueByField("tweet");
        try {
            if (this.tweetContainer != null && this.tweetContainer.getTweet() != null) {
                Status status = TwitterObjectFactory.createStatus(this.tweetContainer.getTweet());
                this.annotation = (String) tuple.getValueByField("annotation");
                for (String symbol : this.tweetContainer.getSymbols()) {
                    Stack<AnnotatedTweet> window = this.windowlist.get(symbol);
                    if (window == null) {
                        // First tweet for this symbol: open a new buffer.
                        window = new Stack<>();
                        this.windowlist.put(symbol, window);
                    }
                    window.add(new AnnotatedTweet(status, this.annotation, tuple.getIntegerByField("sentimentClass")));
                }
                Logger.getLogger(getClass()).debug("Tweet received " + status.getId());
            }
            emitWindows();
            this.collector.ack(tuple);
        } catch (TwitterException e) {
            // createStatus() is the only call here that throws this; nothing was buffered yet.
            Logger.getLogger(getClass()).error("Could not parse tweet payload, failing tuple", e);
            this.collector.fail(tuple);
        }
    }

    /**
     * Declares the bolt's single output stream, named by {@link #getStreamId()},
     * with the fields "sentimentTimeseries", "tweets" and "annotations".
     *
     * @param outputFieldsDeclarer declarer supplied by Storm
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        String stream = getStreamId();
        Fields outputFields = new Fields("sentimentTimeseries", "tweets", "annotations");
        outputFieldsDeclarer.declareStream(stream, outputFields);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Emits and clears the buffered tweets of every symbol once the window
     * has elapsed since the last emission.
     *
     * <p>The first call only records the current time. Later calls do nothing
     * until more than {@code windowSizeInSeconds} seconds have passed; the
     * elapsed time is measured in milliseconds, so the window size is
     * converted before comparing. Symbols with an empty buffer are skipped.
     * Any exception is logged rather than propagated, so a failure does not
     * kill the calling thread.
     */
    public synchronized void emitWindows() {
        Date now = new Date();
        try {
            if (this.lastemmisiondate == null) {
                this.lastemmisiondate = new Date();
                return;
            }
            long elapsedMillis = now.getTime() - this.lastemmisiondate.getTime();
            long windowMillis = this.windowSizeInSeconds * 1000L;
            if (elapsedMillis <= windowMillis) {
                return;
            }
            for (Map.Entry<String, Stack<AnnotatedTweet>> entry : this.windowlist.entrySet()) {
                Stack<AnnotatedTweet> window = entry.getValue();
                if (window.isEmpty()) {
                    continue;
                }
                TweetIntoFrameAggregator frame = new TweetIntoFrameAggregator();
                for (AnnotatedTweet buffered : window) {
                    frame.add(buffered.tweet, buffered.classifiervalue, buffered.annotation);
                }
                // emit() takes timestamps from the tweets themselves, so the frame's own time is dropped.
                frame.frametime = null;
                this.lastemmisiondate = new Date();
                emit(entry.getKey(), frame);
                window.clear();
            }
        } catch (Exception e) {
            Logger.getLogger(getClass()).error("Failed to emit sentiment windows", e);
        }
    }

    /**
     * Returns the component configuration of the superclass unchanged.
     *
     * @return whatever {@code super.getComponentConfiguration()} returns
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> inherited = super.getComponentConfiguration();
        return inherited;
    }

    /**
     * Delegates to the superclass; this bolt performs no cleanup of its own.
     */
    @Override
    public void cleanup() {
        super.cleanup();
    }

    /**
     * Accepts a granularity value without storing or using it.
     *
     * @param granularity ignored
     */
    public void setGranularity(int granularity) {
        // No-op: the value is neither stored nor used.
    }

    /**
     * @return the classifier value that frames count when aggregating tweets
     */
    public int getSentimentClass() {
        return sentimentClass;
    }

    /**
     * Sets the classifier value that frames count when aggregating tweets.
     *
     * @param sentimentClass classifier value to count
     */
    public void setSentimentClass(int sentimentClass) {
        this.sentimentClass = sentimentClass;
    }

    /**
     * Emits one tuple per tweet in the frame on the stream named by
     * {@link #getStreamId()} and records the emission time.
     *
     * <p>Each tuple holds an output item with the symbol, that tweet's
     * sentiment and its creation timestamp, followed by the texts and
     * annotations of <em>all</em> tweets in the frame. A fresh output item is
     * created for every tweet so that emitted tuples do not share one mutable
     * instance.
     *
     * @param str                      symbol the frame belongs to
     * @param tweetIntoFrameAggregator frame whose tweets are emitted
     */
    public synchronized void emit(String str, TweetIntoFrameAggregator tweetIntoFrameAggregator) {
        Logger.getLogger(getClass()).debug("Timeseries is about to leave the bolt ");
        this.lastemmisiondate = new Date();

        List<Status> tweets = tweetIntoFrameAggregator.getTweets();
        List<Integer> sentiments = tweetIntoFrameAggregator.getSentiments();
        List<String> texts = tweetIntoFrameAggregator.getTweetTexts();
        List<String> annotations = tweetIntoFrameAggregator.getAnnotations();
        String stream = getStreamId();

        for (int i = 0; i < tweets.size(); i++) {
            FSentimentAnalysis.IFSentimentAnalysisAnalyzedStreamOutput output =
                    new FSentimentAnalysis.IFSentimentAnalysisAnalyzedStreamOutput();
            output.setSymbolId(str);
            output.setValue(sentiments.get(i).intValue());
            output.setTimestamp(tweets.get(i).getCreatedAt().getTime());
            this.collector.emit(stream, new Values(new Object[]{output, texts, annotations}));
        }
    }

    /**
     * Sets the id of the stream this bolt declares and emits on.
     *
     * @param streamId stream id to use
     */
    public void setStreamId(String streamId) {
        this.streamId = streamId;
    }

    /**
     * @return the id of the stream this bolt declares and emits on; {@code null}
     *         if {@link #setStreamId(String)} has not been called
     */
    public String getStreamId() {
        return streamId;
    }

    /**
     * Stores a title for this bolt. The value is not read anywhere in this class.
     *
     * @param title title to store
     */
    public void setTitle(String title) {
        this.title = title;
    }
}
