package eu.qualimaster.data.stream.fs;

import eu.qualimaster.stream.simulation.inf.TweetFilesystemReader;
import eu.qualimaster.stream.simulation.inf.TweetListener;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.log4j.Logger;

/* loaded from: input_file:eu/qualimaster/data/stream/fs/HDFSFileTweetSourceReader.class */
/**
 * {@link TweetFilesystemReader} that reads its symbol list and tweet files from a directory
 * on HDFS instead of the local file system.
 *
 * <p>The directory is expected to contain a {@code symbols.sym} file; every other file found
 * (recursively) whose name does not end in {@code .sym} is handed to
 * {@code readTweetFile} of the superclass.
 *
 * <p>NOTE(review): files are decoded with the platform default charset (as in the original
 * code). If the tweet files are UTF-8 and the JVM default differs, an explicit charset should
 * be passed to the {@link InputStreamReader}s - confirm against the data.
 */
public class HDFSFileTweetSourceReader extends TweetFilesystemReader {
    /** URI of the HDFS name node used as {@code fs.defaultFS}. */
    private static final String DEFAULT_FS_URI = "hdfs://snf-618466.vm.okeanos.grnet.gr:8020";

    /** Name of the symbols file, resolved relative to the configured directory. */
    private static final String SYMBOLS_FILE_NAME = "symbols.sym";

    /** Files with this suffix are skipped when streaming tweet files. */
    private static final String SYMBOL_FILE_SUFFIX = ".sym";

    /** Directory on HDFS that holds the symbols file and the tweet files. */
    private Path hdfspath;

    /**
     * Creates a reader for the given HDFS directory.
     *
     * @param path          HDFS directory to read from
     * @param tweetListener listener passed through to the superclass
     * @param z             passed through unchanged to the superclass constructor
     * @param d             passed through unchanged to the superclass constructor
     * @param z2            passed through unchanged to the superclass constructor
     */
    public HDFSFileTweetSourceReader(Path path, TweetListener tweetListener, boolean z, double d, boolean z2) {
        super(tweetListener, z, d, z2);
        this.hdfspath = path;
    }

    /**
     * Creates a reader for the given HDFS directory.
     *
     * @param str           HDFS directory to read from, as a path string
     * @param tweetListener listener passed through to the superclass
     * @param z             passed through unchanged to the superclass constructor
     * @param d             passed through unchanged to the superclass constructor
     * @param z2            passed through unchanged to the superclass constructor
     */
    public HDFSFileTweetSourceReader(String str, TweetListener tweetListener, boolean z, double d, boolean z2) {
        super(tweetListener, z, d, z2);
        this.hdfspath = new Path(str);
    }

    /**
     * Replaces the HDFS directory this reader works on.
     *
     * @param str new HDFS directory, as a path string
     */
    public void setDirectory(String str) {
        this.hdfspath = new Path(str);
    }

    /**
     * Builds the Hadoop configuration used for every file system access of this reader.
     *
     * @return a fresh configuration with the default file system and the {@code hdfs} /
     *         {@code file} scheme implementations set explicitly
     */
    Configuration getHDFSConfig() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", DEFAULT_FS_URI);
        configuration.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        configuration.set("fs.file.impl", LocalFileSystem.class.getName());
        return configuration;
    }

    /**
     * Reads the symbols file from the configured HDFS directory.
     *
     * <p>I/O failures are logged and not propagated.
     *
     * @return the symbols as returned by {@code readSymbolFile} of the superclass, or an empty
     *         list if the file could not be opened or read
     */
    @Override // eu.qualimaster.stream.simulation.inf.TweetFilesystemReader
    protected ArrayList<String> readSymbols() {
        ArrayList<String> symbols = new ArrayList<>();
        // Logger is looked up locally (not held in an instance field) so this method stays safe
        // even if it is invoked before this subclass' field initializers have run.
        Logger logger = Logger.getLogger(getClass());
        // NOTE(review): progress messages are logged at ERROR level as in the original code;
        // kept unchanged because the level may be relied on for visibility in deployment.
        try {
            // The FileSystem instance is obtained via FileSystem.get and intentionally not
            // closed here; only the stream opened by this method is closed.
            FileSystem fileSystem = FileSystem.get(getHDFSConfig());
            Path symbolsFile = new Path(this.hdfspath, SYMBOLS_FILE_NAME);
            // try-with-resources closes the reader (and the wrapped HDFS stream) even when
            // reading the symbols fails half-way.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(symbolsFile)))) {
                logger.error("Path " + this.hdfspath + " exists and symbols can be accessed");
                symbols = super.readSymbolFile(reader);
                logger.error("Symbols files are read successfully!");
            }
        } catch (FileNotFoundException e) {
            logger.error("Can not find the path " + this.hdfspath, e);
        } catch (IOException e) {
            logger.error("Can not access the path and/or symbols file" + this.hdfspath, e);
        }
        return symbols;
    }

    /**
     * Streams all tweet files found (recursively) below the configured HDFS directory to
     * {@code readTweetFile} of the superclass.
     *
     * <p>One pass over the directory is made per loop iteration; the pass is repeated while
     * {@code getReadLoops()} returns {@code true}. The method returns early as soon as
     * {@code isActive()} returns {@code false}. An {@link IOException} aborts the current pass,
     * is logged, and does not propagate.
     */
    @Override // eu.qualimaster.stream.simulation.inf.TweetFilesystemReader
    public void listenToStream() {
        Logger logger = Logger.getLogger(getClass());
        do {
            // The whole pass lives inside one try block: listing, iterating and opening files
            // can all throw IOException, and none of it may run if the setup failed.
            try {
                FileSystem fileSystem = FileSystem.get(getHDFSConfig());
                if (fileSystem.exists(this.hdfspath)) {
                    logger.error("HDFS path exists: " + this.hdfspath);
                } else {
                    logger.error("HDFS path does not exist: " + this.hdfspath);
                }
                logger.error("Try to create iterator");
                RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(this.hdfspath, true);
                logger.error("HDFS path exists: " + this.hdfspath);
                if (!isActive()) {
                    return;
                }
                logger.error("Iterator created, try to access files");
                // Diagnostic messages are only emitted for the first tweet file of each pass.
                boolean firstFileRead = false;
                while (files.hasNext()) {
                    Path file = files.next().getPath();
                    if (!isActive()) {
                        return;
                    }
                    if (file.getName().endsWith(SYMBOL_FILE_SUFFIX)) {
                        // Symbol files are handled by readSymbols(), not streamed as tweets.
                        continue;
                    }
                    if (!firstFileRead) {
                        logger.error("Files on hdfs can be accessed, OK");
                        logger.error("try to read from HDFS files");
                    }
                    // Closed automatically, including when readTweetFile throws.
                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(fileSystem.open(file)))) {
                        super.readTweetFile(reader);
                    }
                    if (!firstFileRead) {
                        logger.error("Can access twitter files on HDFS");
                    }
                    firstFileRead = true;
                }
            } catch (IOException e) {
                logger.error("Can not access HDFS files", e);
            }
        } while (getReadLoops());
    }
}
