package backtype.storm;

import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologySummary;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift7.TException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:backtype/storm/StormSubmitter.class */
/**
 * Static entry points for submitting a topology, either to a local Nimbus set via
 * {@link #setLocalNimbus(Nimbus.Iface)} or to the Nimbus resolved by
 * {@link NimbusClient#getConfiguredClient(Map)}.
 *
 * <p>In distributed mode the topology jar is uploaded first; the uploaded location is cached in a
 * static field, so the jar is uploaded at most once per JVM by {@code submitTopology}.
 * The static state is not synchronized.
 */
public class StormSubmitter {
    /** Buffer size handed to {@link BufferFileInputStream}; each chunk read is sent in one uploadChunk call. */
    private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
    /** Width, in characters, of the console progress bar. */
    private static final int PROGRESS_BAR_WIDTH = 50;
    public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
    /** When non-null, topologies are submitted to this instance instead of a remote Nimbus. */
    private static Nimbus.Iface localNimbus = null;
    /** Upload location returned by Nimbus for the jar, or null if no jar has been uploaded yet. */
    private static String submittedJar = null;

    /** Callbacks invoked by {@link #submitJar(Map, String, ProgressListener)} while a jar is uploaded. */
    public interface ProgressListener {
        /**
         * Called once before the first chunk is read.
         *
         * @param str  local path of the file being uploaded
         * @param str2 upload location assigned by Nimbus
         * @param j    size of the local file in bytes
         */
        void onStart(String str, String str2, long j);

        /**
         * Called after every read from the file, including the final empty read that ends the upload.
         *
         * @param str  local path of the file being uploaded
         * @param str2 upload location assigned by Nimbus
         * @param j    number of bytes read so far
         * @param j2   size of the local file in bytes; may be 0 for an empty file
         */
        void onProgress(String str, String str2, long j, long j2);

        /**
         * Called once after Nimbus has been told the upload is finished.
         *
         * @param str  local path of the file that was uploaded
         * @param str2 upload location assigned by Nimbus
         * @param j    size of the local file in bytes
         */
        void onCompleted(String str, String str2, long j);
    }

    /**
     * Switches submission to local mode (or back to distributed mode when {@code iface} is null).
     *
     * @param iface the Nimbus instance to submit to directly, or null
     */
    public static void setLocalNimbus(Nimbus.Iface iface) {
        localNimbus = iface;
    }

    /**
     * Submits a topology without submit options and without progress reporting.
     *
     * @param str           topology name
     * @param map           topology-specific configuration; must be JSON-serializable
     * @param stormTopology the topology to run
     * @throws AlreadyAliveException    as reported by Nimbus
     * @throws InvalidTopologyException as reported by Nimbus
     */
    public static void submitTopology(String str, Map map, StormTopology stormTopology) throws AlreadyAliveException, InvalidTopologyException {
        submitTopology(str, map, stormTopology, null, null);
    }

    /**
     * Submits a topology without progress reporting.
     *
     * @param str           topology name
     * @param map           topology-specific configuration; must be JSON-serializable
     * @param stormTopology the topology to run
     * @param submitOptions options forwarded to Nimbus, or null to submit without options
     * @throws AlreadyAliveException    as reported by Nimbus
     * @throws InvalidTopologyException as reported by Nimbus
     */
    public static void submitTopology(String str, Map map, StormTopology stormTopology, SubmitOptions submitOptions) throws AlreadyAliveException, InvalidTopologyException {
        submitTopology(str, map, stormTopology, submitOptions, null);
    }

    /**
     * Submits a topology. The given configuration is overlaid with the command-line options and
     * serialized to JSON for Nimbus; the same overlay on top of the Storm config is used to connect.
     *
     * @param str              topology name
     * @param map              topology-specific configuration; must be JSON-serializable
     * @param stormTopology    the topology to run
     * @param submitOptions    options forwarded to Nimbus, or null to submit without options
     * @param progressListener receives jar upload progress in distributed mode, or null
     * @throws IllegalArgumentException if {@code map} fails {@code Utils.isValidConf}
     * @throws RuntimeException         if a topology with this name already exists, or wrapping a
     *                                  Thrift transport failure
     * @throws AlreadyAliveException    as reported by Nimbus
     * @throws InvalidTopologyException as reported by Nimbus
     */
    public static void submitTopology(String str, Map map, StormTopology stormTopology, SubmitOptions submitOptions, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException {
        if (!Utils.isValidConf(map)) {
            throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
        }
        HashMap topologyConf = new HashMap(map);
        topologyConf.putAll(Utils.readCommandLineOpts());
        Map clusterConf = Utils.readStormConfig();
        clusterConf.putAll(topologyConf);
        try {
            // Only the topology-level overrides are shipped to Nimbus, not the merged cluster config.
            String serializedConf = JSONValue.toJSONString(topologyConf);
            if (localNimbus != null) {
                LOG.info("Submitting topology " + str + " in local mode");
                localNimbus.submitTopology(str, null, serializedConf, stormTopology);
            } else {
                submitToCluster(str, clusterConf, serializedConf, stormTopology, submitOptions, progressListener);
            }
            LOG.info("Finished submitting topology: " + str);
        } catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    /** Checks the name, uploads the jar if needed and submits through a remote Nimbus client. */
    private static void submitToCluster(String name, Map clusterConf, String serializedConf, StormTopology topology, SubmitOptions options, ProgressListener listener) throws AlreadyAliveException, InvalidTopologyException, TException {
        NimbusClient client = NimbusClient.getConfiguredClient(clusterConf);
        // Everything after the client is created runs under the finally, so the connection is
        // released even when the name check or the jar upload fails.
        try {
            if (topologyNameExists(clusterConf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(clusterConf, listener);
            LOG.info("Submitting topology " + name + " in distributed mode with conf " + serializedConf);
            if (options != null) {
                client.getClient().submitTopologyWithOpts(name, submittedJar, serializedConf, topology, options);
            } else {
                client.getClient().submitTopology(name, submittedJar, serializedConf, topology);
            }
        } catch (AlreadyAliveException e) {
            LOG.warn("Topology already alive exception", e);
            throw e;
        } catch (InvalidTopologyException e) {
            LOG.warn("Topology submission exception: " + e.get_msg());
            throw e;
        } finally {
            client.close();
        }
    }

    /**
     * Submits a topology without submit options, printing a progress bar to standard output
     * while the jar uploads.
     *
     * @param str           topology name
     * @param map           topology-specific configuration; must be JSON-serializable
     * @param stormTopology the topology to run
     * @throws AlreadyAliveException    as reported by Nimbus
     * @throws InvalidTopologyException as reported by Nimbus
     */
    public static void submitTopologyWithProgressBar(String str, Map map, StormTopology stormTopology) throws AlreadyAliveException, InvalidTopologyException {
        submitTopologyWithProgressBar(str, map, stormTopology, null);
    }

    /**
     * Submits a topology, printing a progress bar to standard output while the jar uploads.
     *
     * @param str           topology name
     * @param map           topology-specific configuration; must be JSON-serializable
     * @param stormTopology the topology to run
     * @param submitOptions options forwarded to Nimbus, or null to submit without options
     * @throws AlreadyAliveException    as reported by Nimbus
     * @throws InvalidTopologyException as reported by Nimbus
     */
    public static void submitTopologyWithProgressBar(String str, Map map, StormTopology stormTopology, SubmitOptions submitOptions) throws AlreadyAliveException, InvalidTopologyException {
        submitTopology(str, map, stormTopology, submitOptions, new ProgressListener() {
            @Override
            public void onStart(String srcFile, String targetFile, long totalBytes) {
                System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
            }

            @Override
            public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
                // An empty file reports totalBytes == 0; draw a full bar instead of dividing by zero.
                long scaled = totalBytes > 0 ? (PROGRESS_BAR_WIDTH * bytesUploaded) / totalBytes : PROGRESS_BAR_WIDTH;
                int filled = (int) Math.max(0L, Math.min((long) PROGRESS_BAR_WIDTH, scaled));
                String done = StringUtils.repeat("=", filled);
                String remaining = StringUtils.repeat(" ", PROGRESS_BAR_WIDTH - filled);
                System.out.printf("\r[%s%s] %d / %d", done, remaining, bytesUploaded, totalBytes);
            }

            @Override
            public void onCompleted(String srcFile, String targetFile, long totalBytes) {
                System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
            }
        });
    }

    /** Returns true if the cluster summary lists a topology whose name equals {@code name}. */
    private static boolean topologyNameExists(Map conf, String name) {
        NimbusClient client = NimbusClient.getConfiguredClient(conf);
        try {
            for (TopologySummary summary : client.getClient().getClusterInfo().get_topologies()) {
                if (summary.get_name().equals(name)) {
                    return true;
                }
            }
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            client.close();
        }
    }

    /** Uploads the jar named by the {@code storm.jar} system property unless one was already uploaded. */
    private static void submitJar(Map conf, ProgressListener listener) {
        if (submittedJar != null) {
            LOG.info("Jar already uploaded to master. Not submitting jar.");
            return;
        }
        LOG.info("Jar not uploaded to master yet. Submitting jar...");
        submittedJar = submitJar(conf, System.getProperty("storm.jar"), listener);
    }

    /**
     * Uploads a jar without progress reporting.
     *
     * @param map configuration used to connect to Nimbus
     * @param str local path of the jar to upload
     * @return the upload location assigned by Nimbus
     * @throws RuntimeException if {@code str} is null or the upload fails
     */
    public static String submitJar(Map map, String str) {
        return submitJar(map, str, null);
    }

    /**
     * Uploads a jar to Nimbus chunk by chunk.
     *
     * @param map              configuration used to connect to Nimbus
     * @param str              local path of the jar to upload
     * @param progressListener notified of start, progress and completion, or null
     * @return the upload location assigned by Nimbus
     * @throws RuntimeException if {@code str} is null, or wrapping any failure during the upload
     */
    public static String submitJar(Map map, String str, ProgressListener progressListener) {
        if (str == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
        }
        NimbusClient client = NimbusClient.getConfiguredClient(map);
        try {
            String uploadLocation = client.getClient().beginFileUpload();
            LOG.info("Uploading topology jar " + str + " to assigned location: " + uploadLocation);
            BufferFileInputStream jarStream = new BufferFileInputStream(str, THRIFT_CHUNK_SIZE_BYTES);
            long totalBytes;
            try {
                totalBytes = new File(str).length();
                if (progressListener != null) {
                    progressListener.onStart(str, uploadLocation, totalBytes);
                }
                long bytesUploaded = 0;
                while (true) {
                    byte[] chunk = jarStream.read();
                    bytesUploaded += chunk.length;
                    if (progressListener != null) {
                        progressListener.onProgress(str, uploadLocation, bytesUploaded, totalBytes);
                    }
                    // An empty chunk marks the end of the file.
                    if (chunk.length == 0) {
                        break;
                    }
                    client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(chunk));
                }
            } finally {
                closeQuietly(jarStream, str);
            }
            client.getClient().finishFileUpload(uploadLocation);
            if (progressListener != null) {
                progressListener.onCompleted(str, uploadLocation, totalBytes);
            }
            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
            return uploadLocation;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            client.close();
        }
    }

    /** Best-effort close of the jar stream; a close failure is logged so it cannot mask the upload's outcome. */
    private static void closeQuietly(BufferFileInputStream stream, String path) {
        try {
            stream.close();
        } catch (Exception e) {
            LOG.warn("Failed to close jar input stream for " + path, e);
        }
    }
}
