package eu.qualimaster.common.signal;

import eu.qualimaster.Configuration;
import eu.qualimaster.events.EventManager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.storm.curator.framework.CuratorFramework;
import org.apache.storm.curator.framework.CuratorFrameworkFactory;
import org.apache.storm.curator.framework.imps.CuratorFrameworkState;
import org.apache.storm.curator.retry.RetryNTimes;
import org.apache.storm.zookeeper.KeeperException;
import org.apache.storm.zookeeper.data.Stat;

/* loaded from: input_file:StormCommons.jar:eu/qualimaster/common/signal/SignalMechanism.class */
public class SignalMechanism {
    /** Separator placed between the segments of a signal path. */
    public static final String PATH_SEPARATOR = "/";
    /** Namespace of the shared framework; also the fallback key for connect strings. */
    public static final String GLOBAL_NAMESPACE = "qm";
    /** Prefix of the per-pipeline paths built by {@link #getTopologyExecutorPath(String, String)}. */
    public static final String PIPELINES_PREFIX = "/pipelines/";
    /** Curator frameworks created so far, keyed by namespace. */
    private static final Map<String, CuratorFramework> FRAMEWORKS = Collections.synchronizedMap(new HashMap<String, CuratorFramework>());
    /** Connect strings registered via {@link #setConnectString(String, String)}, keyed by namespace. */
    private static final Map<String, String> CONNECT_INFO = Collections.synchronizedMap(new HashMap<String, String>());
    /** Signal namespaces and their enable/disable state, keyed by namespace name. */
    private static final Map<String, Namespace> NAMESPACES = Collections.synchronizedMap(new HashMap<String, Namespace>());
    /** When set, port-assignment and pipeline cleanup are skipped (see {@link #setTestMode(boolean)}). */
    private static boolean testMode = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:StormCommons.jar:eu/qualimaster/common/signal/SignalMechanism$CachedSignal.class */
    public static abstract class CachedSignal {
        /** Framework the signal shall be sent through; {@code null} if none was given. */
        private final CuratorFramework mechanism;
        /** The signal whose delivery is deferred. */
        private final AbstractTopologyExecutorSignal signal;

        /**
         * Creates a cached signal bound to a framework.
         *
         * @param curatorFramework the framework to send through (may be {@code null})
         * @param abstractTopologyExecutorSignal the signal to keep
         */
        protected CachedSignal(CuratorFramework curatorFramework, AbstractTopologyExecutorSignal abstractTopologyExecutorSignal) {
            this.signal = abstractTopologyExecutorSignal;
            this.mechanism = curatorFramework;
        }

        /**
         * Creates a cached signal without a framework.
         *
         * @param abstractTopologyExecutorSignal the signal to keep
         */
        protected CachedSignal(AbstractTopologyExecutorSignal abstractTopologyExecutorSignal) {
            this(null, abstractTopologyExecutorSignal);
        }

        /**
         * Returns the framework given at construction.
         *
         * @return the framework, possibly {@code null}
         */
        protected CuratorFramework getMechanism() {
            return mechanism;
        }

        /**
         * Returns the signal given at construction.
         *
         * @return the kept signal
         */
        protected AbstractTopologyExecutorSignal getSignal() {
            return signal;
        }

        /**
         * Delivers the kept signal.
         *
         * @throws SignalException if delivery fails
         */
        protected abstract void send() throws SignalException;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:StormCommons.jar:eu/qualimaster/common/signal/SignalMechanism$Namespace.class */
    public static class Namespace {
        private NamespaceState state;
        private ConcurrentLinkedQueue<CachedSignal> signals;

        private Namespace() {
            this.signals = new ConcurrentLinkedQueue<>();
            this.state = NamespaceState.DISABLE;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public NamespaceState getState() {
            return this.state;
        }

        private String getName() {
            return SignalMechanism.GLOBAL_NAMESPACE;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void setState(NamespaceState namespaceState) {
            switch (namespaceState) {
                case ENABLE:
                    synchronized (this.signals) {
                        while (!this.signals.isEmpty()) {
                            send(this.signals.poll());
                        }
                    }
                    break;
                case CLEAR:
                    this.signals.clear();
                    break;
            }
            this.state = namespaceState;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void cacheSignal(CachedSignal cachedSignal) throws SignalException {
            switch (this.state) {
                case ENABLE:
                    cachedSignal.send();
                    return;
                case DISABLE:
                    synchronized (this.signals) {
                        this.signals.offer(cachedSignal);
                    }
                    return;
                case CLEAR:
                default:
                    return;
            }
        }

        private void send(CachedSignal cachedSignal) {
            if (null != cachedSignal) {
                try {
                    cachedSignal.send();
                } catch (SignalException e) {
                    SignalMechanism.access$000().error(e.getMessage(), e);
                }
            }
        }

        public String toString() {
            return this.state + " signal count " + this.signals.size();
        }
    }

    /* loaded from: input_file:StormCommons.jar:eu/qualimaster/common/signal/SignalMechanism$NamespaceState.class */
    /** States a signal namespace can be in. */
    public enum NamespaceState {
        /** Signals handed to the namespace are queued instead of being sent. */
        DISABLE,
        /** Queued signals are sent on entering this state; new signals are sent directly. */
        ENABLE,
        /** Queued signals are discarded on entering this state. */
        CLEAR
    }

    /**
     * Resets the signal mechanism: clears all port assignments (unless in test
     * mode), switches every known namespace to {@link NamespaceState#CLEAR},
     * closes all started frameworks and empties the framework and namespace
     * registries. The port/namespace/close steps only run if at least one
     * framework exists.
     */
    public static void clear() {
        if (FRAMEWORKS.size() > 0) {
            if (!testMode) {
                try {
                    PortManager portManager = getPortManager();
                    if (null != portManager) {
                        portManager.clearAllPortAssignments();
                    }
                } catch (SignalException e) {
                    // best effort: keep going, but keep the stack trace in the log
                    getLogger().error(e.getMessage(), e);
                }
            }
            // Work on snapshots: changing the state to CLEAR removes map entries.
            List<String> namespaceNames = new ArrayList<String>(NAMESPACES.keySet());
            for (String namespaceName : namespaceNames) {
                changeSignalNamespaceState(namespaceName, NamespaceState.CLEAR);
            }
            List<CuratorFramework> frameworks = new ArrayList<CuratorFramework>(FRAMEWORKS.values());
            for (CuratorFramework curatorFramework : frameworks) {
                if (CuratorFrameworkState.STARTED == curatorFramework.getState()) {
                    curatorFramework.close();
                }
            }
        }
        FRAMEWORKS.clear();
        NAMESPACES.clear();
    }

    /**
     * Registers the connect string for a namespace. If no connect string is
     * registered for {@link #GLOBAL_NAMESPACE} yet, the given one is also
     * registered for it.
     *
     * @param str the namespace
     * @param str2 the connect string
     */
    public static void setConnectString(String str, String str2) {
        CONNECT_INFO.put(str, str2);
        final String globalConnect = CONNECT_INFO.get(GLOBAL_NAMESPACE);
        if (globalConnect == null) {
            CONNECT_INFO.put(GLOBAL_NAMESPACE, str2);
        }
    }

    /**
     * Returns the connect string for a namespace, falling back first to the
     * one registered for {@link #GLOBAL_NAMESPACE} and then to
     * {@code Configuration.getZookeeperConnectString()}.
     *
     * @param str the namespace
     * @return the connect string to use
     */
    public static String getConnectString(String str) {
        final String specific = CONNECT_INFO.get(str);
        if (specific != null) {
            return specific;
        }
        final String global = CONNECT_INFO.get(GLOBAL_NAMESPACE);
        if (global != null) {
            return global;
        }
        return Configuration.getZookeeperConnectString();
    }

    /**
     * Unregisters the framework stored for the given name, clears the
     * pipeline of that name through it and closes it. Does nothing if no
     * framework is registered under the name.
     *
     * @param str the name the framework was registered under
     */
    public static void releaseMechanism(String str) {
        final CuratorFramework released = FRAMEWORKS.remove(str);
        if (released == null) {
            return;
        }
        clearPipeline(released, str);
        released.close();
    }

    /**
     * Creates a port manager working on the framework of
     * {@link #GLOBAL_NAMESPACE}.
     *
     * @return a new port manager
     * @throws SignalException declared for callers; raised by the port manager if at all
     */
    public static PortManager getPortManager() throws SignalException {
        final CuratorFramework globalFramework = obtainFramework(GLOBAL_NAMESPACE);
        return new PortManager(globalFramework);
    }

    /**
     * Ensures that the framework for {@link #GLOBAL_NAMESPACE} exists if
     * Curator-based pipeline signals are configured.
     *
     * @param str not used by this implementation
     * @throws IOException declared for callers; not thrown by the visible code
     */
    public static void prepareMechanism(String str) throws IOException {
        if (!Configuration.getPipelineSignalsCurator()) {
            return;
        }
        obtainFramework(GLOBAL_NAMESPACE);
    }

    /**
     * Returns the framework registered for the given namespace, creating,
     * starting and registering it on first use.
     *
     * <p>Lookup and creation run under the lock of the framework registry so
     * that concurrent callers cannot each create (and leak) their own started
     * client for the same namespace.
     *
     * @param str the Curator namespace
     * @return the started framework for the namespace
     */
    public static CuratorFramework obtainFramework(String str) {
        // The synchronized map uses itself as mutex, so this also excludes
        // concurrent get/put/remove on FRAMEWORKS.
        synchronized (FRAMEWORKS) {
            CuratorFramework curatorFramework = FRAMEWORKS.get(str);
            if (null == curatorFramework) {
                String connectString = getConnectString(str);
                getLogger().info("Creating a curator framework for " + str + " using " + connectString);
                curatorFramework = CuratorFrameworkFactory.builder()
                    .namespace(str)
                    .connectString(connectString)
                    .retryPolicy(new RetryNTimes(Configuration.getZookeeperRetryTimes(), Configuration.getZookeeperRetryInterval()))
                    .build();
                getLogger().info("Created a curator framework: " + curatorFramework);
                // Register only after a successful start so a failed start is not cached.
                curatorFramework.start();
                FRAMEWORKS.put(str, curatorFramework);
                getLogger().info("Started the curator framework...");
            }
            return curatorFramework;
        }
    }

    /**
     * Sends a textual payload to a topology executor by converting it to
     * bytes and delegating to the byte-array variant.
     *
     * @param curatorFramework the framework to send through
     * @param str the topology name
     * @param str2 the executor name
     * @param str3 the payload text
     * @throws SignalException if sending fails
     */
    static void sendSignal(CuratorFramework curatorFramework, String str, String str2, String str3) throws SignalException {
        // NOTE(review): getBytes() uses the platform default charset; confirm
        // the receiving side decodes with the same charset before changing it.
        final byte[] payload = str3.getBytes();
        sendSignal(curatorFramework, str, str2, payload);
    }

    /**
     * Writes the payload to the node of a topology executor, creating the
     * node (and its parents) first if it does not exist.
     *
     * @param curatorFramework the framework to send through
     * @param str the topology name
     * @param str2 the executor name
     * @param bArr the payload
     * @throws SignalException if the node cannot be created or written; wraps the cause
     */
    static void sendSignal(CuratorFramework curatorFramework, String str, String str2, byte[] bArr) throws SignalException {
        try {
            String namespace = curatorFramework.getNamespace();
            String topologyExecutorPath = getTopologyExecutorPath(str, str2);
            // createWithParents checks for existence itself, so no separate check is needed.
            Stat stat = createWithParents(curatorFramework, topologyExecutorPath);
            if (stat == null) {
                throw new IllegalStateException("component does not exist " + namespace + ":" + topologyExecutorPath);
            }
            curatorFramework.setData().forPath(topologyExecutorPath, bArr);
            // Log the payload size; concatenating the array itself only prints its identity.
            int payloadLength = null == bArr ? 0 : bArr.length;
            getLogger().info(System.currentTimeMillis() + " sent " + payloadLength + " byte(s) to " + namespace + ":" + topologyExecutorPath);
        } catch (Exception e) {
            getLogger().error(e.getMessage(), e);
            throw new SignalException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the stat of the given path, creating the node together with
     * missing parents if it does not exist. A {@code NODEEXISTS} failure
     * during creation is tolerated.
     *
     * @param curatorFramework the framework to operate on
     * @param str the path to ensure
     * @return the stat of the path as reported by the final existence check
     * @throws Exception any other failure raised by the framework
     */
    public static Stat createWithParents(CuratorFramework curatorFramework, String str) throws Exception {
        final Stat existing = (Stat) curatorFramework.checkExists().forPath(str);
        if (existing != null) {
            return existing;
        }
        try {
            curatorFramework.create().creatingParentsIfNeeded().forPath(str);
            getLogger().info("created path " + str);
        } catch (KeeperException e) {
            final boolean alreadyThere = KeeperException.Code.NODEEXISTS == e.code();
            if (!alreadyThere) {
                throw e;
            }
        }
        return (Stat) curatorFramework.checkExists().forPath(str);
    }

    /**
     * Clears the data of a pipeline using the framework of
     * {@link #GLOBAL_NAMESPACE}.
     *
     * @param str the pipeline name
     */
    public static void clearPipeline(String str) {
        final CuratorFramework globalFramework = obtainFramework(GLOBAL_NAMESPACE);
        clearPipeline(globalFramework, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Deletes the node at the given path including all of its descendants.
     * Does nothing if the path does not exist.
     *
     * @param curatorFramework the framework to operate on
     * @param str the path to delete
     * @throws SignalException if the framework reports a failure; the original
     *     failure is the direct cause regardless of the recursion depth
     */
    public static void deleteRecursively(CuratorFramework curatorFramework, String str) throws SignalException {
        try {
            if (null == curatorFramework.checkExists().forPath(str)) {
                return;
            }
            List<String> children = curatorFramework.getChildren().forPath(str);
            if (null != children) {
                // Children first: a node with children cannot be deleted directly.
                for (String child : children) {
                    deleteRecursively(curatorFramework, str + PATH_SEPARATOR + child);
                }
            }
            curatorFramework.delete().forPath(str);
        } catch (SignalException e) {
            // Already wrapped by a deeper level; do not wrap again per level.
            throw e;
        } catch (Exception e) {
            throw new SignalException(e);
        }
    }

    /**
     * Removes the node tree of a pipeline below {@link #PIPELINES_PREFIX}
     * and clears the port assignments of the pipeline. Failures are logged as
     * warnings and not propagated.
     *
     * @param curatorFramework the framework to operate on
     * @param str the pipeline name
     */
    private static void clearPipeline(CuratorFramework curatorFramework, String str) {
        final String pipelinePath = PIPELINES_PREFIX + str;
        try {
            final boolean present = curatorFramework.checkExists().forPath(pipelinePath) != null;
            if (present) {
                deleteRecursively(curatorFramework, pipelinePath);
            }
            final PortManager ports = getPortManager();
            ports.clearPortAssignments(str);
        } catch (Exception e) {
            final String failure = e.getClass().getName() + " " + e.getMessage();
            getLogger().warn("While clearing pipeline " + str + ": " + failure, e);
        }
    }

    /**
     * Builds the node path of a topology executor:
     * {@link #PIPELINES_PREFIX}, the topology name, a slash and the executor name.
     *
     * @param str the topology name
     * @param str2 the executor name
     * @return the composed path
     */
    public static String getTopologyExecutorPath(String str, String str2) {
        final StringBuilder path = new StringBuilder(PIPELINES_PREFIX);
        path.append(str);
        path.append("/");
        path.append(str2);
        return path.toString();
    }

    /**
     * Returns the parent of a slash-separated path. A single trailing slash
     * is ignored. If there is no parent segment (no slash, or the only
     * relevant slash is the leading one), {@code "/"} is returned.
     *
     * @param str the path
     * @return the parent path, or {@code "/"} if there is none
     */
    public static String getParentPath(String str) {
        final int lastPos = str.length() - 1;
        int cut = str.lastIndexOf("/");
        final boolean trailingSlash = lastPos > 0 && cut == lastPos;
        if (trailingSlash) {
            // skip the trailing slash and look for the separator before it
            cut = str.lastIndexOf("/", lastPos - 1);
        }
        if (cut <= 0) {
            return "/";
        }
        return str.substring(0, cut);
    }

    /**
     * Returns the log4j logger of this class.
     *
     * @return the logger
     */
    private static Logger getLogger() {
        final Logger classLogger = LogManager.getLogger(SignalMechanism.class);
        return classLogger;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends a signal, or queues it in its namespace while that namespace is
     * in state {@link NamespaceState#DISABLE}. Depending on
     * {@code Configuration.getPipelineSignalsCurator()}, delivery goes either
     * through the Curator framework or through the event manager.
     *
     * @param curatorFramework the framework to use; if {@code null} in Curator
     *     mode, the framework of {@link #GLOBAL_NAMESPACE} is obtained
     * @param abstractTopologyExecutorSignal the signal to send
     * @throws SignalException if an immediate send through Curator fails
     */
    public static void sendSignal(CuratorFramework curatorFramework, AbstractTopologyExecutorSignal abstractTopologyExecutorSignal) throws SignalException {
        final Namespace target = obtainNamespace(abstractTopologyExecutorSignal.getNamespace());
        getLogger().info("Sending the signal: " + abstractTopologyExecutorSignal + ", with the namespace enabled? " + target.getState());
        if (Configuration.getPipelineSignalsCurator()) {
            CuratorFramework client = curatorFramework;
            if (client == null) {
                getLogger().info("Obtaining the framework...");
                client = obtainFramework(GLOBAL_NAMESPACE);
            }
            if (NamespaceState.DISABLE != target.getState()) {
                getLogger().info("Sending the signal: " + abstractTopologyExecutorSignal.toString());
                sendSignal(client, abstractTopologyExecutorSignal.getTopology(), abstractTopologyExecutorSignal.getExecutor(), abstractTopologyExecutorSignal.createPayload());
            } else {
                // deferred delivery through the framework
                target.cacheSignal(new CachedSignal(client, abstractTopologyExecutorSignal) {
                    @Override
                    protected void send() throws SignalException {
                        AbstractTopologyExecutorSignal kept = getSignal();
                        SignalMechanism.sendSignal(getMechanism(), kept.getTopology(), kept.getExecutor(), kept.createPayload());
                    }
                });
            }
        } else if (NamespaceState.DISABLE != target.getState()) {
            EventManager.send(abstractTopologyExecutorSignal);
        } else {
            // deferred delivery through the event manager
            target.cacheSignal(new CachedSignal(abstractTopologyExecutorSignal) {
                @Override
                protected void send() {
                    EventManager.send(getSignal());
                }
            });
        }
    }

    /**
     * Returns the state of a namespace.
     *
     * @param str the namespace name
     * @return the state of the namespace, {@link NamespaceState#DISABLE} if
     *     the namespace is not registered
     */
    public static NamespaceState getState(String str) {
        final Namespace known = NAMESPACES.get(str);
        if (known == null) {
            return NamespaceState.DISABLE;
        }
        return known.getState();
    }

    /**
     * Returns the namespace registered under the given name, registering a
     * new one if there is none. A {@code null} name is treated as the empty
     * string. Lookup and registration happen under the registry lock.
     *
     * @param str the namespace name, may be {@code null}
     * @return the existing or newly registered namespace
     */
    private static Namespace obtainNamespace(String str) {
        final String key = (str != null) ? str : "";
        synchronized (NAMESPACES) {
            final Namespace existing = NAMESPACES.get(key);
            if (existing != null) {
                return existing;
            }
            final Namespace created = new Namespace();
            NAMESPACES.put(key, created);
            return created;
        }
    }

    /**
     * Switches a namespace to {@link NamespaceState#ENABLE} if it is not
     * registered yet; registered namespaces keep their state.
     *
     * @param str the namespace name
     */
    public static void initEnabledSignalNamespaceState(String str) {
        final boolean registered = NAMESPACES.get(str) != null;
        if (registered) {
            return;
        }
        changeSignalNamespaceState(str, NamespaceState.ENABLE);
    }

    /**
     * Changes the state of a signal namespace.
     *
     * <ul>
     * <li>{@link NamespaceState#ENABLE} and {@link NamespaceState#DISABLE}
     * register the namespace if needed and set the state. (Previously
     * disabling an unregistered namespace failed with a
     * {@code NullPointerException}.)</li>
     * <li>{@link NamespaceState#CLEAR} sets the state on a registered
     * namespace, unregisters it and, unless in test mode, clears the pipeline
     * of the same name.</li>
     * </ul>
     *
     * @param str the namespace name; the call only logs if {@code null}
     * @param namespaceState the target state
     */
    public static void changeSignalNamespaceState(String str, NamespaceState namespaceState) {
        getLogger().info("Changing namespace state: " + str + " " + namespaceState);
        if (null == str) {
            return;
        }
        switch (namespaceState) {
            case ENABLE:
            case DISABLE:
                // obtainNamespace never returns null, so this is safe for unknown namespaces
                obtainNamespace(str).setState(namespaceState);
                break;
            case CLEAR:
                Namespace namespace = NAMESPACES.get(str);
                if (null != namespace) {
                    namespace.setState(namespaceState);
                }
                NAMESPACES.remove(str);
                if (!testMode) {
                    clearPipeline(str);
                }
                break;
            default:
                break;
        }
        getLogger().info("Changed namespace state: " + str + " " + NAMESPACES.get(str));
    }

    /**
     * Switches test mode on or off. In test mode, {@link #clear()} does not
     * clear port assignments and clearing a namespace does not clear the
     * related pipeline.
     *
     * @param z {@code true} to enable test mode, {@code false} to disable it
     */
    public static void setTestMode(boolean z) {
        SignalMechanism.testMode = z;
    }

    /**
     * Compiler-generated (synthetic) accessor that forwards to the private
     * {@code getLogger()}; used by the nested {@code Namespace} class.
     *
     * @return the logger of this class
     */
    static /* synthetic */ Logger access$000() {
        return getLogger();
    }
}
