package tests.eu.qualimaster.coordination;

import backtype.storm.generated.StormTopology;
import eu.qualimaster.base.pipeline.RecordingTopologyBuilder;
import eu.qualimaster.coordination.StormUtils;
import eu.qualimaster.coordination.commands.AlgorithmChangeCommand;
import eu.qualimaster.coordination.commands.PipelineCommand;
import eu.qualimaster.events.EventManager;
import eu.qualimaster.events.IReturnableEvent;
import eu.qualimaster.infrastructure.PipelineLifecycleEvent;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import tests.eu.qualimaster.coordination.AbstractCoordinationTests;
import tests.eu.qualimaster.storm.Naming;
import tests.eu.qualimaster.storm.Process;
import tests.eu.qualimaster.storm.ReceivingSpout;
import tests.eu.qualimaster.storm.SendingBolt;
import tests.eu.qualimaster.storm.Sink;
import tests.eu.qualimaster.storm.Source;
import tests.eu.qualimaster.storm.Src;

/* loaded from: input_file:tests/eu/qualimaster/coordination/SubPipelineTests.class */
public class SubPipelineTests extends AbstractCoordinationTests {
    private static final String SUB_RECEIVER = "SubReceiver";
    private static final String SUB_SENDER = "SubSender";
    private AbstractCoordinationTests.PipelineLifecycleEventHandler handler;

    @Override // tests.eu.qualimaster.coordination.AbstractCoordinationTests
    @Before
    public void setUp() {
        Utils.setModelProvider(Utils.INFRASTRUCTURE_TEST_MODEL_PROVIDER);
        Utils.configure();
        super.setUp();
        this.handler = new AbstractCoordinationTests.PipelineLifecycleEventHandler(PipelineLifecycleEvent.Status.CHECKING, PipelineLifecycleEvent.Status.STARTING, PipelineLifecycleEvent.Status.INITIALIZED);
        EventManager.register(this.handler);
    }

    @Override // tests.eu.qualimaster.coordination.AbstractCoordinationTests
    @After
    public void tearDown() {
        EventManager.unregister(this.handler);
        super.tearDown();
        Utils.dispose();
    }

    private void registerMainTopology(Map<String, StormUtils.TopologyTestInfo> map, Map map2) {
        RecordingTopologyBuilder recordingTopologyBuilder = new RecordingTopologyBuilder();
        recordingTopologyBuilder.setSpout("source", new Source(Src.class, "pipeline"), 1).setNumTasks(1);
        recordingTopologyBuilder.setBolt("process", new Process("process", "pipeline", new String[0]), 1).setNumTasks(3).shuffleGrouping("source");
        recordingTopologyBuilder.setBolt("MainSender", new SendingBolt("MainSender", "pipeline", true, false, 9000), 1).setNumTasks(1).shuffleGrouping("process");
        recordingTopologyBuilder.setSpout("MainReceiver", new ReceivingSpout("MainReceiver", "pipeline", true, false, 9001));
        recordingTopologyBuilder.setBolt("sink", new Sink("pipeline"), 1).setNumTasks(1).shuffleGrouping("MainReceiver");
        StormTopology createTopology = recordingTopologyBuilder.createTopology();
        recordingTopologyBuilder.close("pipeline", map2);
        map.put("pipeline", new StormUtils.TopologyTestInfo(createTopology, new File(Utils.getTestdataDir(), "sub/mainPipeline.xml"), map2));
    }

    private void registerSubTopology(Map<String, StormUtils.TopologyTestInfo> map, Map map2) {
        RecordingTopologyBuilder recordingTopologyBuilder = new RecordingTopologyBuilder();
        recordingTopologyBuilder.setSpout(SUB_RECEIVER, new ReceivingSpout(SUB_RECEIVER, Naming.SUB_PIPELINE_NAME, true, false, 9000), 1).setNumTasks(1);
        recordingTopologyBuilder.setBolt(SUB_SENDER, new SendingBolt(SUB_SENDER, Naming.SUB_PIPELINE_NAME, true, false, 9001), 1).setNumTasks(1).shuffleGrouping(SUB_RECEIVER);
        StormTopology createTopology = recordingTopologyBuilder.createTopology();
        recordingTopologyBuilder.close(Naming.SUB_PIPELINE_NAME, map2);
        map.put(Naming.SUB_PIPELINE_NAME, new StormUtils.TopologyTestInfo(createTopology, new File(Utils.getTestdataDir(), "sub/subPipeline.xml"), map2));
    }

    @Test
    public void testSubpipeline() {
        LocalStormEnvironment localStormEnvironment = new LocalStormEnvironment();
        Map createTopologyConfiguration = createTopologyConfiguration();
        HashMap hashMap = new HashMap();
        registerMainTopology(hashMap, createTopologyConfiguration);
        registerSubTopology(hashMap, createTopologyConfiguration);
        localStormEnvironment.setTopologies(hashMap);
        clear();
        new PipelineCommand("pipeline", PipelineCommand.Status.START).execute();
        sleep(1000);
        getPipelineStatusTracker().waitFor("pipeline", PipelineLifecycleEvent.Status.CREATED, 5000);
        sleep(5000);
        EventManager.send(new PipelineLifecycleEvent("pipeline", PipelineLifecycleEvent.Status.INITIALIZED, (IReturnableEvent) null));
        EventManager.send(new AlgorithmChangeCommand("pipeline", "process", "alg2"));
        sleep(1000);
        getPipelineStatusTracker().waitFor(Naming.SUB_PIPELINE_NAME, PipelineLifecycleEvent.Status.CREATED, 5000);
        sleep(5000);
        EventManager.send(new PipelineLifecycleEvent(Naming.SUB_PIPELINE_NAME, PipelineLifecycleEvent.Status.INITIALIZED, (IReturnableEvent) null));
        sleep(5000);
        new PipelineCommand("pipeline", PipelineCommand.Status.STOP).execute();
        sleep(4000);
        localStormEnvironment.shutdown();
        localStormEnvironment.cleanup();
    }
}
