package org.ow2.paasage.camel.srl.adapter.communication;

import eu.paasage.camel.CamelModel;
import eu.paasage.camel.deployment.DeploymentModel;
import java.util.Iterator;
import org.apache.log4j.Logger;
import org.eclipse.emf.ecore.EObject;
import org.ow2.paasage.camel.srl.adapter.config.CommandLinePropertiesAccessor;
import org.ow2.paasage.camel.srl.adapter.config.ModelSourceType;
import org.ow2.paasage.camel.srl.adapter.execution.Execution;
import org.ow2.paasage.camel.srl.adapter.execution.ImportModelSource;
import org.zeromq.ZMQ;

/* loaded from: input_file:execution-environment-simple-entrypoint-jar-with-dependencies.jar:org/ow2/paasage/camel/srl/adapter/communication/ZeroMqSubscriber.class */
/**
 * Subscribes to a ZeroMQ endpoint and triggers an {@link Execution} for every
 * message received on the configured queue (topic).
 *
 * <p>Each incoming message is read as two string frames: the first frame is only
 * logged, the second frame is parsed by {@link #convertLine(String)}. The resource
 * name of the parsed message is forwarded to the Metrics-Collector-Accessor ZeroMQ
 * server before the execution is started.
 *
 * <p>A failure while handling a single message is logged and does not stop the
 * subscriber; the socket and context are released when {@link #run()} ends.
 */
public class ZeroMqSubscriber implements Runnable {
    private static final Logger logger = Logger.getLogger(ZeroMqSubscriber.class);
    private final CommandLinePropertiesAccessor conf;
    private final ZMQ.Context context;
    private final ZMQ.Socket socket;
    private final org.ow2.paasage.camel.srl.metrics_collector_accessor.communication.ZeroMqServer mcaZeroMqServer;
    private static final int IO_THREAD_NUM = 1;
    public static final String SEPARATOR = ":";

    /**
     * Creates a subscriber that connects to the ZeroMQ URI taken from the configuration.
     *
     * @param commandLinePropertiesAccessor configuration providing URI, queue and MCA port
     */
    public ZeroMqSubscriber(CommandLinePropertiesAccessor commandLinePropertiesAccessor) {
        this(commandLinePropertiesAccessor, null);
    }

    /**
     * Creates a subscriber, connects it and subscribes to the configured queue.
     *
     * @param commandLinePropertiesAccessor configuration providing queue and MCA port
     * @param str ZeroMQ URI to connect to; when {@code null} the URI from the
     *            configuration is used instead
     */
    public ZeroMqSubscriber(CommandLinePropertiesAccessor commandLinePropertiesAccessor, String str) {
        this.conf = commandLinePropertiesAccessor;
        String zeroMqUri = str == null ? commandLinePropertiesAccessor.getZeroMqUri() : str;
        String zeroMqQueue = commandLinePropertiesAccessor.getZeroMqQueue();
        this.context = ZMQ.context(IO_THREAD_NUM);
        this.socket = this.context.socket(ZMQ.SUB);
        this.mcaZeroMqServer = new org.ow2.paasage.camel.srl.metrics_collector_accessor.communication.ZeroMqServer(commandLinePropertiesAccessor.getMcaZeroMqPort());
        this.socket.connect(zeroMqUri);
        this.socket.subscribe(zeroMqQueue.getBytes(ZMQ.CHARSET));
    }

    /**
     * Receives messages until the current thread is interrupted and handles each one.
     * Errors raised while handling a message are logged and the loop continues.
     * The socket and the context are always released when the loop is left.
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String topicFrame = this.socket.recvStr();
                String payloadFrame = this.socket.recvStr();
                logger.info(String.format("Received raw message: %s - %s", topicFrame, payloadFrame));
                if (payloadFrame == null) {
                    // NOTE(review): recvStr appears to return null when nothing could be
                    // read; there is nothing to parse, so re-check the interrupt flag.
                    continue;
                }
                try {
                    handleMessage(payloadFrame);
                } catch (Exception e) {
                    logger.error("Error when executing Task: " + payloadFrame + ". Ignoring so far and continue listening to requests.", e);
                }
            }
        } finally {
            // Release native resources even if receiving fails unexpectedly.
            this.socket.close();
            this.context.term();
        }
    }

    /**
     * Parses one payload, forwards its resource name to the Metrics-Collector-Accessor
     * and runs the execution for it.
     *
     * @param payload raw message body in the format accepted by {@link #convertLine(String)}
     * @throws NullPointerException if no CamelModel contains the requested deployment model
     * @throws Exception any failure raised by parsing, model loading or the execution
     */
    private void handleMessage(String payload) throws Exception {
        CdoConfigTuple tuple = convertLine(payload);
        logger.info(String.format("Parsed message as %s", tuple));

        Execution execution = new Execution(this.conf);
        ImportModelSource modelSource = ModelSourceType.mapToIms(this.conf);
        String camelModelName = findCamelModelName(modelSource, tuple);
        if (camelModelName == null) {
            throw new NullPointerException("Could not find a CamelModel containing the DeploymentModel with the name " + tuple.getDeploymentModelName());
        }

        logger.info("Forward this message to the ZeroMQ of Metrics-Collector-Accessor. Port: " + this.conf.getMcaZeroMqPort());
        logger.info("Sending MCA message: " + this.conf.getMcaZeroMqQueue() + " and " + tuple.getResourceName());
        this.mcaZeroMqServer.submitValue(this.conf.getMcaZeroMqQueue(), tuple.getResourceName());
        logger.info("Run execution based on incoming ZMQ message.");
        execution.run(modelSource, tuple.getResourceName(), camelModelName, tuple.getExecutionContext());
    }

    /**
     * Looks up the name of the CamelModel that owns the requested deployment model.
     * The deployment model name is compared case-insensitively. When several models
     * match, the last one encountered wins (same as the previous implementation).
     *
     * @return the CamelModel name, or {@code null} when no model matches
     */
    private static String findCamelModelName(ImportModelSource modelSource, CdoConfigTuple tuple) throws Exception {
        String camelModelName = null;
        for (EObject candidate : modelSource.getResources(tuple.getResourceName())) {
            if (!(candidate instanceof CamelModel)) {
                continue;
            }
            CamelModel camelModel = (CamelModel) candidate;
            for (DeploymentModel deploymentModel : camelModel.getDeploymentModels()) {
                if (deploymentModel.getName().equalsIgnoreCase(tuple.getDeploymentModelName())) {
                    camelModelName = camelModel.getName();
                }
            }
        }
        return camelModelName;
    }

    /**
     * Splits a message of the form {@code first:second:rest} at its first two
     * separators and wraps the three parts in a {@link CdoConfigTuple}. Further
     * separators stay part of the third field. An empty third field is passed
     * as {@code null}.
     *
     * <p>Presumably the fields are resource name, deployment model name and
     * execution context, in that order (confirm against {@code CdoConfigTuple}).
     *
     * @param str the raw message line
     * @return the parsed tuple
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws IllegalArgumentException if {@code str} contains fewer than two separators
     */
    public static CdoConfigTuple convertLine(String str) {
        if (str == null) {
            throw new NullPointerException("Message line must not be null");
        }
        int firstSeparator = str.indexOf(SEPARATOR);
        int secondSeparator = firstSeparator < 0 ? -1 : str.indexOf(SEPARATOR, firstSeparator + 1);
        if (secondSeparator < 0) {
            throw new IllegalArgumentException("Expected a message with at least two '" + SEPARATOR + "' separators but got: " + str);
        }
        String firstField = str.substring(0, firstSeparator);
        String secondField = str.substring(firstSeparator + 1, secondSeparator);
        String thirdField = str.substring(secondSeparator + 1);
        if (thirdField.isEmpty()) {
            thirdField = null;
        }
        return new CdoConfigTuple(firstField, secondField, thirdField);
    }
}
