/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.broker.oa_alerts;

import com.fasterxml.jackson.core.JsonProcessingException;
import eu.dnetlib.broker.objects.alerts.ValidatorAlertMessage;
import eu.dnetlib.broker.objects.alerts.ValidatorErrorMessage;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.ConditionParams;
import eu.dnetlib.dhp.broker.model.MapCondition;
import eu.dnetlib.dhp.broker.model.OaAlertMappedFields;
import eu.dnetlib.dhp.broker.model.OaAlertNotification;
import eu.dnetlib.dhp.broker.model.Subscription;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.schema.mdstore.Provenance;
import eu.dnetlib.dhp.schema.mdstore.ValidationType;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.validator2.result_models.StandardResult;
import eu.dnetlib.validator2.result_models.StandardValidationResult;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GenerateAlertNotificationsJob {
    /** Prefix of the broker topic; {@code main} appends the upper-cased validation type to it. */
    private static final String TOPIC_PREFIX = "ALERT/VALIDATOR/";
    /** Logger shared by all static methods of this job. */
    private static final Logger log = LoggerFactory.getLogger(GenerateAlertNotificationsJob.class);

    /**
     * Entry point of the job.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>parse the arguments described by {@code generate_alert_notifications.json};</li>
     *   <li>map the compatibility level of the API to a {@link ValidationType}; if there is no
     *       mapping the job logs a warning and returns without writing anything;</li>
     *   <li>read the metadata records from {@code <mdstore hdfs path>/store} and build one
     *       {@link ValidatorAlertMessage} for each record that has a result for that validation type;</li>
     *   <li>send the number of generated messages to the broker through {@code updateStats};</li>
     *   <li>keep the subscriptions whose topic and datasource id match, and save one
     *       {@link OaAlertNotification} per (message, subscription) pair in {@code outputDir}
     *       (an empty dataset is saved when no subscription matches).</li>
     * </ol>
     *
     * @param args command line arguments
     * @throws IllegalArgumentException if the datasource id, the compatibility level, the mdstore
     *         HDFS path or the broker base url is blank
     * @throws Exception on any parsing, I/O or Spark failure
     */
    public static void main(String[] args) throws Exception {
        final ArgumentApplicationParser parser;
        // Close the classpath stream once the argument specification has been read.
        try (InputStream spec = GenerateAlertNotificationsJob.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa_alerts/generate_alert_notifications.json")) {
            parser = new ArgumentApplicationParser(IOUtils.toString(spec));
        }
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String mdstoreInputVersion = parser.get("mdstoreInputVersion");
        final String outputPath = parser.get("outputDir");

        // The provenance JSON is deserialized once and reused for both the id and the name.
        final Provenance provenance = DHPUtils.MAPPER.readValue(parser.get("provenance"), Provenance.class);
        final String dsId = provenance.getDatasourceId();
        log.info("dsId: {}", dsId);
        final String dsName = provenance.getDatasourceName();
        log.info("dsName: {}", dsName);

        final String compatibilityLevel = DHPUtils.MAPPER.readValue(parser.get("apidescriptor"), ApiDescriptor.class).getCompatibilityLevel();
        log.info("compatibilityLevel: {}", compatibilityLevel);

        final MDStoreVersion mdstoreVersion = DHPUtils.MAPPER.readValue(mdstoreInputVersion, MDStoreVersion.class);
        final String hdfsPath = mdstoreVersion.getHdfsPath();
        final String inputPath = hdfsPath + "/store";
        log.info("inputPath: {}", inputPath);

        final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
        log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);

        // Validate the raw HDFS path: inputPath always ends with "/store" and therefore is never blank.
        if (StringUtils.isAnyBlank(dsId, compatibilityLevel, hdfsPath, brokerApiBaseUrl)) {
            throw new IllegalArgumentException("A required information is missing");
        }

        final ValidationType validationType = GenerateAlertNotificationsJob.calculateValidationType(compatibilityLevel);
        if (validationType == null) {
            log.warn("The compatibility is non managed by the validator engine");
            return;
        }

        final String topic = TOPIC_PREFIX + StringUtils.upperCase(validationType.toString());
        log.info("topic: {}", topic);

        final SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            final LongAccumulator total = spark.sparkContext().longAccumulator("total_alert_notifications");

            final Dataset<ValidatorAlertMessage> payloads = spark
                .read()
                .parquet(inputPath)
                .as(Encoders.bean(MetadataRecord.class))
                // Records without any validation result are skipped instead of failing with a NullPointerException.
                .filter((FilterFunction<MetadataRecord>) r -> r.getValidationResults() != null && r.getValidationResults().containsKey(validationType))
                .map(
                    (MapFunction<MetadataRecord, ValidatorAlertMessage>) r -> GenerateAlertNotificationsJob
                        .generatePayload(r.getOriginalId(), dsId, dsName, (StandardValidationResult) r.getValidationResults().get(validationType)),
                    Encoders.bean(ValidatorAlertMessage.class));

            final DatasourceStats stats = new DatasourceStats();
            stats.setId(dsId);
            stats.setName(dsName);
            stats.setType("-");
            stats.setTopic(topic);
            stats.setSize(payloads.count());
            GenerateAlertNotificationsJob.updateStats(brokerApiBaseUrl, stats);

            // topic and dsId are known to be non-null here, so they are used as the receivers of the comparisons.
            final List<Subscription> subscriptions = GenerateAlertNotificationsJob
                .listSubscriptions(brokerApiBaseUrl)
                .stream()
                .filter(s -> topic.equals(s.getTopic()))
                .filter(s -> dsId.equalsIgnoreCase(GenerateAlertNotificationsJob.extractDatasourceId(s)))
                .collect(Collectors.toList());

            final Long date = System.currentTimeMillis();
            log.info("date: {}", date);

            if (!subscriptions.isEmpty()) {
                final Dataset<OaAlertNotification> notifications = payloads
                    .flatMap(
                        (FlatMapFunction<ValidatorAlertMessage, OaAlertNotification>) p -> GenerateAlertNotificationsJob
                            .generateAlertNotifications(p, date, subscriptions),
                        Encoders.bean(OaAlertNotification.class))
                    // Notifications whose payload could not be serialized carry a null payload and are dropped here.
                    .filter((FilterFunction<OaAlertNotification>) n -> StringUtils.isNotBlank(n.getPayload()));
                ClusterUtils.save(notifications, outputPath, OaAlertNotification.class, total);
            } else {
                ClusterUtils.save(spark.emptyDataset(Encoders.bean(OaAlertNotification.class)), outputPath, OaAlertNotification.class, total);
            }
        });
    }

    /**
     * Maps the compatibility level declared by an API to the corresponding validation type.
     *
     * <p>The comparison ignores case.
     *
     * @param compatibilityLevel the compatibility level (may be {@code null})
     * @return the matching {@link ValidationType}, or {@code null} when the level is not one of the
     *         known values
     */
    private static ValidationType calculateValidationType(String compatibilityLevel) {
        // The two arrays are parallel: knownLevels[i] maps to mappedTypes[i].
        final String[] knownLevels = {
            "openaire2.0", "openaire3.0", "openaire4.0", "fair_data", "fair_literature_v4"
        };
        final ValidationType[] mappedTypes = {
            ValidationType.openaire2_0,
            ValidationType.openaire3_0,
            ValidationType.openaire4_0,
            ValidationType.fair_data,
            ValidationType.fair_literature_v4
        };
        for (int i = 0; i < knownLevels.length; i++) {
            if (knownLevels[i].equalsIgnoreCase(compatibilityLevel)) {
                return mappedTypes[i];
            }
        }
        return null;
    }

    /**
     * Builds one notification per subscription for a single validator alert message.
     *
     * <p>All the notifications produced by one call share the same randomly generated event id,
     * the same mapped fields and the same JSON payload. The notification id is derived from the
     * MD5 hash of {@code subscriptionId + "@@@" + eventId}.
     *
     * @param alertMessage the alert to notify
     * @param date the timestamp stored in every notification
     * @param subscriptions the subscriptions that must receive the alert
     * @return an iterator over the generated notifications; their payload is {@code null} when the
     *         alert message cannot be serialized to JSON
     */
    private static Iterator<OaAlertNotification> generateAlertNotifications(ValidatorAlertMessage alertMessage, Long date, List<Subscription> subscriptions) {
        final OaAlertMappedFields fields = new OaAlertMappedFields();
        fields.setOriginalId(alertMessage.getOriginalId());
        fields.setDatasourceId(alertMessage.getDatasourceId());
        fields.setDatasourceName(alertMessage.getDatasourceName());

        final String eventId = "evt-" + UUID.randomUUID();

        // The payload does not depend on the subscription: serialize it once instead of once per subscription.
        final String payload = GenerateAlertNotificationsJob.toJsonPayload(alertMessage);

        return subscriptions.stream().map(s -> {
            final OaAlertNotification n = new OaAlertNotification();
            n.setNotificationId("ntf-" + DigestUtils.md5Hex(s.getSubscriptionId() + "@@@" + eventId));
            n.setEventId(eventId);
            n.setDate(date);
            n.setMap(fields);
            n.setPayload(payload);
            n.setProducerId("OPENAIRE");
            n.setSubscriptionId(s.getSubscriptionId());
            n.setTopic(s.getTopic());
            return n;
        }).iterator();
    }

    /**
     * Serializes the alert message to JSON, returning {@code null} (after logging a warning) when
     * the serialization fails.
     */
    private static String toJsonPayload(ValidatorAlertMessage alertMessage) {
        try {
            return DHPUtils.MAPPER.writeValueAsString(alertMessage);
        } catch (JsonProcessingException e) {
            // Best effort: keep the null payload contract, but leave a trace of the failure.
            log.warn("Unable to serialize the alert message of record {}", alertMessage.getOriginalId(), e);
            return null;
        }
    }

    /*
     * WARNING: placeholder body - the decompiler (CFR) could not reconstruct this method
     * ("org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once").
     *
     * As written, every call throws IllegalStateException, so the call made from main()
     * cannot succeed until the real implementation is restored from the original sources.
     *
     * NOTE(review): judging by its name and by how main() uses the result (filtering by topic
     * and datasource id), this presumably retrieves the subscriptions from the broker service
     * located at brokerApiBaseUrl - confirm against the original sources.
     */
    private static List<Subscription> listSubscriptions(String brokerApiBaseUrl) throws Exception {
        // Decompilation failed: the original logic is not available in this file.
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Sends the statistics of a datasource to the broker service with an HTTP POST on
     * {@code <brokerApiBaseUrl>/api/openaire-alerts/stats/update}.
     *
     * <p>The statistics are serialized to JSON and encoded as UTF-8. A response with a non-2xx
     * status is only logged as a warning: it does not raise an exception.
     *
     * @param brokerApiBaseUrl the base url of the broker service
     * @param stats the statistics to send
     * @throws IOException if the statistics cannot be serialized or the HTTP call fails
     */
    private static void updateStats(String brokerApiBaseUrl, DatasourceStats stats) throws IOException {
        final String url = brokerApiBaseUrl + "/api/openaire-alerts/stats/update";

        final HttpPost req = new HttpPost(url);
        req.setHeader("Accept", "application/json");
        req.setHeader("Content-type", "application/json");
        // Explicit UTF-8: the single-argument StringEntity constructor encodes the body as ISO-8859-1,
        // which corrupts the characters outside that charset (e.g. in the datasource name).
        req.setEntity(new StringEntity(DHPUtils.MAPPER.writeValueAsString(stats), "UTF-8"));

        try (CloseableHttpClient client = HttpClients.createDefault();
            CloseableHttpResponse response = client.execute(req)) {
            final int status = response.getStatusLine().getStatusCode();
            if (status < 200 || status >= 300) {
                log.warn("The update of the stats failed: {} returned HTTP status {}", url, status);
            }
        }
    }

    /**
     * Creates the alert message of a record starting from its validation result.
     *
     * <p>Every error of every entry of {@code standardValidationResult.getResults()} becomes a
     * {@link ValidatorErrorMessage} built with the key of the entry and the error itself.
     *
     * @param originalId the original identifier of the record
     * @param dsId the identifier of the datasource
     * @param dsName the name of the datasource
     * @param standardValidationResult the validation result of the record
     * @return the alert message
     */
    private static ValidatorAlertMessage generatePayload(String originalId, String dsId, String dsName, StandardValidationResult standardValidationResult) {
        final List<ValidatorErrorMessage> errors = standardValidationResult
            .getResults()
            .entrySet()
            .stream()
            .flatMap(entry -> {
                final String field = (String) entry.getKey();
                final StandardResult result = (StandardResult) entry.getValue();
                return result.getErrors().stream().map(err -> new ValidatorErrorMessage(field, err));
            })
            .collect(Collectors.toList());

        final ValidatorAlertMessage message = new ValidatorAlertMessage();
        message.setOriginalId(originalId);
        message.setDatasourceId(dsId);
        message.setDatasourceName(dsName);
        message.setErrors(errors);
        return message;
    }

    /**
     * Extracts the datasource id a subscription is restricted to.
     *
     * <p>The value is taken from the first parameter of the first condition on the field
     * {@code datasourceId} that has a non-empty parameter list.
     *
     * @param sub the subscription
     * @return the datasource id, or an empty string when no such condition exists
     */
    private static String extractDatasourceId(Subscription sub) {
        final Optional<String> firstValue = sub
            .conditionsAsList()
            .stream()
            .filter(condition -> "datasourceId".equals(condition.getField()))
            .map(condition -> condition.getListParams())
            .filter(params -> !params.isEmpty())
            .map(params -> ((ConditionParams) params.get(0)).getValue())
            .findFirst();
        return firstValue.orElse("");
    }
}

