package tests.eu.qualimaster.coordination;

import backtype.storm.ILocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.testing.TestJob;
import eu.qualimaster.base.pipeline.RecordingTopologyBuilder;
import eu.qualimaster.coordination.CoordinationManager;
import eu.qualimaster.coordination.ParallelismChangeRequest;
import eu.qualimaster.coordination.StormUtils;
import eu.qualimaster.coordination.ZkUtils;
import eu.qualimaster.coordination.commands.AlgorithmChangeCommand;
import eu.qualimaster.coordination.commands.CoordinationCommand;
import eu.qualimaster.coordination.commands.ParameterChangeCommand;
import eu.qualimaster.coordination.commands.PipelineCommand;
import eu.qualimaster.infrastructure.PipelineLifecycleEvent;
import eu.qualimaster.infrastructure.PipelineOptions;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import tests.eu.qualimaster.storm.Naming;
import tests.eu.qualimaster.storm.SignalCollector;
import tests.eu.qualimaster.storm.Topology;

/* loaded from: input_file:tests/eu/qualimaster/coordination/StormTests.class */
public class StormTests extends AbstractCoordinationTests {
    private static final String IS_NULL = "*null*";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:tests/eu/qualimaster/coordination/StormTests$ParallelismChangeLevel.class */
    /**
     * Selects which kind of parallelism change (if any) a pipeline test
     * requests while the pipeline is running.
     */
    public enum ParallelismChangeLevel {

        /** No parallelism change is requested. */
        NONE(false),

        /** A parallelism change is requested; treated as a worker-level request by the tests. */
        WORKER(true),

        /** A parallelism change is requested; treated as an executor-level request by the tests. */
        EXECUTOR(true);

        // Enum constants are shared singletons, so their state must be immutable.
        private final boolean changes;

        /**
         * Creates a level constant.
         *
         * @param changes whether this level causes a parallelism change request
         */
        ParallelismChangeLevel(boolean changes) {
            this.changes = changes;
        }

        /**
         * Returns whether this level causes a parallelism change request.
         *
         * @return {@code true} for levels that change parallelism, {@code false} otherwise
         */
        public boolean changes() {
            return this.changes;
        }
    }

    /* loaded from: input_file:tests/eu/qualimaster/coordination/StormTests$TestPipelineCommandsFailingJob.class */
    private class TestPipelineCommandsFailingJob implements TestJob {
        private TestPipelineCommandsFailingJob() {
        }

        public void run(ILocalCluster iLocalCluster) throws Exception {
            StormUtils.forTesting(iLocalCluster, new HashMap());
            StormTests.this.clear();
            PipelineOptions pipelineOptions = new PipelineOptions();
            pipelineOptions.setNumberOfWorkers(3);
            new PipelineCommand("abba", PipelineCommand.Status.START, pipelineOptions).execute();
            StormTests.this.waitForExecution(1, 0, new Class[0]);
            Assert.assertEquals(1L, StormTests.this.getFailedHandler().getFailedCount());
            StormTests.this.clear();
            new PipelineCommand((String) null, PipelineCommand.Status.START, pipelineOptions).execute();
            StormTests.this.waitForExecution(0, 1, new Class[0]);
            Assert.assertEquals(1L, StormTests.this.getFailedHandler().getFailedCount());
            StormTests.this.clear();
            CoordinationCommand pipelineCommand = new PipelineCommand("abba", (PipelineCommand.Status) null);
            pipelineCommand.execute();
            StormTests.this.waitForExecution(0, 1, new Class[0]);
            Assert.assertTrue(StormTests.this.getTracer().contains(pipelineCommand));
            Assert.assertEquals(1L, StormTests.this.getTracer().getLogEntryCount());
            Assert.assertEquals(1L, StormTests.this.getFailedHandler().getFailedCount());
            StormTests.this.clear();
            CoordinationCommand pipelineCommand2 = new PipelineCommand("abba", PipelineCommand.Status.STOP);
            pipelineCommand2.execute();
            StormTests.this.waitForExecution(0, 1, new Class[0]);
            Assert.assertTrue(StormTests.this.getTracer().contains(pipelineCommand2));
            Assert.assertEquals(1L, StormTests.this.getTracer().getLogEntryCount());
            Assert.assertEquals(1L, StormTests.this.getFailedHandler().getFailedCount());
            StormTests.this.clear();
            CoordinationCommand pipelineCommand3 = new PipelineCommand((String) null, PipelineCommand.Status.STOP);
            pipelineCommand3.execute();
            StormTests.this.waitForExecution(0, 1, new Class[0]);
            Assert.assertTrue(StormTests.this.getTracer().contains(pipelineCommand3));
            Assert.assertEquals(1L, StormTests.this.getTracer().getLogEntryCount());
            Assert.assertEquals(1L, StormTests.this.getFailedHandler().getFailedCount());
            StormTests.this.clear();
            StormUtils.forTesting((ILocalCluster) null, (Map) null);
        }
    }

    /**
     * Prepares the fixture: installs the infrastructure test model provider and
     * configures the test utilities before delegating to the inherited set-up,
     * then registers the test name mapping with the {@link CoordinationManager}.
     */
    @Override // tests.eu.qualimaster.coordination.AbstractCoordinationTests
    @Before
    public void setUp() {
        Utils.setModelProvider(Utils.INFRASTRUCTURE_TEST_MODEL_PROVIDER);
        Utils.configure();
        super.setUp();
        CoordinationManager.registerTestMapping(TestNameMapping.INSTANCE);
    }

    /**
     * Runs the inherited tear-down and disposes the test utilities. The disposal
     * happens in a {@code finally} block so that it is not skipped when the
     * inherited tear-down throws.
     */
    @Override // tests.eu.qualimaster.coordination.AbstractCoordinationTests
    @After
    public void tearDown() {
        try {
            super.tearDown();
        } finally {
            Utils.dispose();
        }
    }

    /**
     * Runs the pipeline life cycle (START, CONNECT, DISCONNECT, STOP) on the
     * test topology in a local Storm environment and checks that every command
     * is traced exactly once and none is reported as failed. Between CONNECT
     * and DISCONNECT the two parallelism phases of
     * {@link #handleParallelism(ParallelismChangeLevel, int)} are executed.
     *
     * <p>The environment is shut down and cleaned up in a {@code finally} block,
     * so a failing assertion does not leave it running for later tests.
     *
     * @param parallelismChangeLevel the kind of parallelism change to request while the pipeline runs
     * @throws IOException if thrown while handling the parallelism change
     */
    private void testPipelineCommands(ParallelismChangeLevel parallelismChangeLevel) throws IOException {
        LocalStormEnvironment localStormEnvironment = new LocalStormEnvironment();
        try {
            RecordingTopologyBuilder builder = new RecordingTopologyBuilder();
            Topology.createTopology(builder);
            StormTopology topology = builder.createTopology();
            HashMap topologies = new HashMap();
            topologies.put("pipeline", new StormUtils.TopologyTestInfo(topology, new File(Utils.getTestdataDir(), "pipeline.xml"), createTopologyConfiguration()));
            localStormEnvironment.setTopologies(topologies);
            clear();

            executePipelineCommandExpectingSuccess(new PipelineCommand("pipeline", PipelineCommand.Status.START));
            sleep(3000);

            executePipelineCommandExpectingSuccess(new PipelineCommand("pipeline", PipelineCommand.Status.CONNECT));
            handleParallelism(parallelismChangeLevel, 1);
            sleep(4000);
            handleParallelism(parallelismChangeLevel, 2);

            executePipelineCommandExpectingSuccess(new PipelineCommand("pipeline", PipelineCommand.Status.DISCONNECT));
            sleep(1000);

            executePipelineCommandExpectingSuccess(new PipelineCommand("pipeline", PipelineCommand.Status.STOP));
            sleep(2000);
        } finally {
            localStormEnvironment.shutdown();
            sleep(6000);
            localStormEnvironment.cleanup();
        }
    }

    /**
     * Executes the given command, waits for one successful execution and asserts
     * that it is the only traced command and that nothing failed; finally clears
     * the recorded state for the next step.
     *
     * @param command the pipeline command to execute
     */
    private void executePipelineCommandExpectingSuccess(PipelineCommand command) {
        command.execute();
        waitForExecution(1, 0, new Class[0]);
        Assert.assertTrue(getTracer().contains(command));
        Assert.assertEquals(1L, getTracer().getLogEntryCount());
        Assert.assertEquals(0L, getFailedHandler().getFailedCount());
        clear();
    }

    /**
     * Requests a parallelism change for the "process" node of the "pipeline"
     * topology, if the given level asks for one at all.
     *
     * <p>Phase {@code 1} sends an executor request of {@code 1} or a worker
     * request for the local host and then waits 8 seconds. Any other phase sends
     * an executor request of {@code -1} or (after a further 10 second wait) the
     * same worker request and then waits 10 seconds.
     *
     * @param parallelismChangeLevel the kind of change to request
     * @param i the phase; {@code 1} selects the first phase, any other value the second
     * @throws IOException declared by the original; presumably from host-name resolution or the change request
     */
    private void handleParallelism(ParallelismChangeLevel parallelismChangeLevel, int i) throws IOException {
        if (!parallelismChangeLevel.changes()) {
            return;
        }
        final boolean firstPhase = 1 == i;
        final boolean executorLevel = ParallelismChangeLevel.EXECUTOR == parallelismChangeLevel;
        final boolean workerLevel = ParallelismChangeLevel.WORKER == parallelismChangeLevel;

        if (firstPhase) {
            sleep(1000);
        }
        // Raw map kept on purpose: the element type expected by the target API is not visible here.
        HashMap requests = new HashMap();
        if (executorLevel) {
            requests.put("process", new ParallelismChangeRequest(firstPhase ? 1 : -1));
        } else if (workerLevel) {
            if (!firstPhase) {
                sleep(10000);
            }
            String localHost = InetAddress.getLocalHost().getCanonicalHostName();
            requests.put("process", new ParallelismChangeRequest(0, localHost, true));
        }
        StormUtils.changeParallelism("pipeline", requests);
        sleep(firstPhase ? 8000 : 10000);
    }

    /**
     * Runs the pipeline command life cycle test without requesting any
     * parallelism change.
     *
     * @throws IOException if thrown by {@link #testPipelineCommands(ParallelismChangeLevel)}
     */
    @Test
    public void testPipelineCommand() throws IOException {
        testPipelineCommands(ParallelismChangeLevel.NONE);
    }

    /**
     * Runs the pipeline command life cycle test with an executor-level
     * parallelism change. The test is reported as skipped (instead of silently
     * passing) when {@link ZkUtils#isQmStormVersion()} does not hold.
     *
     * @throws IOException if thrown by {@link #testPipelineCommands(ParallelismChangeLevel)}
     */
    @Test
    public void testPipelineParallelismExecutor() throws IOException {
        Assume.assumeTrue(ZkUtils.isQmStormVersion());
        testPipelineCommands(ParallelismChangeLevel.EXECUTOR);
    }

    /**
     * Runs the pipeline command life cycle test with a worker-level parallelism
     * change. The test is reported as skipped (instead of silently passing) when
     * {@link ZkUtils#isQmStormVersion()} does not hold.
     *
     * @throws IOException if thrown by {@link #testPipelineCommands(ParallelismChangeLevel)}
     */
    @Test
    public void testPipelineParallelismWorker() throws IOException {
        Assume.assumeTrue(ZkUtils.isQmStormVersion());
        testPipelineCommands(ParallelismChangeLevel.WORKER);
    }

    /**
     * Runs {@link TestPipelineCommandsFailingJob} through the inherited
     * {@code testInLocalCluster} helper.
     */
    @Test
    public void testPipelineCommandFailing() {
        testInLocalCluster(new TestPipelineCommandsFailingJob());
    }

    /**
     * Starts the test pipeline in a local Storm environment, sends a parameter
     * change to "source" and an algorithm change to "process", stops the
     * pipeline and then checks the signal logs recorded for process, source and
     * sink.
     *
     * <p>The environment is shut down and cleaned up in a {@code finally} block,
     * so a failing assertion does not leave it running for later tests.
     */
    @Test
    public void testSignals() {
        LocalStormEnvironment localStormEnvironment = new LocalStormEnvironment();
        try {
            RecordingTopologyBuilder builder = new RecordingTopologyBuilder();
            Topology.createTopology(builder);
            StormTopology topology = builder.createTopology();
            Map topologyConfiguration = createTopologyConfiguration();
            builder.close("pipeline", topologyConfiguration);
            HashMap topologies = new HashMap();
            topologies.put("pipeline", new StormUtils.TopologyTestInfo(topology, new File(Utils.getTestdataDir(), "pipeline.xml"), topologyConfiguration));
            localStormEnvironment.setTopologies(topologies);
            clear();

            // start the pipeline with an executor argument for the "process" node
            PipelineOptions pipelineOptions = new PipelineOptions();
            pipelineOptions.setExecutorArgument("process", "delay", 3);
            new PipelineCommand("pipeline", PipelineCommand.Status.START, pipelineOptions).execute();
            getPipelineStatusTracker().waitFor("pipeline", PipelineLifecycleEvent.Status.STARTED, 10000);
            clear();
            sleep(5000);

            // parameter change on the source
            ParameterChangeCommand parameterChangeCommand = new ParameterChangeCommand("pipeline", "source", "param", 5);
            parameterChangeCommand.execute();
            waitForExecution(1, 0, new Class[0]);
            Assert.assertTrue(getTracer().contains(parameterChangeCommand));
            Assert.assertEquals(1L, getTracer().getLogEntryCount());
            Assert.assertEquals(0L, getFailedHandler().getFailedCount());
            clear();
            sleep(4000);

            // algorithm change on the processing node
            AlgorithmChangeCommand algorithmChangeCommand = new AlgorithmChangeCommand("pipeline", "process", "alg2");
            algorithmChangeCommand.execute();
            waitForExecution(1, 0, new Class[0]);
            Assert.assertTrue(getTracer().contains(algorithmChangeCommand));
            Assert.assertEquals(1L, getTracer().getLogEntryCount());
            Assert.assertEquals(0L, getFailedHandler().getFailedCount());
            clear();
            sleep(2000);

            // stop the pipeline
            new PipelineCommand("pipeline", PipelineCommand.Status.STOP).execute();
            waitForExecution(1, 0, new Class[0]);
            clear();

            // check the signals recorded by the individual pipeline nodes
            List<SignalCollector.SignalEntry> processEntries = SignalCollector.read(Naming.LOG_PROCESS);
            assertSignalEntry(processEntries, "alg2", IS_NULL, IS_NULL);
            assertSignalEntry(processEntries, SignalCollector.NAME_SHUTDOWN, IS_NULL, IS_NULL);
            List<SignalCollector.SignalEntry> sourceEntries = SignalCollector.read(Naming.LOG_SOURCE);
            assertSignalEntry(sourceEntries, IS_NULL, "param", "5");
            assertSignalEntry(sourceEntries, SignalCollector.NAME_SHUTDOWN, IS_NULL, IS_NULL);
            assertSignalEntry(SignalCollector.read(Naming.LOG_SINK), SignalCollector.NAME_SHUTDOWN, IS_NULL, IS_NULL);
        } finally {
            localStormEnvironment.shutdown();
            sleep(6000);
            localStormEnvironment.cleanup();
        }
    }

    /**
     * Checks a single recorded field against its expectation.
     *
     * @param str the expected value; {@code IS_NULL} additionally accepts a {@code null} actual value
     * @param str2 the actual value taken from the recorded entry, may be {@code null}
     * @return {@code true} if the actual value satisfies the expectation
     */
    private static boolean matchesSignalEntryField(String str, String str2) {
        if (null == str2 && IS_NULL.equals(str)) {
            return true;
        }
        return str.equals(str2);
    }

    /**
     * Asserts that the recorded entries contain at least one entry matching all
     * given expectations. An expectation of {@code null} leaves the respective
     * field unconstrained; if all three are {@code null} the assertion holds
     * without inspecting the entries.
     *
     * @param list the recorded entries, must not be {@code null}
     * @param str the expected algorithm, or {@code null} if not constrained
     * @param str2 the expected parameter name, or {@code null} if not constrained
     * @param str3 the expected parameter value, or {@code null} if not constrained
     */
    private static void assertSignalEntry(List<SignalCollector.SignalEntry> list, String str, String str2, String str3) {
        Assert.assertNotNull("entries must be recorded - no entries", list);
        boolean found = null == str && null == str2 && null == str3;
        if (!found) {
            for (SignalCollector.SignalEntry entry : list) {
                // every constrained field is evaluated; unconstrained fields count as matching
                boolean algorithmOk = null == str || matchesSignalEntryField(str, entry.getAlgorithm());
                boolean nameOk = null == str2 || matchesSignalEntryField(str2, entry.getParameterName());
                boolean valueOk = null == str3 || matchesSignalEntryField(str3, entry.getParameterValue());
                if (algorithmOk && nameOk && valueOk) {
                    found = true;
                    break;
                }
            }
        }
        Assert.assertTrue("signal entry for (" + str + " " + str2 + " " + str3 + ") not found", found);
    }
}
