/*
 * Decompiled with CFR 0.152.
 */
package gr.uoa.di.driver.enabling.issn;

import eu.dnetlib.api.enabling.ISSNService;
import eu.dnetlib.api.enabling.ISSNServiceException;
import eu.dnetlib.domain.enabling.Notification;
import eu.dnetlib.domain.enabling.Subscription;
import gr.uoa.di.driver.enabling.issn.NotificationListener;
import gr.uoa.di.driver.enabling.issn.SNManager;
import gr.uoa.di.driver.util.ServiceLocator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * {@link SNManager} implementation that keeps one remote subscription per topic,
 * shares it between all listeners registered for that topic, and periodically
 * renews it on a scheduled executor.
 *
 * <p>Lifecycle: configure the instance through the setters, call {@link #init()}
 * before the first {@link #subscribe(Subscription, NotificationListener)}, and call
 * {@link #shutdown()} when done.
 *
 * <p>The topic and subscription-id registries and the per-task listener lists are
 * concurrent collections because they are touched both by caller threads and by the
 * executor threads that run the subscription tasks.
 */
public class SNManagerImpl
implements SNManager {
    /** Fraction of the time-to-live after which a subscription is refreshed. */
    private static final double REFRESH_FACTOR = 0.9;

    private final Logger logger = Logger.getLogger(this.getClass());
    private ScheduledExecutorService executor = null;
    private ServiceLocator<ISSNService> snLocator = null;
    /** Time-to-live applied to new subscriptions; refreshes are scheduled in seconds. */
    private int timeToLive = 3600;
    private int threadPoolSize = 5;
    /** Subscription tasks keyed by topic. */
    private final ConcurrentMap<String, SubscriptionTask> tasks = new ConcurrentHashMap<String, SubscriptionTask>();
    /** Subscription tasks keyed by the subscription id returned by the remote service. */
    private final ConcurrentMap<String, SubscriptionTask> idMap = new ConcurrentHashMap<String, SubscriptionTask>();

    /**
     * Creates the scheduled executor using the configured thread pool size.
     * Must be called before subscribing or dispatching notifications.
     */
    public void init() {
        this.logger.debug("Creating executor with " + this.threadPoolSize + " threads");
        this.executor = Executors.newScheduledThreadPool(this.threadPoolSize);
    }

    /**
     * Registers {@code listener} for the topic of {@code sub}.
     *
     * <p>If the topic is not yet known, a new subscription task is created, the
     * configured time-to-live is set on {@code sub}, and the task is handed to the
     * executor. Otherwise the listener is added to the existing task. Topics whose
     * third {@code /}-separated segment is {@code *} are ignored.
     *
     * <p>The topic is indexed at segment 2 without a length check, so a topic with
     * fewer than three segments raises {@link ArrayIndexOutOfBoundsException}.
     *
     * @param sub the subscription describing the topic
     * @param listener the listener to notify for that topic
     */
    @Override
    public void subscribe(Subscription sub, NotificationListener listener) {
        String topic = sub.getTopic();
        String[] segments = topic.split("/");
        if (segments[2].equals("*")) {
            this.logger.debug("subscription for resourceType: " + segments[1] + " ignored due to missing identifier");
            return;
        }
        SubscriptionTask task = this.tasks.get(topic);
        if (task == null) {
            sub.setTimeToLive(this.timeToLive);
            SubscriptionTask created = new SubscriptionTask(sub, listener);
            // putIfAbsent makes the check-and-register step atomic, so two concurrent
            // subscribers to the same new topic cannot both start a task.
            task = this.tasks.putIfAbsent(topic, created);
            if (task == null) {
                this.logger.debug("New topic: " + topic);
                this.executor.execute(created);
                return;
            }
        }
        this.logger.debug("Topic already exists, adding listener");
        task.getListeners().add(listener);
    }

    /**
     * Removes {@code listener} from the task registered for the topic of {@code sub}.
     * Does nothing if the topic is {@code null} or unknown.
     *
     * @param sub the subscription identifying the topic
     * @param listener the listener to remove
     */
    @Override
    public void unsubscribe(Subscription sub, NotificationListener listener) {
        String topic = sub.getTopic();
        if (topic == null) {
            // ConcurrentHashMap rejects null keys; a null topic can never be registered.
            return;
        }
        SubscriptionTask task = this.tasks.get(topic);
        if (task != null) {
            task.getListeners().remove(listener);
        }
    }

    /**
     * Removes {@code listener} from every registered topic.
     *
     * @param listener the listener to remove
     */
    @Override
    public void unsubscribe(NotificationListener listener) {
        for (SubscriptionTask task : this.tasks.values()) {
            task.getListeners().remove(listener);
        }
    }

    /**
     * Dispatches {@code notification} to all listeners of the subscription it
     * belongs to. Each listener call is submitted to the executor as its own task.
     * A warning is logged when no task or no listener matches.
     *
     * @param notification the incoming notification
     */
    @Override
    public void notify(Notification notification) {
        String subscriptionId = notification.getSubscriptionId();
        // ConcurrentHashMap rejects null keys, so treat a null id as "no match".
        SubscriptionTask task = subscriptionId == null ? null : this.idMap.get(subscriptionId);
        if (task == null || task.getListeners().isEmpty()) {
            this.logger.warn("No listeners for notification: " + notification.getTopic());
            return;
        }
        for (NotificationListener listener : task.getListeners()) {
            this.executor.execute(new NotificationTask(listener, notification));
        }
    }

    /**
     * Stops the executor (if it was created) and asks the remote service to drop
     * every subscription id currently registered. Failures for individual ids are
     * logged and do not stop the loop.
     */
    public void shutdown() {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
        this.logger.debug("Removing active subscriptions");
        for (String id : this.idMap.keySet()) {
            try {
                this.snLocator.getService().unsubscribe(id);
            }
            catch (ISSNServiceException e) {
                this.logger.error("Error removing subscription: " + id, e);
            }
        }
    }

    public ServiceLocator<ISSNService> getSnLocator() {
        return this.snLocator;
    }

    public void setSnLocator(ServiceLocator<ISSNService> snLocator) {
        this.snLocator = snLocator;
    }

    public int getTimeToLive() {
        return this.timeToLive;
    }

    public void setTimeToLive(int timeToLive) {
        this.timeToLive = timeToLive;
    }

    public int getThreadPoolSize() {
        return this.threadPoolSize;
    }

    public void setThreadPoolSize(int threadPoolSize) {
        this.threadPoolSize = threadPoolSize;
    }

    /**
     * Maintains a single remote subscription: creates it on the first run, renews
     * it on later runs while listeners exist, and removes it once none are left.
     */
    private class SubscriptionTask
    implements Runnable {
        private final Subscription subscription;
        /** Copy-on-write so notify() can iterate while listeners are added or removed. */
        private final List<NotificationListener> listeners = new CopyOnWriteArrayList<NotificationListener>();

        public SubscriptionTask(Subscription sub, NotificationListener listener) {
            this.subscription = sub;
            this.listeners.add(listener);
        }

        /**
         * Runs one maintenance step. Any exception is logged rather than propagated,
         * so a failing step never escapes into the executor.
         */
        @Override
        public void run() {
            try {
                String id = this.subscription.getId();
                if (id == null) {
                    this.subscribeAndSchedule();
                } else if (!this.listeners.isEmpty()) {
                    this.renewAndSchedule(id);
                } else {
                    this.removeSubscription(id);
                }
            }
            catch (Exception e) {
                SNManagerImpl.this.logger.error("Error processing subscription", e);
            }
        }

        /** Creates the remote subscription, records its id and schedules the next run. */
        private void subscribeAndSchedule() {
            SNManagerImpl.this.logger.debug("New subscription. Subscribing and scheduling for refresh");
            try {
                this.subscription.setId(SNManagerImpl.this.snLocator.getService().subscribe(this.subscription.getEpr(), this.subscription.getTopic(), this.subscription.getTimeToLive()));
            }
            catch (ISSNServiceException e) {
                SNManagerImpl.this.logger.error("Error adding topic: " + this.subscription.getTopic(), e);
            }
            String id = this.subscription.getId();
            if (id != null) {
                // Only register ids that actually exist; after a failed subscribe the id
                // is still null and the next scheduled run retries the subscription.
                SNManagerImpl.this.idMap.put(id, this);
            }
            this.scheduleRefresh();
        }

        /** Renews the existing remote subscription and schedules the next run. */
        private void renewAndSchedule(String id) {
            SNManagerImpl.this.logger.debug("Subscription " + id + " is already subscribed. Refreshing and rescheduling");
            try {
                SNManagerImpl.this.snLocator.getService().renew(id, this.subscription.getTimeToLive());
            }
            catch (ISSNServiceException e) {
                SNManagerImpl.this.logger.error("Error refreshing subscription " + id, e);
            }
            this.scheduleRefresh();
        }

        /** Drops the remote subscription and unregisters this task; it is not rescheduled. */
        private void removeSubscription(String id) {
            SNManagerImpl.this.logger.debug("No listeners for this subscription. Removing it");
            try {
                SNManagerImpl.this.snLocator.getService().unsubscribe(id);
            }
            catch (ISSNServiceException e) {
                SNManagerImpl.this.logger.error("Error removing subscription " + id, e);
            }
            SNManagerImpl.this.idMap.remove(id);
            // The registry is keyed by topic, not by Subscription. Remove only if the
            // entry is still this task, so a newer task for the topic is left alone.
            // NOTE(review): a listener added between the emptiness check and this
            // removal is still lost; closing that window would need a shared lock.
            SNManagerImpl.this.tasks.remove(this.subscription.getTopic(), this);
        }

        /** Schedules the next run after a fixed fraction of the time-to-live, in seconds. */
        private void scheduleRefresh() {
            long delay = (long)((double)this.subscription.getTimeToLive() * REFRESH_FACTOR);
            SNManagerImpl.this.executor.schedule(this, delay, TimeUnit.SECONDS);
        }

        public List<NotificationListener> getListeners() {
            return this.listeners;
        }
    }

    /**
     * Delivers one notification to one listener, logging any exception the
     * listener throws.
     */
    private class NotificationTask
    implements Runnable {
        private final NotificationListener listener;
        private final Notification notification;

        public NotificationTask(NotificationListener listener, Notification notification) {
            this.listener = listener;
            this.notification = notification;
        }

        @Override
        public void run() {
            try {
                this.listener.processNotification(this.notification);
            }
            catch (Exception e) {
                SNManagerImpl.this.logger.error("Error calling listener", e);
            }
        }
    }
}

