/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.lbs.matchers;

import eu.dnetlib.lbs.elasticsearch.Event;
import eu.dnetlib.lbs.elasticsearch.Notification;
import eu.dnetlib.lbs.elasticsearch.NotificationRepository;
import eu.dnetlib.lbs.events.output.DispatcherManager;
import eu.dnetlib.lbs.properties.ElasticSearchProperties;
import eu.dnetlib.lbs.subscriptions.MapCondition;
import eu.dnetlib.lbs.subscriptions.NotificationMode;
import eu.dnetlib.lbs.subscriptions.Subscription;
import eu.dnetlib.lbs.subscriptions.SubscriptionRepository;
import eu.dnetlib.lbs.utils.LbsQueue;
import eu.dnetlib.lbs.utils.QueueManager;
import eu.dnetlib.lbs.utils.ThreadManager;
import java.util.ArrayList;
import java.util.Date;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.ElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.ScrolledPage;
import org.springframework.data.elasticsearch.core.query.NativeSearchQuery;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import org.springframework.stereotype.Component;

/**
 * Background worker that matches queued {@link Subscription}s against the events index and
 * records/dispatches a {@link Notification} for every event not yet notified.
 *
 * <p>Subscriptions are submitted through {@link #startMatching(Subscription)} and consumed one
 * at a time by {@link #run()}, which is handed to the {@link ThreadManager} in {@link #init()}.
 */
@Component
public class SubscriptionEventMatcher implements Runnable {
    // NOTE(review): these two constants are not referenced anywhere in this class; the scroll
    // below uses SCROLL_KEEP_ALIVE_MILLIS and QUERY_PAGE_SIZE instead. They are left untouched
    // until it is confirmed which values are actually intended.
    private static final int SCROLL_TIMEOUT_IN_MILLIS = 300000;
    private static final int SCROLL_PAGE_SIZE = 100;
    /** Scroll time value (milliseconds) passed to startScroll/continueScroll. */
    private static final long SCROLL_KEEP_ALIVE_MILLIS = 10000L;
    /** Page size set on the search query. */
    private static final int QUERY_PAGE_SIZE = 10;
    /** Upper bound on the number of events collected for a single EMAIL dispatch. */
    private static final int MAX_EVENTS_PER_EMAIL = 20;
    @Autowired
    private QueueManager queueManager;
    @Autowired
    private DispatcherManager dispatcherManager;
    @Autowired
    private SubscriptionRepository subscriptionRepo;
    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;
    @Autowired
    private ElasticSearchProperties props;
    private LbsQueue<Subscription, Subscription> queue;
    @Autowired
    private NotificationRepository notificationRepository;
    @Autowired
    private ThreadManager threadManager;
    private static final Log log = LogFactory.getLog(SubscriptionEventMatcher.class);

    /**
     * Creates the matching queue and hands this instance to the thread manager as the
     * "subscr-events-matcher" runnable.
     */
    @PostConstruct
    public void init() {
        this.queue = this.queueManager.newQueue("subscr-events-matcher-queue", Subscription::isReady);
        this.threadManager.newThread("subscr-events-matcher", this);
    }

    /**
     * Offers a subscription to the matching queue and logs whether the queue accepted it.
     *
     * @param s the subscription whose events should be matched
     */
    public void startMatching(Subscription s) {
        if (this.queue.offer(s)) {
            log.info("Matching of subscription " + s.getSubscriptionId() + " in queue");
        } else {
            log.info("Subscription " + s.getSubscriptionId() + " not queued");
        }
    }

    /**
     * Endless consumer loop: takes one subscription at a time from the queue, matches it and,
     * when the matching completed without errors, persists its new last-notification date.
     *
     * <p>Any error raised while handling one subscription is logged and the loop moves on to
     * the next element.
     */
    @Override
    public void run() {
        log.info("SubscriptionEventMatcher started: " + Thread.currentThread().getName());
        while (true) {
            try {
                Subscription s = (Subscription) this.queue.takeOne();
                if (s == null || !Subscription.isReady(s)) {
                    continue;
                }
                // Taken BEFORE the scan: events whose creationDate falls during the scan but
                // which the scroll does not return must stay inside the next run's window.
                // Events scanned twice are skipped by isNotAlreadyNotified.
                Date matchingStart = new Date();
                if (this.startSubscriptionEventsMatcher(s)) {
                    // Only advance the window after a clean run; otherwise the events of a
                    // failed run would be excluded by the next range query.
                    s.setLastNotificationDate(matchingStart);
                    this.subscriptionRepo.save(s);
                }
            } catch (Throwable e) {
                log.error("Error iterating matching queue", e);
            }
        }
    }

    /**
     * Scrolls over the events matching the subscription, stores a notification for each event
     * not yet notified and dispatches them according to the subscription mode: MOCK events are
     * dispatched one by one, EMAIL events are collected (at most {@link #MAX_EVENTS_PER_EMAIL})
     * and dispatched together once the scroll is finished.
     *
     * @param s the subscription to match
     * @return {@code true} if matching completed, {@code false} if an error was caught (the
     *         error is logged, never propagated)
     */
    private boolean startSubscriptionEventsMatcher(Subscription s) {
        log.info("Start matching subscription: " + s);
        try {
            SearchQuery searchQuery = this.buildSearchQuery(s);
            ArrayList<Event> eventsToNotify = new ArrayList<Event>();
            ScrolledPage<Event> scrollResp = (ScrolledPage<Event>) this.elasticsearchTemplate.startScroll(SCROLL_KEEP_ALIVE_MILLIS, searchQuery, Event.class);
            try {
                while (scrollResp.hasContent()) {
                    for (Event e : scrollResp.getContent()) {
                        Notification n = new Notification(s, e);
                        if (!this.isNotAlreadyNotified(n)) {
                            continue;
                        }
                        this.notificationRepository.save(n);
                        if (s.getMode() == NotificationMode.EMAIL && eventsToNotify.size() < MAX_EVENTS_PER_EMAIL) {
                            eventsToNotify.add(e);
                        } else if (s.getMode() == NotificationMode.MOCK) {
                            this.dispatcherManager.dispatch(s, new Event[]{e});
                        }
                    }
                    scrollResp = (ScrolledPage<Event>) this.elasticsearchTemplate.continueScroll(scrollResp.getScrollId(), SCROLL_KEEP_ALIVE_MILLIS, Event.class);
                }
            } finally {
                // Release the scroll even when a page could not be processed.
                this.clearScrollQuietly(scrollResp.getScrollId());
            }
            log.info("End matching subscription: " + s.getSubscriptionId());
            if (s.getMode() == NotificationMode.EMAIL && !eventsToNotify.isEmpty()) {
                this.dispatcherManager.dispatch(s, eventsToNotify.toArray(new Event[0]));
            }
            return true;
        } catch (Throwable e) {
            log.error("Error matching subscription: " + s.getSubscriptionId(), e);
            return false;
        }
    }

    /**
     * Builds the events query for a subscription: same topic, creationDate starting from the
     * last notification date (epoch 0 when never notified) and a nested "map" query requiring
     * every non-null subscription condition.
     */
    private SearchQuery buildSearchQuery(Subscription s) {
        BoolQueryBuilder mapQuery = QueryBuilders.boolQuery();
        s.getConditionsAsList().stream()
                .map(MapCondition::asQueryBuilder)
                .filter(Objects::nonNull)
                .forEach(mapQuery::must);
        long lastNotification = s.getLastNotificationDate() != null ? s.getLastNotificationDate().getTime() : 0L;
        BoolQueryBuilder eventsQuery = QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("topic", s.getTopic()))
                .must(QueryBuilders.rangeQuery("creationDate").from(lastNotification))
                .must(QueryBuilders.nestedQuery("map", mapQuery, ScoreMode.None));
        return new NativeSearchQueryBuilder()
                .withQuery(eventsQuery)
                .withSearchType(SearchType.DEFAULT)
                .withIndices(this.props.getEventsIndexName())
                .withTypes(this.props.getEventsIndexType())
                .withPageable(PageRequest.of(0, QUERY_PAGE_SIZE))
                .build();
    }

    /**
     * Clears the given scroll, logging (instead of propagating) any failure so that a cleanup
     * problem never hides the outcome of the matching itself.
     */
    private void clearScrollQuietly(String scrollId) {
        if (scrollId == null) {
            return;
        }
        try {
            this.elasticsearchTemplate.clearScroll(scrollId);
        } catch (RuntimeException e) {
            log.warn("Error clearing scroll: " + scrollId, e);
        }
    }

    /**
     * @return {@code true} when no notification with the same id is stored in the repository
     */
    private boolean isNotAlreadyNotified(Notification n) {
        return !this.notificationRepository.findById(n.getNotificationId()).isPresent();
    }
}

