/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.google.common.hash.Hashing;
import com.kwartile.lib.cc.ConnectedComponent;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.AbstractSparkAction;
import eu.dnetlib.dhp.oa.dedup.DatePicker;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.IdGenerator;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.SparkCompatUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.dom4j.DocumentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import scala.Tuple3;

/**
 * Spark action that turns the similarity relations of a dedup action set into {@code merges} /
 * {@code isMergedIn} relations.
 *
 * <p>For every dedup configuration of the action set it:
 * <ol>
 *   <li>computes the connected components of the similarity graph;</li>
 *   <li>elects one pivot per component by ordering its members on, in turn: ascending pid type,
 *       descending {@code lastUsage} from the optional pivot history table, whether the record was
 *       collected from a hard-coded datasource (publications and datasets only), ascending valid
 *       date, and finally ascending id;</li>
 *   <li>writes the resulting relations as parquet under the working path.</li>
 * </ol>
 */
public class SparkCreateMergeRels extends AbstractSparkAction {

    private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);

    /** Classpath location of the argument definitions parsed by {@link #main(String[])}. */
    private static final String PARAMETERS_PATH = "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json";

    /** Iteration budget passed to {@code ConnectedComponent.runOnPairs}. */
    private static final int CONNECTED_COMPONENT_MAX_ITERATIONS = 50;

    /**
     * A date takes part in pivot election only if it fully matches this pattern (yyyy-MM-dd).
     * Compiled once instead of on every UDF invocation.
     */
    private static final Pattern DATE_PATTERN = Pattern.compile("^(\\d{4})-(\\d{2})-(\\d{2})");

    public SparkCreateMergeRels(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    /**
     * Command line entry point: parses the arguments, builds a Hive-enabled Spark session and runs
     * the action against the configured IS lookup service.
     *
     * @param args command line arguments, as described by {@code createCC_parameters.json}
     * @throws Exception if argument parsing, session creation or the action itself fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // close the resource stream and decode it explicitly as UTF-8 instead of the platform charset
        try (InputStream in = SparkCreateMergeRels.class.getResourceAsStream(PARAMETERS_PATH)) {
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final String isLookUpUrl = parser.get("isLookUpUrl");
        log.info("isLookupUrl {}", isLookUpUrl);

        final SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"));
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        new SparkCreateMergeRels(parser, getSparkWithHiveSession(conf))
            .run(ISLookupClientFactory.getLookUpService(isLookUpUrl));
    }

    /**
     * Creates the merge relations for every dedup configuration of the action set and saves them
     * (overwriting) under the merge-rel path of each sub-entity.
     *
     * @param isLookUpService service used to fetch the dedup configurations
     */
    @Override
    public void run(ISLookUpService isLookUpService)
        throws ISLookUpException, DocumentException, IOException, SAXException {

        final String graphBasePath = parser.get("graphBasePath");
        final String workingPath = parser.get("workingPath");
        final String isLookUpUrl = parser.get("isLookUpUrl");
        final String actionSetId = parser.get("actionSetId");
        // maximum position in a group still merged into its pivot; <= 0 means "no cut"
        final int cut = Optional
            .ofNullable(parser.get("cutConnectedComponent"))
            .map(Integer::valueOf)
            .orElse(0);
        final String pivotHistoryDatabase = parser.get("pivotHistoryDatabase");

        log.info("connected component cut: '{}'", cut);
        log.info("graphBasePath: '{}'", graphBasePath);
        log.info("isLookUpUrl:   '{}'", isLookUpUrl);
        log.info("actionSetId:   '{}'", actionSetId);
        log.info("workingPath:   '{}'", workingPath);

        for (DedupConfig dedupConf : getConfigurations(isLookUpService, actionSetId)) {
            final String subEntity = dedupConf.getWf().getSubEntityValue();
            final Class<?> clazz = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));

            log.info("Creating merge rels for: '{}'", subEntity);

            // NOTE(review): maxIterations from the dedup configuration is only logged; the connected
            // component computation uses CONNECTED_COMPONENT_MAX_ITERATIONS. Confirm this is intended.
            final int maxIterations = dedupConf.getWf().getMaxIterations();
            log.info("Max iterations {}", maxIterations);

            final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);

            // <id, groupId>: every vertex of the similarity graph with its connected component
            final org.apache.spark.sql.Dataset<Row> rawMergeRels = connectedComponents(
                DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity));
            // <id, lastUsage>
            final org.apache.spark.sql.Dataset<Row> pivotHistory = loadPivotHistory(pivotHistoryDatabase, subEntity);
            // <id, pidType, collectedfrom, date>
            final org.apache.spark.sql.Dataset<Row> pivotingData = loadPivotingData(clazz, graphBasePath, subEntity);

            // ranking of the candidates of a group: the first row is the elected pivot
            final WindowSpec w = Window
                .partitionBy("groupId")
                .orderBy(
                    functions.col("pidType").asc_nulls_last(),
                    functions.col("lastUsage").desc_nulls_last(),
                    functions.col("collectedfrom").desc_nulls_last(),
                    functions.col("date").asc_nulls_last(),
                    functions.col("id").asc_nulls_last());

            final org.apache.spark.sql.Dataset<Relation> output = rawMergeRels
                .join(pivotHistory, DHPUtils.toSeq(Collections.singletonList("id")).toSeq(), "full")
                .join(pivotingData, DHPUtils.toSeq(Collections.singletonList("id")).toSeq(), "left")
                .withColumn("pivot", functions.first("id").over(w))
                .withColumn("position", functions.row_number().over(w))
                .flatMap(mergeTuples(cut), Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()))
                .distinct()
                .flatMap(mergeRelations(dedupConf), Encoders.bean(Relation.class));

            saveParquet(output, mergeRelPath, SaveMode.Overwrite);
        }
    }

    /**
     * Computes {@code <id, groupId>} for every entity id appearing in the simrels stored at
     * {@code simRelPath}, where {@code groupId} identifies its connected component.
     */
    private org.apache.spark.sql.Dataset<Row> connectedComponents(String simRelPath) {
        final UserDefinedFunction hashUDF = functions
            .udf((UDF1<String, Long>) SparkCreateMergeRels::hash, DataTypes.LongType);

        final org.apache.spark.sql.Dataset<Row> simRels = spark.read().load(simRelPath).select("source", "target");

        // <id, vertexId>: numeric vertex id of every entity appearing in a simrel
        final org.apache.spark.sql.Dataset<Row> vertexIdMap = simRels
            .selectExpr("source as id")
            .union(simRels.selectExpr("target as id"))
            .distinct()
            .withColumn("vertexId", hashUDF.apply(functions.col("id")));

        // the same simrels expressed as pairs of numeric vertex ids (reuses the single read above)
        final org.apache.spark.sql.Dataset<Row> edges = simRels
            .withColumn("source", hashUDF.apply(functions.col("source")))
            .withColumn("target", hashUDF.apply(functions.col("target")));

        final org.apache.spark.sql.Dataset<Row> cliques = ConnectedComponent
            .runOnPairs(edges, CONNECTED_COMPONENT_MAX_ITERATIONS, spark);

        // translate vertex ids back into entity ids
        return cliques
            .join(vertexIdMap, DHPUtils.toSeq(Collections.singletonList("vertexId")).toSeq(), "inner")
            .drop("vertexId")
            .distinct();
    }

    /**
     * Loads {@code <id, lastUsage>} from {@code <pivotHistoryDatabase>.<subEntity>}, or returns an
     * empty dataset with that schema when no history database is configured.
     */
    private org.apache.spark.sql.Dataset<Row> loadPivotHistory(String pivotHistoryDatabase, String subEntity) {
        if (StringUtils.isBlank(pivotHistoryDatabase)) {
            return spark
                .createDataset(
                    Collections.<Row> emptyList(),
                    SparkCompatUtils.encoderFor(StructType.fromDDL("id STRING, lastUsage STRING")));
        }
        return spark.read().table(pivotHistoryDatabase + "." + subEntity).selectExpr("id", "lastUsage");
    }

    /**
     * Loads the attributes used to rank pivot candidates from the entity JSON of the graph.
     * The attributes are {@code id}; {@code pidType}, the {@code PidType} ordinal of the id prefix,
     * capped at {@code PidType.w3id}; {@code collectedfrom}; and {@code date}. Dates that are blank,
     * malformed or rejected by {@code DatePicker.inRange} are replaced with today plus one week.
     */
    private org.apache.spark.sql.Dataset<Row> loadPivotingData(Class<?> clazz, String graphBasePath, String subEntity) {
        String collectedfromExpr = "false AS collectedfrom";
        String dateExpr = "'' AS date";

        if (Result.class.isAssignableFrom(clazz)) {
            if (Publication.class.isAssignableFrom(clazz)) {
                collectedfromExpr = "array_contains(collectedfrom.key, '10|openaire____::081b82f96300b6a6e3d282bad31cb6e2') AS collectedfrom";
            } else if (Dataset.class.isAssignableFrom(clazz)) {
                collectedfromExpr = "array_contains(collectedfrom.key, '10|openaire____::9e3be59865b2c1c335d32dae2fe7b254') AS collectedfrom";
            }
            dateExpr = "dateofacceptance.value AS date";
        }

        final UserDefinedFunction mapPid = functions
            .udf(
                (UDF1<String, Integer>) s -> Math.min(PidType.tryValueOf(s).ordinal(), PidType.w3id.ordinal()),
                DataTypes.IntegerType);

        final UserDefinedFunction validDate = functions
            .udf(
                (UDF1<String, String>) date -> {
                    if (StringUtils.isNotBlank(date) && DATE_PATTERN.matcher(date).matches()
                        && DatePicker.inRange(date)) {
                        return date;
                    }
                    // unusable dates get a future value so they sort after every valid date
                    return LocalDate.now().plusWeeks(1L).toString();
                },
                DataTypes.StringType);

        return spark
            .read()
            .schema(Encoders.bean(clazz).schema())
            .json(DedupUtility.createEntityPath(graphBasePath, subEntity))
            .selectExpr(
                "id", "regexp_extract(id, '^\\\\d+\\\\|([^_]+).*::', 1) AS pidType", collectedfromExpr, dateExpr)
            .withColumn("pidType", mapPid.apply(functions.col("pidType")))
            .withColumn("date", validDate.apply(functions.col("date")));
    }

    /**
     * Maps a ranked candidate row to {@code <source, dedupId, pivot>} tuples, where a null pivot
     * marks a link that does not carry pivot information.
     *
     * @param cut maximum position within a group still merged into its pivot; {@code <= 0} disables it
     */
    private static FlatMapFunction<Row, Tuple3<String, String, String>> mergeTuples(int cut) {
        return r -> {
            final String id = r.getAs("id");
            final String dedupId = IdGenerator.generate(id);

            final String pivot = r.getAs("pivot");
            final String pivotDedupId = IdGenerator.generate(pivot);

            // skip rows whose id already equals the dedup id generated from the pivot
            if (id.equals(pivotDedupId)) {
                return Collections.emptyIterator();
            }

            final List<Tuple3<String, String, String>> res = new ArrayList<>();

            // rows not matched by the connected components (i.e. only in the pivot history) have no groupId
            if (r.isNullAt(r.fieldIndex("groupId"))) {
                // collectedfrom is non-null only if the left join with the entity data matched,
                // presumably meaning the record still exists in the graph
                if (!r.isNullAt(r.fieldIndex("collectedfrom"))) {
                    res.add(new Tuple3<>(id, dedupId, null));
                }
                return res.iterator();
            }

            // a former pivot now in a group with a different pivot: link its old dedup id to the new one
            if (!r.isNullAt(r.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) {
                res.add(new Tuple3<>(dedupId, pivotDedupId, null));
            }

            if (cut <= 0 || r.<Integer> getAs("position") <= cut) {
                res.add(new Tuple3<>(id, pivotDedupId, pivot));
            }

            return res.iterator();
        };
    }

    /** Expands an {@code <id, dedupId, pivot>} tuple into its {@code merges} and {@code isMergedIn} relations. */
    private static FlatMapFunction<Tuple3<String, String, String>, Relation> mergeRelations(DedupConfig dedupConf) {
        return t -> {
            final String id = t._1();
            final String dedupId = t._2();
            final String pivot = t._3();

            return Arrays
                .asList(
                    rel(pivot, dedupId, id, "merges", dedupConf),
                    rel(pivot, id, dedupId, "isMergedIn", dedupConf))
                .iterator();
        };
    }

    /**
     * Builds an inferred dedup relation.
     *
     * @param pivot     pivot id stored as the {@code pivot} property, or {@code null} for none
     * @param source    relation source id
     * @param target    relation target id
     * @param relClass  relation class, e.g. {@code merges} or {@code isMergedIn}
     * @param dedupConf configuration providing the entity type and the inference provenance
     * @return the new relation
     */
    private static Relation rel(String pivot, String source, String target, String relClass, DedupConfig dedupConf) {
        final String entityType = dedupConf.getWf().getEntityType();

        final Relation r = new Relation();
        r.setSource(source);
        r.setTarget(target);
        r.setRelClass(relClass);
        // e.g. "result" -> "resultResult"; capitalize is locale-independent, unlike toUpperCase()
        r.setRelType(entityType + StringUtils.capitalize(entityType));
        r.setSubRelType("dedup");

        final DataInfo info = new DataInfo();
        info.setDeletedbyinference(false);
        info.setInferred(true);
        info.setInvisible(false);
        info.setInferenceprovenance(dedupConf.getWf().getConfigurationId());

        final Qualifier provenanceAction = new Qualifier();
        provenanceAction.setClassid("sysimport:dedup");
        provenanceAction.setClassname("sysimport:dedup");
        provenanceAction.setSchemeid("dnet:provenanceActions");
        provenanceAction.setSchemename("dnet:provenanceActions");
        info.setProvenanceaction(provenanceAction);

        r.setDataInfo(info);

        if (pivot != null) {
            final KeyValue pivotKV = new KeyValue();
            pivotKV.setKey("pivot");
            pivotKV.setValue(pivot);
            r.setProperties(Arrays.asList(pivotKV));
        }
        return r;
    }

    /**
     * Maps an entity id to a numeric vertex id: the {@code asLong()} of its murmur3_128 hash over
     * the UTF-8 bytes of the id.
     */
    public static long hash(String id) {
        return Hashing.murmur3_128().hashString(id, StandardCharsets.UTF_8).asLong();
    }
}

