/*
 * Decompiled with CFR 0.152.
 */
package de.iip_ecosphere.platform.transport.connectors.impl;

import de.iip_ecosphere.platform.transport.connectors.ReceptionCallback;
import de.iip_ecosphere.platform.transport.connectors.basics.AbstractMqttTransportConnector;
import de.iip_ecosphere.platform.transport.connectors.impl.AbstractTransportConnector;
import de.iip_ecosphere.platform.transport.serialization.Serializer;
import de.iip_ecosphere.platform.transport.serialization.SerializerRegistry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Transport connector that hands data directly, within the same JVM, to all connectors of this
 * class that registered a reception callback for the target stream.
 *
 * <p>The stream-to-subscribers registry is a single static map shared by all instances. Every read
 * and write of the registry (including the per-stream lists stored in it) happens while holding
 * the monitor of that map; callbacks themselves are invoked outside of the lock.
 */
public class DirectMemoryTransferTransportConnector
extends AbstractTransportConnector {
    public static final String NAME = "IIP-Ecosphere direct memory transfer";

    /** Stream name to subscribed connectors; guarded by its own monitor. */
    private static final Map<String, List<DirectMemoryTransferTransportConnector>> subscriptions =
        new HashMap<String, List<DirectMemoryTransferTransportConnector>>();

    /**
     * Registers {@code callback} via the superclass and records this connector as a subscriber of
     * {@code stream}.
     *
     * <p>A connector is recorded at most once per stream: {@link #transfer(String, Object)} already
     * visits every callback the connector returns for the stream, so a second entry would deliver
     * each message twice.
     *
     * @param stream the stream to receive data from
     * @param callback the callback to notify
     * @throws IOException if the superclass registration fails
     */
    @Override
    public void setReceptionCallback(String stream, ReceptionCallback<?> callback) throws IOException {
        super.setReceptionCallback(stream, callback);
        synchronized (subscriptions) {
            List<DirectMemoryTransferTransportConnector> subscribers = subscriptions.get(stream);
            if (null == subscribers) {
                subscribers = new ArrayList<DirectMemoryTransferTransportConnector>();
                subscriptions.put(stream, subscribers);
            }
            if (!subscribers.contains(this)) {
                subscribers.add(this);
            }
        }
    }

    /**
     * Unsubscribes this connector from {@code stream}.
     *
     * <p>Only this connector is removed; other connectors subscribed to the same stream stay
     * registered. The registry entry for the stream is dropped once no subscriber is left.
     *
     * @param stream the stream to unsubscribe from
     * @param delete passed through to the superclass unchanged
     * @throws IOException if the superclass unsubscription fails
     */
    @Override
    public void unsubscribe(String stream, boolean delete) throws IOException {
        super.unsubscribe(stream, delete);
        synchronized (subscriptions) {
            List<DirectMemoryTransferTransportConnector> subscribers = subscriptions.get(stream);
            if (null != subscribers) {
                subscribers.remove(this);
                if (subscribers.isEmpty()) {
                    subscriptions.remove(stream);
                }
            }
        }
    }

    /**
     * Delivers {@code data} to all subscribers of {@code stream} on the calling thread.
     *
     * @param stream the stream to send to
     * @param data the data to deliver
     * @throws IOException if cloning the data through a serializer fails
     */
    @Override
    public void syncSend(String stream, Object data) throws IOException {
        transfer(stream, data);
    }

    /**
     * Delivers {@code data} to all subscribers of {@code stream}. This implementation performs the
     * same in-thread delivery as {@link #syncSend(String, Object)}.
     *
     * @param stream the stream to send to
     * @param data the data to deliver
     * @throws IOException if cloning the data through a serializer fails
     */
    @Override
    public void asyncSend(String stream, Object data) throws IOException {
        transfer(stream, data);
    }

    /**
     * Passes {@code data} to every callback of every connector subscribed to {@code stream}.
     *
     * @param stream the stream the data is sent on
     * @param data the data to deliver
     * @throws IOException if cloning the data through a serializer fails
     */
    private void transfer(String stream, Object data) throws IOException {
        // Iterate over a snapshot so that callbacks may subscribe/unsubscribe without invalidating
        // the iteration, and so that no callback runs while the registry lock is held.
        for (DirectMemoryTransferTransportConnector connector : snapshotSubscribers(stream)) {
            List<ReceptionCallback<?>> callbacks = connector.getCallback(stream);
            if (null == callbacks) {
                continue;
            }
            // Index-based loop: the list is owned by the connector and is not copied here.
            for (int i = 0; i < callbacks.size(); ++i) {
                ReceptionCallback<?> callback = callbacks.get(i);
                if (null != callback) {
                    deliver(callback, data);
                }
            }
        }
    }

    /**
     * Returns a copy of the connectors currently subscribed to {@code stream}.
     *
     * @param stream the stream to look up
     * @return the subscribers at the time of the call, empty if there are none
     */
    private static List<DirectMemoryTransferTransportConnector> snapshotSubscribers(String stream) {
        synchronized (subscriptions) {
            List<DirectMemoryTransferTransportConnector> subscribers = subscriptions.get(stream);
            if (null == subscribers) {
                return Collections.<DirectMemoryTransferTransportConnector>emptyList();
            }
            return new ArrayList<DirectMemoryTransferTransportConnector>(subscribers);
        }
    }

    /**
     * Hands {@code data} to a single callback. If a serializer is registered for the callback's
     * type, the callback receives the result of {@code serializer.clone(...)}; otherwise it
     * receives the very same instance.
     *
     * @param <T> the data type announced by the callback
     * @param callback the callback to notify
     * @param data the data to deliver; cast to the callback's type
     * @throws IOException if cloning the data through the serializer fails
     * @throws ClassCastException if {@code data} is not an instance of the callback's type
     */
    private static <T> void deliver(ReceptionCallback<T> callback, Object data) throws IOException {
        Class<T> type = callback.getType();
        Serializer<T> serializer = SerializerRegistry.getSerializer(type);
        T typed = type.cast(data);
        callback.received(null != serializer ? serializer.clone(typed) : typed);
    }

    /**
     * Composes a stream name by delegating to
     * {@link AbstractMqttTransportConnector#composeNames(String, String)}.
     *
     * @param parent the parent name
     * @param name the name of the stream
     * @return the composed name
     */
    @Override
    public String composeStreamName(String parent, String name) {
        return AbstractMqttTransportConnector.composeNames(parent, name);
    }

    /**
     * Returns the descriptive name of this connector.
     *
     * @return {@link #NAME}
     */
    @Override
    public String getName() {
        return NAME;
    }

    /**
     * Removes all subscriptions of all streams from the shared registry, i.e. also those made by
     * other instances of this class.
     */
    public void clear() {
        synchronized (subscriptions) {
            subscriptions.clear();
        }
    }

    /**
     * Returns the supported encryption mechanisms.
     *
     * @return always {@code null}
     */
    @Override
    public String supportedEncryption() {
        return null;
    }

    /**
     * Returns the enabled encryption mechanisms.
     *
     * @return always {@code null}
     */
    @Override
    public String enabledEncryption() {
        return null;
    }
}

