package tests.eu.qualimaster.adaptation;

import backtype.storm.ILocalCluster;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import eu.qualimaster.adaptation.external.AlgorithmChangedMessage;
import eu.qualimaster.adaptation.external.ClientEndpoint;
import eu.qualimaster.adaptation.external.DisconnectRequest;
import eu.qualimaster.adaptation.external.HardwareAliveMessage;
import eu.qualimaster.adaptation.external.IDispatcher;
import eu.qualimaster.adaptation.external.LoggingFilterRequest;
import eu.qualimaster.adaptation.external.LoggingMessage;
import eu.qualimaster.adaptation.external.MonitoringDataMessage;
import eu.qualimaster.adaptation.external.PipelineMessage;
import eu.qualimaster.adaptation.external.SwitchAlgorithmRequest;
import eu.qualimaster.common.logging.QmLogging;
import eu.qualimaster.common.signal.ThriftConnection;
import eu.qualimaster.coordination.CoordinationManager;
import eu.qualimaster.coordination.StormUtils;
import eu.qualimaster.coordination.commands.AlgorithmChangeCommand;
import eu.qualimaster.coordination.commands.ParameterChangeCommand;
import eu.qualimaster.coordination.commands.PipelineCommand;
import eu.qualimaster.events.EventManager;
import eu.qualimaster.infrastructure.PipelineLifecycleEvent;
import eu.qualimaster.monitoring.MonitoringManager;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import tests.eu.qualimaster.coordination.AbstractCoordinationTests;
import tests.eu.qualimaster.coordination.TestNameMapping;
import tests.eu.qualimaster.coordination.Utils;
import tests.eu.qualimaster.storm.Naming;
import tests.eu.qualimaster.storm.Topology;

/* loaded from: input_file:tests/eu/qualimaster/adaptation/TopLevelStormTest.class */
/**
 * Runs the test topology on a Storm {@link LocalCluster}, issues pipeline start, parameter change,
 * algorithm change and pipeline stop commands against it, and afterwards checks which messages were
 * delivered to the {@link IDispatcher} that was handed to the {@link ClientEndpoint}.
 */
public class TopLevelStormTest extends AbstractAdaptationTests {

    /** Name under which the test topology is registered and which the commands address. */
    private static final String PIPELINE = "pipeline";

    /** Pipeline element whose algorithm is changed during the test. */
    private static final String PROCESS_ELEMENT = "process";

    /** Port passed to the {@link ClientEndpoint} constructor. */
    private static final int ENDPOINT_PORT = 7012;

    /** Endpoint created by {@link #testStackImpl()}; stopped and cleared in {@link #tearDown()}. */
    private ClientEndpoint endpoint;

    /**
     * Dispatcher that records the monitoring and algorithm-changed messages it receives and counts
     * logging messages. All other message kinds are ignored.
     *
     * <p>State is kept in synchronized lists and an {@link AtomicInteger} so that handlers and the
     * counting methods may be used from different threads (presumably the endpoint delivers
     * messages on its own thread - confirm against {@code ClientEndpoint}).
     */
    public class TestDispatcher implements IDispatcher {
        private final AtomicInteger loggingMsgCount = new AtomicInteger(0);
        private final List<AlgorithmChangedMessage> algMsg =
            Collections.synchronizedList(new ArrayList<AlgorithmChangedMessage>());
        private final List<MonitoringDataMessage> monMsg =
            Collections.synchronizedList(new ArrayList<MonitoringDataMessage>());

        /** Only the enclosing test creates dispatchers. */
        private TestDispatcher() {
        }

        /** Ignored by this test dispatcher. */
        public void handleDisconnectRequest(DisconnectRequest disconnectRequest) {
        }

        /** Ignored by this test dispatcher. */
        public void handleSwitchAlgorithmRequest(SwitchAlgorithmRequest switchAlgorithmRequest) {
        }

        /** Records the received monitoring data message. */
        public void handleMonitoringDataMessage(MonitoringDataMessage monitoringDataMessage) {
            this.monMsg.add(monitoringDataMessage);
        }

        /** Records the received algorithm changed message. */
        public void handleAlgorithmChangedMessage(AlgorithmChangedMessage algorithmChangedMessage) {
            this.algMsg.add(algorithmChangedMessage);
        }

        /**
         * Counts the recorded monitoring data messages.
         *
         * @param str the part to count messages for, or {@code null} to count all messages
         * @return the number of recorded messages whose part equals {@code str}, or the total
         *     number of recorded messages if {@code str} is {@code null}
         */
        public int getMonitoringDataCount(String str) {
            // Hold the list lock while reading so that the count is a consistent snapshot.
            synchronized (this.monMsg) {
                if (null == str) {
                    return this.monMsg.size();
                }
                int matches = 0;
                for (MonitoringDataMessage message : this.monMsg) {
                    if (nullSafeEquals(str, message.getPart())) {
                        matches++;
                    }
                }
                return matches;
            }
        }

        /**
         * Returns the total number of recorded algorithm changed messages.
         *
         * @return the number of recorded messages
         */
        public int getAlgorithmChangedMessageCount() {
            return this.algMsg.size();
        }

        /**
         * Returns how many logging messages were received.
         *
         * @return the number of received logging messages
         */
        public int getLoggingMessageCount() {
            return this.loggingMsgCount.get();
        }

        /**
         * Counts the recorded algorithm changed messages matching all three given values.
         *
         * @param str the expected pipeline (may be {@code null})
         * @param str2 the expected pipeline element (may be {@code null})
         * @param str3 the expected algorithm (may be {@code null})
         * @return the number of recorded messages whose pipeline, pipeline element and algorithm
         *     equal the given values
         */
        public int getAlgorithmChangedMessageCount(String str, String str2, String str3) {
            // Hold the list lock while reading so that the count is a consistent snapshot.
            synchronized (this.algMsg) {
                int matches = 0;
                for (AlgorithmChangedMessage message : this.algMsg) {
                    if (nullSafeEquals(message.getPipeline(), str)
                        && nullSafeEquals(message.getPipelineElement(), str2)
                        && nullSafeEquals(message.getAlgorithm(), str3)) {
                        matches++;
                    }
                }
                return matches;
            }
        }

        /**
         * Compares two strings, treating two {@code null} values as equal.
         *
         * @param left the first string (may be {@code null})
         * @param right the second string (may be {@code null})
         * @return {@code true} if both are {@code null} or both have equal contents
         */
        private boolean nullSafeEquals(String left, String right) {
            return left == null ? right == null : left.equals(right);
        }

        /** Ignored by this test dispatcher. */
        public void handleHardwareAliveMessage(HardwareAliveMessage hardwareAliveMessage) {
        }

        /** Ignored by this test dispatcher. */
        public void handlePipelineMessage(PipelineMessage pipelineMessage) {
        }

        /** Counts the received logging message; the message content is not kept. */
        public void handleLoggingMessage(LoggingMessage loggingMessage) {
            this.loggingMsgCount.incrementAndGet();
        }

        /** Ignored by this test dispatcher. */
        public void handleLoggingFilterRequest(LoggingFilterRequest loggingFilterRequest) {
        }
    }

    /**
     * Selects the infrastructure test model provider, configures the test utilities, runs the
     * inherited set-up and registers the test name mapping.
     */
    @Override // tests.eu.qualimaster.adaptation.AbstractAdaptationTests
    @Before
    public void setUp() {
        Utils.setModelProvider(Utils.INFRASTRUCTURE_TEST_MODEL_PROVIDER);
        Utils.configure();
        super.setUp();
        CoordinationManager.registerTestMapping(TestNameMapping.INSTANCE);
    }

    /**
     * Stops the client endpoint (if one was created) and runs the inherited tear-down. The
     * inherited tear-down runs even if stopping the endpoint fails.
     */
    @Override // tests.eu.qualimaster.adaptation.AbstractAdaptationTests
    @After
    public void tearDown() {
        try {
            stopEndpoint();
        } finally {
            super.tearDown();
        }
    }

    /** Stops and forgets the client endpoint; does nothing if no endpoint exists. */
    private void stopEndpoint() {
        if (null != this.endpoint) {
            this.endpoint.stop();
            this.endpoint = null;
        }
    }

    /**
     * Runs {@link #testStackImpl()} unless {@link AbstractCoordinationTests#isJenkins()} reports
     * {@code true}, in which case the test returns without doing anything.
     *
     * @throws IOException if {@link #testStackImpl()} fails with an I/O problem
     */
    @Test
    public void testStack() throws IOException {
        if (AbstractCoordinationTests.isJenkins()) {
            return;
        }
        testStackImpl();
    }

    /**
     * Starts the test pipeline on a local cluster, changes a parameter and an algorithm, schedules
     * an algorithm changed message on the endpoint, stops the pipeline and finally asserts on the
     * messages recorded by the dispatcher.
     *
     * <p>The cluster shutdown, the reset of the static test hooks and the restoration of the
     * previous demo message state happen in {@code finally} blocks so that a failing run does not
     * leak state into subsequent tests.
     *
     * @throws IOException declared for the local host lookup and the calls made during the run
     */
    public void testStackImpl() throws IOException {
        int previousDemoState = MonitoringManager.setDemoMessageState(15);
        try {
            TestDispatcher dispatcher = new TestDispatcher();
            // The endpoint is kept in a field so that tearDown() stops it even if this run fails.
            this.endpoint = new ClientEndpoint(dispatcher, InetAddress.getLocalHost(), ENDPOINT_PORT);
            LocalCluster cluster = new LocalCluster();
            try {
                ThriftConnection.setLocalCluster(cluster);
                Topology.setDefaultInitAlgorithms(false);
                TopologyBuilder builder = new TopologyBuilder();
                Topology.createTopology(builder);
                StormTopology topology = builder.createTopology();
                Map<String, StormUtils.TopologyTestInfo> topologies =
                    new HashMap<String, StormUtils.TopologyTestInfo>();
                topologies.put(PIPELINE, new StormUtils.TopologyTestInfo(topology,
                    new File(Utils.getTestdataDir(), "pipeline.xml"),
                    Naming.setDefaultInitializeAlgorithms(QmLogging.enable(createTopologyConfiguration()), false)));
                StormUtils.forTesting(cluster, topologies);
                clear();

                new PipelineCommand(PIPELINE, PipelineCommand.Status.START).execute();
                getPipelineStatusTracker().waitFor(PIPELINE, PipelineLifecycleEvent.Status.STARTED, 30000);
                clear();
                sleep(3000);
                sleep(1000);

                new ParameterChangeCommand(PIPELINE, "source", "param", 5).execute();
                waitForExecution(1, 0, new Class[0]);
                clear();
                sleep(4000);

                new AlgorithmChangeCommand(PIPELINE, PROCESS_ELEMENT, "alg2").execute();
                waitForExecution(1, 0, new Class[0]);
                clear();
                sleep(1000);

                this.endpoint.schedule(new AlgorithmChangedMessage(PIPELINE, PROCESS_ELEMENT, "alg1"));
                sleep(2000);

                new PipelineCommand(PIPELINE, PipelineCommand.Status.STOP).execute();
                waitForExecution(1, 0, new Class[0]);
                clear();
                sleep(2000);
            } finally {
                // Always release the cluster and reset the static test hooks, also on failure.
                cluster.shutdown();
                sleep(6000);
                ThriftConnection.setLocalCluster((ILocalCluster) null);
                StormUtils.forTesting((ILocalCluster) null, (Map) null);
                EventManager.cleanup();
            }
            asserts(dispatcher);
        } finally {
            // Restore the previous state even if an assertion above failed.
            MonitoringManager.setDemoMessageState(previousDemoState);
        }
    }

    /**
     * Asserts that the dispatcher recorded at least one algorithm changed message for each of
     * {@code alg1} and {@code alg2}, and at least one monitoring data message for each of the
     * source, process and sink parts. Also prints the recorded counts to standard output.
     *
     * @param testDispatcher the dispatcher that recorded the messages of the run
     */
    private void asserts(TestDispatcher testDispatcher) {
        int algTotal = testDispatcher.getAlgorithmChangedMessageCount();
        Assert.assertTrue("expected at least one algorithm changed message but got " + algTotal,
            algTotal >= 1);
        int alg2Count = testDispatcher.getAlgorithmChangedMessageCount(PIPELINE, PROCESS_ELEMENT, "alg2");
        Assert.assertTrue("expected at least one algorithm changed message for alg2 but got " + alg2Count,
            alg2Count >= 1);
        int alg1Count = testDispatcher.getAlgorithmChangedMessageCount(PIPELINE, PROCESS_ELEMENT, "alg1");
        Assert.assertTrue("expected at least one algorithm changed message for alg1 but got " + alg1Count,
            alg1Count >= 1);
        System.out.println("#log " + testDispatcher.getLoggingMessageCount() + " #src " + testDispatcher.getMonitoringDataCount("pipeline:source") + " #pcs " + testDispatcher.getMonitoringDataCount("pipeline:process") + " #snk " + testDispatcher.getMonitoringDataCount("pipeline:sink"));
        Assert.assertTrue("no monitoring data received for pipeline:process",
            testDispatcher.getMonitoringDataCount("pipeline:process") > 0);
        Assert.assertTrue("no monitoring data received for pipeline:source",
            testDispatcher.getMonitoringDataCount("pipeline:source") > 0);
        Assert.assertTrue("no monitoring data received for pipeline:sink",
            testDispatcher.getMonitoringDataCount("pipeline:sink") > 0);
    }
}
