package eu.qualimaster.data.stream.source;

import eu.qualimaster.data.impl.TwitterStreamData.TwitterStreamData1OutputImpl;
import eu.qualimaster.data.impl.TwitterStreamData.TwitterStreamData2OutputImpl;
import eu.qualimaster.data.impl.TwitterStreamData.TwitterStreamingDataSource;
import eu.qualimaster.data.inf.ITwitterStreamData;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import twitter4j.HashtagEntity;
import twitter4j.Status;
import twitter4j.SymbolEntity;
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;

/* loaded from: input_file:eu/qualimaster/data/stream/source/Twitter2FileStreaming.class */
/**
 * Twitter stream data source that replays previously recorded tweets from a file instead of
 * talking to the live Twitter API.
 *
 * <p>The constructor looks for a {@code TweetReplayConfig.properties} file, first in the working
 * directory and then under {@code /conf/} on the classpath. Recognised keys are
 * {@code speedFactor}, {@code readLoops}, {@code runsOnHDFS} and {@code dataPath}. If no usable
 * configuration is found, hardcoded HDFS defaults are applied.
 *
 * <p>Only HDFS replay is implemented in this class: in HDFS mode the recorded tweets are read
 * line by line from a fixed HDFS path. In local mode no reader is opened and {@link #getData1()}
 * yields no data.
 *
 * <p>NOTE(review): the class is declared {@link Serializable} but holds fields such as the
 * logger, the Hadoop configuration and open readers, which look non-serializable — confirm
 * whether instances are ever actually serialized.
 */
public class Twitter2FileStreaming extends TwitterStreamingDataSource implements Serializable {
    /** Name of the configuration file looked up in the working directory. */
    private static final String CONFIG_FILE_NAME = "TweetReplayConfig.properties";
    /** Classpath location of the bundled configuration file. */
    private static final String CONFIG_RESOURCE = "/conf/TweetReplayConfig.properties";
    /** Directory reported as the data path when the hardcoded defaults are used. */
    private static final String DEFAULT_HDFS_DIRECTORY = "/user/storm/resultSymbols";
    /** Address of the HDFS name node used in HDFS mode. */
    private static final String HDFS_URL = "hdfs://snf-618466.vm.okeanos.grnet.gr:8020";
    /** Start of the creation-date field inside the raw tweet JSON. */
    private static final String CREATED_AT_MARKER = "\"created_at\":\"";
    /** Text that terminates a string-valued JSON field and starts the next key. */
    private static final String FIELD_SEPARATOR = "\",\"";
    /** Comma separated list of the symbols this source reports on. */
    private static final String SYMBOLS =
            "AAPL,SPY,FB,QQQ,DIA,BABA,GPRO,AMZN,IBIO,IWN,SIRI,TSLA,TWTR,GOOG,LAKE,NFLX,BAC,BIDU,MSFT,MCD";

    // Several fields below are currently not read by any logic in this class. They are kept so
    // that the set of instance fields (and with it the default serialized form) stays unchanged.
    List<String> allSymbolsList;
    private static boolean hdfsFile = true;
    private File fileForList;
    private File fileForData;
    Configuration c;
    FileSystem fs;
    private BufferedReader brForList;
    private BufferedReader brForData;
    private int offsetInSecs;
    /** Offset between wall-clock time and the creation time of the first replayed tweet. */
    private Long diff;
    private DateTime prevTimeStampNow;
    private long count;
    private String path;
    private boolean adjusttimetonow;
    private boolean readLoops;
    private double speedfactor;
    private boolean local;
    Logger logger = Logger.getLogger(Twitter2FileStreaming.class);
    private Path hdfsPathToSymbolList = new Path("/user/storm/resultSymbols/symbols.sym");
    private Path hdfsPathToData = new Path("/user/storm/resultSymbols/1412114400000.txt");
    private SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss");
    private DateTimeFormatter dtf = DateTimeFormat.forPattern("MM/dd/yyyy' 'HH:mm:ss");
    /** True until {@link #getData2()} has handed out the symbol list once. */
    boolean firsttime = true;

    /**
     * Creates the data source: loads the replay configuration (or falls back to the hardcoded
     * defaults) and, when running in HDFS mode, opens the readers for the recorded data.
     *
     * <p>Failures while loading the configuration or opening the readers are logged and do not
     * propagate; in that case {@link #getData1()} simply returns {@code null}.
     */
    public Twitter2FileStreaming() {
        if (!loadConfiguration()) {
            this.logger.error("Will use hardcoded properties due to absence of config files");
            setParameterTweetDirectory(DEFAULT_HDFS_DIRECTORY);
            setParameterAdjustTimeToNow(true);
            setParameterRealLoops(true);
            setParameterRunLocally(false);
            setParameterSpeedFactor(0.1d);
            logRunningProperties();
        }
        hdfsFile = !this.local;
        this.allSymbolsList = new ArrayList<>();
        if (hdfsFile) {
            openHdfsReaders();
        } else {
            this.logger.error("TwitterDataSource is configured for local mode, but only HDFS replay is "
                    + "implemented; no tweets will be emitted");
        }
    }

    /**
     * Tries to load and apply the replay configuration.
     *
     * <p>A {@code TweetReplayConfig.properties} file in the working directory takes precedence.
     * Only when that file does not exist is the classpath resource consulted.
     *
     * @return {@code true} if a configuration was read, parsed and applied; {@code false} if the
     *         caller has to fall back to the hardcoded defaults
     */
    private boolean loadConfiguration() {
        File localConfig = new File(CONFIG_FILE_NAME);
        this.logger.error("try to load properties from the same directory, the pipeline was started: "
                + localConfig.getAbsoluteFile());
        if (localConfig.exists()) {
            this.logger.error("Property file found, read config");
            Properties properties = new Properties();
            try (FileReader reader = new FileReader(localConfig)) {
                properties.load(reader);
            } catch (IOException e) {
                this.logger.error("Could not read the config file", e);
                return false;
            }
            this.logger.error("Parse property file");
            return applyProperties(properties);
        }

        URL resource = TwitterFileStreaming.class.getResource(CONFIG_RESOURCE);
        if (resource == null) {
            // getResource() returns null when the resource is not on the classpath.
            this.logger.error("No config files found");
            return false;
        }
        File bundledConfig = new File(resource.getFile());
        this.logger.error("try to read from confifuration in the jar " + bundledConfig.getAbsolutePath());
        if (!bundledConfig.exists()) {
            return false;
        }
        this.logger.error("The configuration is stored in the jar, try to read it.");
        Properties properties = new Properties();
        try (java.io.InputStream in = resource.openStream()) {
            properties.load(in);
        } catch (FileNotFoundException e) {
            this.logger.error("No config files found", e);
            return false;
        } catch (IOException e) {
            this.logger.error("Cpould not read the config file", e);
            return false;
        }
        this.logger.error("parse property file");
        return applyProperties(properties);
    }

    /**
     * Parses the replay settings from {@code properties} and applies them through the
     * {@code setParameter...} methods. Nothing is applied when a numeric value is malformed.
     *
     * @param properties the loaded configuration; missing keys fall back to built-in defaults
     * @return {@code true} if all values could be parsed and were applied
     */
    private boolean applyProperties(Properties properties) {
        String speedFactorValue = properties.getProperty("speedFactor", "1.0").trim();
        String readLoopsValue = properties.getProperty("readLoops", "1").trim();
        String runsOnHdfsValue = properties.getProperty("runsOnHDFS", "false").trim();
        String dataPath = properties.getProperty("dataPath", "D:\\qualimaster\\soccertweets\\");

        boolean loops;
        double speed;
        try {
            loops = Integer.parseInt(readLoopsValue) > 0;
            speed = Double.parseDouble(speedFactorValue);
        } catch (NumberFormatException e) {
            this.logger.error("Could not parse config", e);
            return false;
        }

        setParameterRealLoops(loops);
        setParameterTweetDirectory(dataPath);
        setParameterAdjustTimeToNow(true);
        setParameterRunLocally(!Boolean.parseBoolean(runsOnHdfsValue));
        setParameterSpeedFactor(speed);
        logRunningProperties();
        return true;
    }

    /**
     * Connects to HDFS and opens the readers for the symbol list and the recorded tweets.
     * Each failure is logged; a reader that could not be opened stays {@code null}.
     */
    private void openHdfsReaders() {
        this.c = new Configuration();
        this.c.set("fs.defaultFS", HDFS_URL);
        this.c.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        this.c.set("fs.file.impl", LocalFileSystem.class.getName());
        try {
            this.fs = FileSystem.get(this.c);
        } catch (IOException e) {
            this.logger.error("Simulator Error : " + e.getMessage());
            // Without a file system neither reader can be opened.
            return;
        }
        try {
            this.brForList = new BufferedReader(new InputStreamReader(this.fs.open(this.hdfsPathToSymbolList)));
        } catch (IOException e) {
            this.logger.error("Simulator Error : " + e.getMessage());
        }
        try {
            this.brForData = new BufferedReader(new InputStreamReader(this.fs.open(this.hdfsPathToData)));
        } catch (IOException e) {
            this.logger.error("Simulator Error : " + e.getMessage());
        }
    }

    /**
     * Reads the next recorded tweet and returns it with its creation date rewritten to the
     * current time and labelled with the known symbols it mentions.
     *
     * <p>Before returning, the call sleeps so that the gaps between the creation times of the
     * recorded tweets are reproduced, scaled by the configured speed factor.
     *
     * @return the next tweet, or {@code null} if no reader is available, the end of the data was
     *         reached, the line could not be read, or the line is not a parsable tweet
     */
    @Override // eu.qualimaster.data.impl.TwitterStreamData.TwitterStreamingDataSource
    public ITwitterStreamData.ITwitterStreamData1Output getData1() {
        if (this.brForData == null) {
            return null;
        }
        String line;
        try {
            line = this.brForData.readLine();
        } catch (IOException e) {
            this.logger.error("Simulator Error : " + e.getMessage());
            return null;
        }
        if (line == null) {
            return null;
        }

        Status status = extractStatus(line);
        if (status == null || status.getCreatedAt() == null) {
            this.logger.error("Simulator Error : skipping a line that is not a parsable tweet");
            return null;
        }

        long now = System.currentTimeMillis();
        long createdAt = status.getCreatedAt().getTime();
        if (this.diff == null) {
            // The first tweet anchors the recorded timeline to the current wall-clock time.
            this.diff = Long.valueOf(now - createdAt);
        }
        long delay = (long) (((createdAt + this.diff.longValue()) - now) * this.speedfactor);
        if (delay > 0) {
            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller; the tweet is still delivered.
                Thread.currentThread().interrupt();
                this.logger.error("Simulator Error : " + e.getMessage());
            }
        }

        TwitterStreamData1OutputImpl output = new TwitterStreamData1OutputImpl();
        output.setStatus(new LabelledTweet(changeDate(line), readSymbolsFromTweet(status)));
        return output;
    }

    /**
     * Returns the list of symbols this source reports on, exactly once.
     *
     * @return the symbol list on the first call, {@code null} on every later call
     */
    @Override // eu.qualimaster.data.impl.TwitterStreamData.TwitterStreamingDataSource
    public ITwitterStreamData.ITwitterStreamData2Output getData2() {
        if (!this.firsttime) {
            return null;
        }
        this.firsttime = false;
        TwitterStreamData2OutputImpl output = new TwitterStreamData2OutputImpl();
        output.setAllSymbols(whichSymbolsToUse());
        return output;
    }

    /** No-op: the readers are opened in the constructor. */
    public void connect() {
    }

    /** No-op. */
    public void disconnect() {
    }

    /** Ignored by this data source. */
    public void setParameterAccessToken(String str) {
    }

    /** Ignored by this data source. */
    public void setParameterAccessTokenSecret(String str) {
    }

    /**
     * Stores the "adjust time to now" flag. The flag is only reported by the configuration log
     * line; the replay logic in this class does not read it.
     *
     * @param z the new flag value
     */
    public void setParameterAdjustTimeToNow(boolean z) {
        this.adjusttimetonow = z;
    }

    /** Ignored by this data source. */
    public void setParameterConsumerKey(String str) {
    }

    /** Ignored by this data source. */
    public void setParameterConsumerSecret(String str) {
    }

    /** Ignored by this data source. */
    public void setParameterQueueSize(int i) {
    }

    /**
     * Stores the "read loops" flag.
     *
     * @param z the new flag value
     */
    public void setParameterRealLoops(boolean z) {
        this.readLoops = z;
    }

    /**
     * Selects local mode ({@code true}) or HDFS mode ({@code false}). The value is evaluated by
     * the constructor when it decides whether to open the HDFS readers.
     *
     * @param z {@code true} for local mode
     */
    public void setParameterRunLocally(boolean z) {
        this.local = z;
    }

    /**
     * Sets the factor by which the waiting time between two replayed tweets is multiplied.
     *
     * @param d the speed factor
     */
    public void setParameterSpeedFactor(double d) {
        this.speedfactor = d;
    }

    /**
     * Stores the configured tweet directory. The value is only reported by the configuration
     * log line; the HDFS paths read by this class are fixed.
     *
     * @param str the directory path
     */
    public void setParameterTweetDirectory(String str) {
        this.path = str;
    }

    /**
     * Returns the fixed list of symbols this source reports on.
     *
     * @return a new mutable list holding the symbols
     */
    @Override // eu.qualimaster.data.impl.TwitterStreamData.TwitterStreamingDataSource
    protected ArrayList<String> whichSymbolsToUse() {
        return new ArrayList<>(Arrays.asList(SYMBOLS.split(",")));
    }

    /** Logs the currently active configuration. */
    private void logRunningProperties() {
        this.logger.error("TwitterDataSource is running in " + (this.local ? "local" : "HDFS") + " mode, accessing " + this.path + ", adjusttimetonow: " + this.adjusttimetonow + ", readLoops: " + this.readLoops + ", speedFactor: " + this.speedfactor);
    }

    /**
     * Reads a symbol file: one entry per line, with blank lines and lines starting with
     * {@code #} skipped. Entries are trimmed.
     *
     * @param bufferedReader the reader to consume until its end; it is not closed
     * @return the entries in file order
     * @throws IOException if reading fails
     */
    public ArrayList<String> readSymbolFile(BufferedReader bufferedReader) throws IOException {
        ArrayList<String> symbols = new ArrayList<>();
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            String entry = line.trim();
            if (!entry.isEmpty() && !entry.startsWith("#")) {
                symbols.add(entry);
            }
        }
        return symbols;
    }

    /**
     * Replaces the value of the first {@code created_at} field in the raw tweet JSON with the
     * current time.
     *
     * @param str the raw tweet JSON
     * @return the JSON with the rewritten date, or {@code str} unchanged if no
     *         {@code "created_at":"} field followed by another field could be located
     */
    protected String changeDate(String str) {
        int start = str.indexOf(CREATED_AT_MARKER);
        if (start < 0) {
            return str;
        }
        int end = str.indexOf(FIELD_SEPARATOR, start + 1);
        if (end < 0) {
            return str;
        }
        return str.substring(0, start) + CREATED_AT_MARKER + toString(new Date()) + str.substring(end);
    }

    /**
     * Parses a raw tweet JSON line into a {@link Status}.
     *
     * @param str the raw tweet JSON
     * @return the parsed status, or {@code null} if parsing failed
     */
    protected Status extractStatus(String str) {
        try {
            return TwitterObjectFactory.createStatus(str);
        } catch (TwitterException e) {
            this.logger.error("Simulator Error : " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * Formats a date with the pattern {@code EEE MMM dd HH:mm:ss ZZZZZ yyyy} in English.
     *
     * @param date the date to format
     * @return the formatted date
     */
    public String toString(Date date) {
        return new SimpleDateFormat("EEE MMM dd HH:mm:ss ZZZZZ yyyy", Locale.ENGLISH).format(date);
    }

    /**
     * Collects the known symbols that a tweet mentions, either as symbol entity or as hashtag.
     *
     * <p>The set of known symbols is taken from {@link #whichSymbolsToUse()} on first use, so
     * that the one-shot {@link #getData2()} output is left untouched.
     *
     * @param status the parsed tweet
     * @return the distinct known symbols found in the tweet, in no particular order
     */
    ArrayList<String> readSymbolsFromTweet(Status status) {
        // possiblesymbols is not declared in this class; presumably inherited from
        // TwitterStreamingDataSource — TODO confirm.
        if (this.possiblesymbols == null) {
            this.possiblesymbols = new HashSet<>();
            this.possiblesymbols.addAll(whichSymbolsToUse());
        }
        HashSet<String> matched = new HashSet<>();
        for (SymbolEntity symbolEntity : status.getSymbolEntities()) {
            if (this.possiblesymbols.contains(symbolEntity.getText())) {
                matched.add(symbolEntity.getText());
            }
        }
        for (HashtagEntity hashtagEntity : status.getHashtagEntities()) {
            if (this.possiblesymbols.contains(hashtagEntity.getText())) {
                matched.add(hashtagEntity.getText());
            }
        }
        return new ArrayList<>(matched);
    }
}
