/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata;

import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors;
import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
import eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata.PublisherAuthors;
import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidAuthors;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult;
import eu.dnetlib.dhp.utils.OrcidAuthor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that combines publisher-provided author data with ORCID-carrying authors found in
 * publication records, and derives person/result relations from the matched authors.
 *
 * <p>{@link #createTemporaryData} prepares the per-DOI author lists that are joined together;
 * {@link #generateGraph} turns the matched authors into {@code hasAuthored} and co-authorship
 * relations and merges them with the relations already stored under the target path.
 */
public class EnrichExternalDataWithGraphORCID
extends SparkEnrichWithOrcidAuthors {
    private static final Logger log = LoggerFactory.getLogger(EnrichExternalDataWithGraphORCID.class);

    /** Provenance attached to every {@code hasAuthored} relation created by {@link #getRelations}. */
    public static final DataInfo DATAINFO = OafMapperUtils.dataInfo(
        false,
        "propagation",
        true,
        false,
        OafMapperUtils.qualifier(
            "person:relations:publisher",
            "Extraction of authors relations from publishers data enriched with graph orcid",
            "dnet:provenanceActions",
            "dnet:provenanceActions"),
        "0.85");

    /** ORCID pid class ids in priority order: a confirmed ORCID wins over a pending one. */
    private static final String[] ORCID_CLASSIDS = {"orcid", "orcid_pending"};

    public EnrichExternalDataWithGraphORCID(String propertyPath, String[] args, Logger log) {
        super(propertyPath, args, log);
    }

    /**
     * Command-line entry point: builds the job with its bundled parameter description and runs it.
     *
     * @param args command-line arguments forwarded to the job
     * @throws Exception whatever initialisation or execution throws
     */
    public static void main(String[] args) throws Exception {
        EnrichExternalDataWithGraphORCID app = new EnrichExternalDataWithGraphORCID(
            "/eu/dnetlib/dhp/wf/subworkflows/enrich/orcid/enrich_graph_orcid_parameters.json", args, log);
        app.initialize().run();
    }

    /**
     * Prepares the two author lists to be matched and stores their join.
     *
     * <ol>
     *   <li>Reads publications from {@code orcidPath + "/publication"}, keeps those having a DOI pid
     *       and at least one author with an ORCID (or pending ORCID) pid, and writes one
     *       {@code (id = doi, orcid_authors)} row per DOI to {@code targetPath + "/graph_authors"}.</li>
     *   <li>Reads the publisher JSON from {@code graphPath}, explodes its authors and groups them by
     *       DOI into {@code (id = doi, graph_authors)} rows.</li>
     *   <li>Joins the two on {@code id} and writes the result to
     *       {@code targetPath + "/publication_unmatched"}.</li>
     * </ol>
     *
     * @param spark      active Spark session
     * @param graphPath  location of the publisher JSON records
     * @param orcidPath  base path containing the {@code publication} JSON dump
     * @param targetPath base path under which both parquet outputs are written
     */
    public void createTemporaryData(SparkSession spark, String graphPath, String orcidPath, String targetPath) {
        Dataset<Row> orcidDnet = spark.read()
            .schema(Encoders.bean(Result.class).schema())
            .json(orcidPath + "/publication")
            .as(Encoders.bean(Result.class))
            .filter((FilterFunction<Result>) r -> r.getPid() != null
                && r.getPid().stream().anyMatch(p -> hasClassid(p, "doi")))
            .filter((FilterFunction<Result>) r -> r.getAuthor() != null
                && r.getAuthor().stream().anyMatch(EnrichExternalDataWithGraphORCID::hasOrcidPid))
            .flatMap((FlatMapFunction<Result, Tuple2<String, OrcidAuthors>>) r -> {
                // The same author list is emitted once for every DOI of the publication.
                OrcidAuthors authors = getOrcidAuthorsList(r.getAuthor());
                List<Tuple2<String, OrcidAuthors>> perDoi = new ArrayList<>();
                for (StructuredProperty pid : r.getPid()) {
                    if (hasClassid(pid, "doi")) {
                        perDoi.add(new Tuple2<>(pid.getValue(), authors));
                    }
                }
                return perDoi.iterator();
            }, Encoders.tuple(Encoders.STRING(), Encoders.bean(OrcidAuthors.class)))
            .selectExpr("_1 as id", "_2.orcidAuthorList as orcid_authors");

        orcidDnet.write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .parquet(targetPath + "/graph_authors");

        // Explicit schema of the publisher records; only the fields listed here are read.
        StructType schemaValuePair = new StructType()
            .add("Schema", DataTypes.StringType)
            .add("Value", DataTypes.StringType);
        StructType nameType = new StructType()
            .add("Full", DataTypes.StringType)
            .add("First", DataTypes.StringType)
            .add("Last", DataTypes.StringType);
        StructType matchingType = new StructType()
            .add("PID", DataTypes.StringType)
            .add("Value", DataTypes.StringType)
            .add("Confidence", DataTypes.DoubleType)
            .add("Status", DataTypes.StringType);
        StructType authorType = new StructType()
            .add("Corresponding", DataTypes.StringType)
            .add("Contributor_roles", DataTypes.createArrayType(schemaValuePair))
            .add("Name", nameType)
            .add("Matchings", DataTypes.createArrayType(matchingType))
            .add("PIDs", DataTypes.createArrayType(schemaValuePair));
        StructType schema = new StructType()
            .add("DOI", DataTypes.StringType)
            .add("Authors", DataTypes.createArrayType(authorType));

        Dataset<Row> df = spark.read().schema(schema).json(graphPath).where("DOI is not null");

        Dataset<Row> authors = df
            .selectExpr("DOI as doi", "explode(Authors) as author")
            .selectExpr(
                "doi",
                "author.Name.Full as fullname",
                "author.Name.First as firstname",
                "author.Name.Last as lastname",
                "author.PIDs as pids",
                "author.Matchings as affiliations")
            .map((MapFunction<Row, Tuple2<String, Author>>) a ->
                    new Tuple2<>(a.<String>getAs("doi"), getAuthor(a)),
                Encoders.tuple(Encoders.STRING(), Encoders.bean(Author.class)))
            .groupByKey((MapFunction<Tuple2<String, Author>, String>) t2 -> t2._1(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Tuple2<String, Author>, Tuple2<String, PublisherAuthors>>) (k, it) -> {
                PublisherAuthors pa = new PublisherAuthors();
                while (it.hasNext()) {
                    pa.getPublisherAuthorList().add(it.next()._2());
                }
                return new Tuple2<>(k, pa);
            }, Encoders.tuple(Encoders.STRING(), Encoders.bean(PublisherAuthors.class)))
            .selectExpr("_1 as id", "_2.publisherAuthorList as graph_authors");

        orcidDnet.join(authors, "id")
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .parquet(targetPath + "/publication_unmatched");
    }

    /**
     * Builds the new relations from the matched authors and merges them into the relation set
     * stored at {@code targetPath + "/relation"}.
     *
     * <p>Note that {@code graphPath} is not used by this implementation: both the {@code merges}
     * relations and the pre-existing relations are read from {@code targetPath}.
     *
     * @param spark      active Spark session
     * @param graphPath  unused here
     * @param workingDir directory holding {@code publication_matched} and {@code graph_authors};
     *                   also used for the intermediate {@code relation} output
     * @param targetPath base path whose {@code relation} directory is read and finally overwritten
     */
    public void generateGraph(SparkSession spark, String graphPath, String workingDir, String targetPath) {
        Dataset<Relation> newRelations = getNewRelations(spark, workingDir);
        Dataset<Row> mergesRelations = getMergesRelationships(spark, targetPath);
        Dataset<Relation> redirectedRels = redirectNewRelationsOnRepresentatives(newRelations, mergesRelations);

        Dataset<Row> matched = spark.read()
            .schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
            .parquet(workingDir + "/publication_matched")
            .selectExpr("id", "enriched_author");
        Dataset<Row> graph = spark.read().parquet(workingDir + "/graph_authors");

        Dataset<Relation> coAuthorshipRels = graph
            .joinWith(matched, graph.col("id").equalTo(matched.col("id")))
            .flatMap(
                (FlatMapFunction<Tuple2<Row, Row>, Relation>) EnrichExternalDataWithGraphORCID::coAuthorshipRels,
                Encoders.bean(Relation.class));

        // The merged set is computed from targetPath + "/relation", so it is first materialised
        // under workingDir and only then copied over the directory it was read from.
        mergeOldAndNewRelations(spark, targetPath, redirectedRels.union(coAuthorshipRels))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(workingDir + "/relation");

        spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(workingDir + "/relation")
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .json(targetPath + "/relation");
    }

    /**
     * Converts the authors that carry an ORCID into an {@link OrcidAuthors} container.
     *
     * @param authors authors of one publication; {@code null} is treated as empty
     * @return container holding one {@link OrcidAuthor} per author with a usable ORCID
     */
    private static OrcidAuthors getOrcidAuthorsList(List<Author> authors) {
        List<OrcidAuthor> orcidAuthors = new ArrayList<>();
        if (authors != null) {
            for (Author author : authors) {
                OrcidAuthor orcidAuthor = getOrcidAuthor(author);
                if (orcidAuthor != null) {
                    orcidAuthors.add(orcidAuthor);
                }
            }
        }
        OrcidAuthors oas = new OrcidAuthors();
        oas.setOrcidAuthorList(orcidAuthors);
        return oas;
    }

    /**
     * @return an {@link OrcidAuthor} built from the author's ORCID and names, or {@code null} when
     *         the author has no ORCID
     */
    private static OrcidAuthor getOrcidAuthor(Author a) {
        String orcid = getOrcid(a);
        if (orcid == null) {
            return null;
        }
        return new OrcidAuthor(orcid, a.getSurname(), a.getName(), a.getFullname(), null);
    }

    /**
     * Extracts the ORCID of an author represented as a Spark {@link Row}.
     *
     * <p>For each class id in {@link #ORCID_CLASSIDS} (in that order) the first pid with that class
     * id is considered; its value is returned unless it is {@code null}, in which case the next
     * class id is tried. Class ids are compared ignoring case, like every other check in this class.
     *
     * @param a author row exposing a {@code pid} array of structs with {@code qualifier.classid}
     *          and {@code value}
     * @return the ORCID, or {@code null} when none is available
     */
    private static String getOrcid(Row a) {
        List<Row> authorPids = listOrEmpty(a, "pid");
        for (String classid : ORCID_CLASSIDS) {
            for (Row pid : authorPids) {
                if (classid.equalsIgnoreCase(classidOf(pid))) {
                    String value = pid.getAs("value");
                    if (value != null) {
                        return value;
                    }
                    // Only the first pid of each class id is considered.
                    break;
                }
            }
        }
        return null;
    }

    /**
     * Extracts the ORCID of an {@link Author}, using the same rule as {@link #getOrcid(Row)}:
     * per class id in {@link #ORCID_CLASSIDS}, only the first matching pid is considered.
     *
     * @param a author to inspect; {@code null} authors and authors without pids yield {@code null}
     * @return the ORCID, or {@code null} when none is available
     */
    private static String getOrcid(Author a) {
        if (a == null || a.getPid() == null) {
            return null;
        }
        for (String classid : ORCID_CLASSIDS) {
            for (StructuredProperty pid : a.getPid()) {
                if (hasClassid(pid, classid)) {
                    if (pid.getValue() != null) {
                        return pid.getValue();
                    }
                    // Only the first pid of each class id is considered.
                    break;
                }
            }
        }
        return null;
    }

    /**
     * Produces the co-authorship relations for one DOI.
     *
     * @param t2 pair of (row from {@code graph_authors}, row from {@code publication_matched})
     *           joined on {@code id}
     * @return the relations yielded by {@link CoAuthorshipIterator} over the ORCIDs of both rows
     */
    private static Iterator<Relation> coAuthorshipRels(Tuple2<Row, Row> t2) {
        // Explicit ArrayList: the combined list is built by appending to it.
        List<String> orcids = new ArrayList<>();
        for (Row graphAuthor : EnrichExternalDataWithGraphORCID.<Row>listOrEmpty(t2._1(), "orcid_authors")) {
            orcids.add(graphAuthor.<String>getAs("orcid"));
        }
        for (Row enrichedAuthor : EnrichExternalDataWithGraphORCID.<Row>listOrEmpty(t2._2(), "enriched_author")) {
            String orcid = getOrcid(enrichedAuthor);
            if (orcid != null) {
                orcids.add(orcid);
            }
        }
        List<Relation> relList = new ArrayList<>();
        new CoAuthorshipIterator(orcids).forEachRemaining(relList::add);
        return relList.iterator();
    }

    /**
     * Unions the stored relations with the new ones and collapses duplicates.
     *
     * @param spark              active Spark session
     * @param relationPath       base path whose {@code relation} directory holds the stored relations
     * @param redirectedRelations relations to add
     * @return one relation per (source, relClass, target), merged with {@link MergeUtils#mergeRelation}
     */
    private static Dataset<Relation> mergeOldAndNewRelations(SparkSession spark, String relationPath, Dataset<Relation> redirectedRelations) {
        return spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(relationPath + "/relation")
            .as(Encoders.bean(Relation.class))
            .union(redirectedRelations)
            .groupByKey(
                (MapFunction<Relation, String>) r -> r.getSource() + r.getRelClass() + r.getTarget(),
                Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Relation, Relation>) (k, it) -> {
                Relation merged = it.next();
                while (it.hasNext()) {
                    merged = MergeUtils.mergeRelation(merged, it.next());
                }
                return merged;
            }, Encoders.bean(Relation.class));
    }

    /**
     * Re-targets new relations onto the record that merges their current target.
     *
     * <p>The new relations are left-joined with the {@code merges} relations on {@code target}.
     * When a match exists, the relation's target is replaced by the {@code source} of the
     * {@code merges} relation. (Previously the target was overwritten with the joined row's
     * {@code target}, i.e. with itself, so nothing was ever redirected.)
     * NOTE(review): this assumes {@code source} of a {@code merges} relation is the representative
     * record — confirm against the graph data model.
     *
     * @param newRelations    relations whose target may have been merged into another record
     * @param graph_relations {@code (source, target)} rows of the {@code merges} relations
     * @return the relations, redirected where a merging record exists
     */
    private static Dataset<Relation> redirectNewRelationsOnRepresentatives(Dataset<Relation> newRelations, Dataset<Row> graph_relations) {
        return newRelations
            .joinWith(graph_relations, newRelations.col("target").equalTo(graph_relations.col("target")), "left")
            .map((MapFunction<Tuple2<Relation, Row>, Relation>) t2 -> {
                Relation relation = t2._1();
                Row merges = t2._2();
                if (merges != null) {
                    relation.setTarget(merges.<String>getAs("source"));
                }
                return relation;
            }, Encoders.bean(Relation.class));
    }

    /**
     * @return the {@code (source, target)} columns of the relations with {@code relClass = 'merges'}
     *         stored under {@code targetPath + "/relation"}
     */
    private static Dataset<Row> getMergesRelationships(SparkSession spark, String targetPath) {
        return spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(targetPath + "/relation")
            .filter("relClass = 'merges'")
            .select("source", "target");
    }

    /**
     * @return the {@code hasAuthored} relations derived from
     *         {@code workingDir + "/publication_matched"} via {@link #getRelationsList}
     */
    private static Dataset<Relation> getNewRelations(SparkSession spark, String workingDir) {
        return spark.read()
            .schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
            .parquet(workingDir + "/publication_matched")
            .selectExpr("id as doi", "enriched_author")
            .flatMap(
                (FlatMapFunction<Row, Relation>) EnrichExternalDataWithGraphORCID::getRelationsList,
                Encoders.bean(Relation.class));
    }

    /**
     * Creates one relation per ORCID (or pending ORCID) pid of every enriched author of a row.
     *
     * @param r row with a {@code doi} column and an {@code enriched_author} array
     * @return iterator over the created relations; empty when there are no usable pids
     */
    private static Iterator<Relation> getRelationsList(Row r) {
        List<Relation> relationList = new ArrayList<>();
        String doi = r.getAs("doi");
        for (Row author : EnrichExternalDataWithGraphORCID.<Row>listOrEmpty(r, "enriched_author")) {
            if (author == null) {
                continue;
            }
            List<String> rawAffiliations = listOrEmpty(author, "rawAffiliationString");
            for (Row pid : EnrichExternalDataWithGraphORCID.<Row>listOrEmpty(author, "pid")) {
                String classid = classidOf(pid);
                if (!"orcid".equalsIgnoreCase(classid) && !"orcid_pending".equalsIgnoreCase(classid)) {
                    continue;
                }
                String orcid = pid.getAs("value");
                // A pid without a value cannot identify a person; skip it instead of hashing null.
                if (orcid != null) {
                    relationList.add(getRelations(doi, rawAffiliations, orcid));
                }
            }
        }
        return relationList.iterator();
    }

    /**
     * Builds the {@code hasAuthored} relation between an ORCID person and a DOI result.
     *
     * <p>Each affiliation string is expected as {@code type@@value@@confidence}. Entries whose type
     * is {@code ror} (ignoring case) are added to the relation properties under the key
     * {@code declared_affiliation}; all other entries, and entries with fewer than three parts,
     * are ignored.
     *
     * @param doi                  DOI of the result
     * @param rawAffiliationString affiliation strings of the author; {@code null} is treated as empty
     * @param orcid                ORCID of the person
     * @return the relation, carrying {@link #DATAINFO} as provenance
     */
    private static Relation getRelations(String doi, List<String> rawAffiliationString, String orcid) {
        Relation rel = OafMapperUtils.getRelation(
            "30|orcid_______::" + DHPUtils.md5(orcid),
            "50|doi_________::" + DHPUtils.md5(doi),
            "resultPerson",
            "authorship",
            "hasAuthored",
            null,
            DATAINFO,
            null);
        if (rawAffiliationString == null) {
            return rel;
        }
        for (String raf : rawAffiliationString) {
            if (raf == null) {
                continue;
            }
            String[] affiliationInfo = raf.split("@@");
            if (affiliationInfo.length < 3 || !"ror".equalsIgnoreCase(affiliationInfo[0])) {
                continue;
            }
            KeyValue kv = new KeyValue();
            kv.setKey("declared_affiliation");
            kv.setValue(affiliationInfo[1]);
            kv.setDataInfo(OafMapperUtils.dataInfo(false, "openaire:inference", true, false, null, affiliationInfo[2]));
            if (rel.getProperties() == null) {
                rel.setProperties(new ArrayList<>());
            }
            rel.getProperties().add(kv);
        }
        return rel;
    }

    /**
     * Maps one exploded publisher author row to an {@link Author}.
     *
     * <p>Reads {@code firstname}, {@code fullname}, {@code lastname}, {@code pids} and
     * {@code affiliations}. Affiliations whose {@code Status} is {@code "active"} are encoded as
     * {@code PID@@Value@@Confidence} strings in the author's raw affiliation list. Absent
     * {@code pids} or {@code affiliations} are treated as empty.
     *
     * @param a author row as produced in {@link #createTemporaryData}
     * @return the populated author, never {@code null}
     */
    @NotNull
    private static Author getAuthor(Row a) {
        Author author = new Author();
        author.setName(a.<String>getAs("firstname"));
        author.setFullname(a.<String>getAs("fullname"));
        author.setSurname(a.<String>getAs("lastname"));

        List<StructuredProperty> pids = new ArrayList<>();
        for (Row publisherPid : EnrichExternalDataWithGraphORCID.<Row>listOrEmpty(a, "pids")) {
            StructuredProperty pid = publisherPid == null ? null : getPid(publisherPid);
            // getPid is @Nullable; a null entry carries no information, so it is dropped.
            if (pid != null) {
                pids.add(pid);
            }
        }

        List<String> affs = new ArrayList<>();
        for (Row aff : EnrichExternalDataWithGraphORCID.<Row>listOrEmpty(a, "affiliations")) {
            if (aff == null || !"active".equals(aff.<String>getAs("Status"))) {
                continue;
            }
            String pidtype = aff.getAs("PID");
            String pidvalue = aff.getAs("Value");
            Double pidconfidence = aff.getAs("Confidence");
            affs.add(pidtype + "@@" + pidvalue + "@@" + pidconfidence);
        }

        author.setPid(pids);
        author.setRawAffiliationString(affs);
        return author;
    }

    /**
     * Maps a publisher pid row ({@code Schema}, {@code Value}) to a {@link StructuredProperty}
     * whose qualifier uses the schema as both class id and class name in {@code dnet:pid_types}.
     *
     * @return the property as built by {@link OafMapperUtils#structuredProperty}; may be {@code null}
     */
    @Nullable
    private static StructuredProperty getPid(Row pid) {
        String schema = pid.getAs("Schema");
        String value = pid.getAs("Value");
        Qualifier qualifier = OafMapperUtils.qualifier(schema, schema, "dnet:pid_types", "dnet:pid_types");
        return OafMapperUtils.structuredProperty(value, qualifier, null);
    }

    /** Null-safe, case-insensitive check of a pid's qualifier class id. */
    private static boolean hasClassid(StructuredProperty pid, String classid) {
        return pid != null
            && pid.getQualifier() != null
            && classid.equalsIgnoreCase(pid.getQualifier().getClassid());
    }

    /** Tells whether the author has at least one pid classified as ORCID or pending ORCID. */
    private static boolean hasOrcidPid(Author author) {
        if (author == null || author.getPid() == null) {
            return false;
        }
        for (StructuredProperty pid : author.getPid()) {
            if (hasClassid(pid, "orcid") || hasClassid(pid, "orcid_pending")) {
                return true;
            }
        }
        return false;
    }

    /** Returns {@code qualifier.classid} of a pid row, or {@code null} when any part is missing. */
    private static String classidOf(Row pid) {
        if (pid == null) {
            return null;
        }
        Row qualifier = pid.getAs("qualifier");
        return qualifier == null ? null : qualifier.<String>getAs("classid");
    }

    /** Reads an array column as a list, mapping a null column (or null row) to an empty list. */
    private static <T> List<T> listOrEmpty(Row row, String field) {
        if (row == null) {
            return new ArrayList<>();
        }
        int index = row.fieldIndex(field);
        if (row.isNullAt(index)) {
            return new ArrayList<>();
        }
        List<T> values = row.getList(index);
        return values == null ? new ArrayList<>() : values;
    }
}

