package org.gcube.messaging.common.consumer;

import java.util.ArrayList;
import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.QueueSession;
import javax.jms.Topic;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.gcube.common.core.scope.GCUBEScope;
import org.gcube.common.core.utils.logging.GCUBELog;

/* loaded from: input_file:org/gcube/messaging/common/consumer/BrokerSubscription.class */
public abstract class BrokerSubscription<LISTENER extends MessageListener> extends Thread implements ExceptionListener {
    protected BrokerSubscription<LISTENER>.TopicCouple couple;
    protected LISTENER listener;
    protected static GCUBELog logger = new GCUBELog(BrokerSubscription.class);
    protected static boolean transacted = false;
    protected static int ackMode = 1;
    protected ArrayList<String> messageSelectors = new ArrayList<>();
    protected ArrayList<Connection> connections = new ArrayList<>();
    protected boolean queue = false;

    /* loaded from: input_file:org/gcube/messaging/common/consumer/BrokerSubscription$TopicCouple.class */
    public class TopicCouple {
        GCUBEScope scope;
        String topicName;

        public TopicCouple() {
        }

        public GCUBEScope getScope() {
            return this.scope;
        }

        public void setScope(GCUBEScope gCUBEScope) {
            this.scope = gCUBEScope;
        }

        public String getTopicName() {
            return this.topicName;
        }

        public void setTopicName(String str) {
            this.topicName = str;
        }
    }

    public abstract void setScope(GCUBEScope gCUBEScope);

    public void subscribe() throws Exception {
        if (this.queue) {
            setupQueueSubscription();
        } else {
            if (this.messageSelectors.size() == 0) {
                setupDurableSubscribers(new String[0]);
                return;
            }
            Iterator<String> it = this.messageSelectors.iterator();
            while (it.hasNext()) {
                setupDurableSubscribers(it.next());
            }
        }
    }

    public ArrayList<Connection> getConnections() {
        return this.connections;
    }

    public void setConnections(ArrayList<Connection> arrayList) {
        this.connections = arrayList;
    }

    public void setupDurableSubscribers(String... strArr) throws Exception {
        if (ServiceContext.getContext().getUseEmbeddedBroker().booleanValue()) {
            new ActiveMQConnectionFactory("tcp://localhost:61616");
            Thread.sleep(10000L);
        } else if (getCouple().getScope().getServiceMap().getEndpoints("MessageBroker") != null) {
            try {
                Connection createTopicConnection = new ActiveMQConnectionFactory(((EndpointReferenceType) getCouple().getScope().getServiceMap().getEndpoints("MessageBroker").iterator().next()).getAddress().toString()).createTopicConnection();
                if (strArr.length > 0) {
                    createTopicConnection.setClientID(getCouple().getTopicName() + strArr[0]);
                } else {
                    createTopicConnection.setClientID(getCouple().getTopicName() + this.listener.getClass());
                }
                createTopicConnection.start();
                createTopicConnection.setExceptionListener(this);
                TopicSession createTopicSession = createTopicConnection.createTopicSession(transacted, ackMode);
                Topic createTopic = createTopicSession.createTopic(getCouple().getTopicName());
                (strArr.length > 0 ? createTopicSession.createDurableSubscriber(createTopic, createTopic.getTopicName() + strArr[0], strArr[0], false) : createTopicSession.createDurableSubscriber(createTopic, createTopic.getTopicName())).setMessageListener(this.listener);
                this.connections.add(createTopicConnection);
            } catch (JMSException e) {
                logger.error("Error creating Durable Subscriber", e);
                throw e;
            } catch (Exception e2) {
                logger.error("Error creating Durable Subscriber", e2);
                throw e2;
            }
        } else {
            logger.warn("Impossible to setup Durable Subscriber, Broker epr not specified for the scope: " + getCouple().getScope().toString());
        }
        logger.info("Started Durable Subscriber for topic: " + getCouple().getTopicName());
    }

    public void setupQueueSubscription() throws Exception {
        if (getCouple().getScope().getServiceMap().getEndpoints("MessageBroker") != null) {
            try {
                Connection createQueueConnection = new ActiveMQConnectionFactory(((EndpointReferenceType) getCouple().getScope().getServiceMap().getEndpoints("MessageBroker").iterator().next()).getAddress().toString()).createQueueConnection();
                createQueueConnection.setClientID(getCouple().getTopicName());
                createQueueConnection.start();
                createQueueConnection.setExceptionListener(this);
                QueueSession createQueueSession = createQueueConnection.createQueueSession(transacted, ackMode);
                createQueueSession.createReceiver(createQueueSession.createQueue(getCouple().getTopicName())).setMessageListener(this.listener);
                this.connections.add(createQueueConnection);
            } catch (JMSException e) {
                logger.error("Error creating Queue Receiver", e);
                throw e;
            } catch (Exception e2) {
                logger.error("Error creating Queue Receiver", e2);
                throw e2;
            }
        } else {
            logger.warn("Impossible to setup Queue Receiver, Broker epr not specified for the scope: " + getCouple().getScope().toString());
        }
        logger.info("Started Queue receveiver for topic: " + getCouple().getTopicName());
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            Iterator<Connection> it = this.connections.iterator();
            while (it.hasNext()) {
                Connection next = it.next();
                next.stop();
                next.close();
            }
            this.connections.clear();
        } catch (JMSException e) {
            logger.debug("Exception stopping the connection", e);
            this.connections.clear();
        }
        while (true) {
            try {
                subscribe();
                return;
            } catch (InvalidClientIDException e2) {
                logger.error("Subscription has not been reset", e2);
                return;
            } catch (Exception e3) {
                logger.error("Error on subscription", e3);
                try {
                    Thread.sleep(12000L);
                } catch (InterruptedException e4) {
                    e4.printStackTrace();
                }
            }
        }
    }

    public void onException(JMSException jMSException) {
        logger.error(jMSException.getMessage());
        logger.error(jMSException);
        run();
    }

    public BrokerSubscription<LISTENER>.TopicCouple getCouple() {
        return this.couple;
    }

    public void setCouple(BrokerSubscription<LISTENER>.TopicCouple topicCouple) {
        this.couple = topicCouple;
    }

    public LISTENER getListener() {
        return this.listener;
    }

    public void setListener(LISTENER listener) {
        this.listener = listener;
    }

    public ArrayList<String> getMessageSelectors() {
        return this.messageSelectors;
    }

    public void setMessageSelectors(ArrayList<String> arrayList) {
        this.messageSelectors = arrayList;
    }
}
