/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.common.author.SparkEnrichWithOrcidAuthors;
import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
import eu.dnetlib.dhp.common.person.Constants;
import eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata.PublisherAuthors;
import eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata.SerializationBean;
import eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata.SerializationOrg;
import eu.dnetlib.dhp.enrich.relsfrompublisherenricheddata.SerializationRoles;
import eu.dnetlib.dhp.orcidtoresultfromsemrel.OrcidAuthors;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ORCIDAuthorEnricherResult;
import eu.dnetlib.dhp.utils.OrcidAuthor;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class EnrichExternalDataWithGraphORCID
extends SparkEnrichWithOrcidAuthors {
    /** Class logger; also handed to the superclass constructor by {@code main}. */
    private static final Logger log = LoggerFactory.getLogger(EnrichExternalDataWithGraphORCID.class);
    /**
     * Provenance information attached to every relation created by {@code getRelations}.
     * It is built once with the provenance action {@code person:relations:publisher}
     * (scheme {@code dnet:provenanceActions}), inference provenance {@code "propagation"}
     * and the literal trust value {@code "0.85"}.
     * NOTE(review): the same instance is shared by all generated relations - confirm that no
     * downstream code mutates it.
     */
    public static final DataInfo DATAINFO = OafMapperUtils.dataInfo((Boolean)false, (String)"propagation", (Boolean)true, (Boolean)false, (Qualifier)OafMapperUtils.qualifier((String)"person:relations:publisher", (String)"Extraction of authors relations from publishers data enriched with graph orcid", (String)"dnet:provenanceActions", (String)"dnet:provenanceActions"), (String)"0.85");

    /**
     * Creates the job; all three arguments are forwarded unchanged to the superclass constructor.
     *
     * @param propertyPath classpath location of the JSON file describing the job parameters
     * @param args command line arguments
     * @param log logger handed to the superclass
     */
    public EnrichExternalDataWithGraphORCID(String propertyPath, String[] args, Logger log) {
        super(propertyPath, args, log);
    }

    /**
     * Entry point: instantiates the job with its parameter descriptor, then initializes and runs it.
     *
     * @param args command line arguments, forwarded to the job
     * @throws Exception whatever the initialization or the run propagates
     */
    public static void main(String[] args) throws Exception {
        final String parametersDescriptor =
            "/eu/dnetlib/dhp/wf/subworkflows/enrich/orcid/enrich_graph_orcid_parameters.json";
        new EnrichExternalDataWithGraphORCID(parametersDescriptor, args, log)
            .initialize()
            .run();
    }

    /**
     * Prepares the intermediate datasets used by the author matching step.
     * <ul>
     * <li>Reads the results under {@code orcidPath + "/publication"}, keeps those having at least one
     * {@code doi} pid and at least one author with an {@code orcid} or {@code orcid_pending} pid, and
     * emits one row ({@code id}, {@code orcid_authors}) per DOI of each result. The rows are stored as
     * gzip-compressed parquet under {@code targetPath + "/graph_authors"}.</li>
     * <li>Reads the publisher records at {@code graphPath} (schema
     * {@link Constants#PUBLISHER_INPUT_SCHEMA}) having a non-null {@code doi}, explodes their authors
     * and groups them by DOI into rows ({@code id}, {@code graph_authors}).</li>
     * <li>Joins the two datasets on {@code id} and writes the result as gzip-compressed parquet under
     * {@code targetPath + "/publication_unmatched"}.</li>
     * </ul>
     *
     * @param spark the active Spark session
     * @param graphPath location of the publisher JSON records
     * @param orcidPath base location containing the {@code publication} results
     * @param targetPath base location where the intermediate parquet datasets are written
     */
    public void createTemporaryData(SparkSession spark, String graphPath, String orcidPath, String targetPath) {
        // ---- side 1: results carrying a DOI and at least one ORCID-identified author
        Encoder<Result> resultEncoder = Encoders.bean(Result.class);
        Dataset<Result> candidates = spark.read()
            .schema(resultEncoder.schema())
            .json(orcidPath + "/publication")
            .as(resultEncoder)
            .filter((FilterFunction<Result>) r -> r.getPid() != null
                && r.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase("doi")))
            .filter((FilterFunction<Result>) r -> r.getAuthor() != null
                && r.getAuthor().stream().anyMatch(a -> a.getPid() != null
                    && a.getPid().stream().anyMatch(p -> p.getQualifier().getClassid().equalsIgnoreCase("orcid")
                        || p.getQualifier().getClassid().equalsIgnoreCase("orcid_pending"))));

        Dataset<Row> orcidDnet = candidates
            .flatMap((FlatMapFunction<Result, Tuple2<String, OrcidAuthors>>) r -> {
                List<String> dois = r.getPid().stream()
                    .filter(p -> p.getQualifier().getClassid().equalsIgnoreCase("doi"))
                    .map(StructuredProperty::getValue)
                    .collect(Collectors.toList());
                OrcidAuthors orcidAuthors = EnrichExternalDataWithGraphORCID.getOrcidAuthorsList(r.getAuthor());
                // the same author list is associated with every DOI of the result
                List<Tuple2<String, OrcidAuthors>> pairs = new ArrayList<>(dois.size());
                for (String doi : dois) {
                    pairs.add(new Tuple2<>(doi, orcidAuthors));
                }
                return pairs.iterator();
            }, Encoders.tuple(Encoders.STRING(), Encoders.bean(OrcidAuthors.class)))
            .selectExpr("_1 as id", "_2.orcidAuthorList as orcid_authors");

        orcidDnet.write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .parquet(targetPath + "/graph_authors");

        // ---- side 2: publisher records, one Author bean per exploded author, grouped by DOI
        Dataset<Row> publisherRecords = spark.read()
            .schema(Constants.PUBLISHER_INPUT_SCHEMA)
            .json(graphPath)
            .where("doi is not null");

        Dataset<Row> flatAuthors = publisherRecords
            .selectExpr("doi", "explode(authors) as author")
            .selectExpr(
                "doi",
                "author.name.full as fullname",
                "author.name.first as firstname",
                "author.name.last as lastname",
                "author.pids as pids",
                "author.matchings as affiliations",
                "author.corresponding as corresponding",
                "author.contributor_roles as roles");

        Dataset<Tuple2<String, Author>> authorByDoi = flatAuthors.map(
            (MapFunction<Row, Tuple2<String, Author>>) a ->
                new Tuple2<>(a.<String>getAs("doi"), EnrichExternalDataWithGraphORCID.getAuthor(a)),
            Encoders.tuple(Encoders.STRING(), Encoders.bean(Author.class)));

        Dataset<Row> authors = authorByDoi
            .groupByKey((MapFunction<Tuple2<String, Author>, String>) pair -> pair._1(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Tuple2<String, Author>, Tuple2<String, PublisherAuthors>>) (doi, group) -> {
                PublisherAuthors collected = new PublisherAuthors();
                group.forEachRemaining(pair -> collected.getPublisherAuthorList().add(pair._2()));
                return new Tuple2<>(doi, collected);
            }, Encoders.tuple(Encoders.STRING(), Encoders.bean(PublisherAuthors.class)))
            .selectExpr("_1 as id", "_2.publisherAuthorList as graph_authors");

        authors.show(false);

        // ---- join the two sides on the DOI
        orcidDnet.join(authors, "id")
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .parquet(targetPath + "/publication_unmatched");
    }

    /**
     * Produces the final relation set.
     * <p>
     * The authorship relations derived from {@code workingDir + "/publication_matched"} are redirected
     * using the {@code merges} relations found under {@code targetPath + "/relation"}; co-authorship
     * relations are computed by joining {@code workingDir + "/graph_authors"} with the matched
     * publications on {@code id}. Both sets are merged with the relations already stored under
     * {@code targetPath + "/relation"}. The result is first written to {@code workingDir + "/relation"}
     * and then copied over {@code targetPath + "/relation"}, so the target is never overwritten while it
     * is still being read.
     *
     * @param spark the active Spark session
     * @param graphPath not used by this implementation
     * @param workingDir location of the intermediate datasets
     * @param targetPath base location of the graph whose {@code relation} folder is read and rewritten
     */
    public void generateGraph(SparkSession spark, String graphPath, String workingDir, String targetPath) {
        final String stagedRelations = workingDir + "/relation";

        Dataset<Relation> authorshipRels = EnrichExternalDataWithGraphORCID.getNewRelations(spark, workingDir);
        authorshipRels.show(false);

        Dataset<Row> mergesRels = EnrichExternalDataWithGraphORCID.getMergesRelationships(spark, targetPath);
        Dataset<Relation> redirectedRels =
            EnrichExternalDataWithGraphORCID.redirectNewRelationsOnRepresentatives(authorshipRels, mergesRels);

        Dataset<Row> matched = spark.read()
            .schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
            .parquet(workingDir + "/publication_matched")
            .selectExpr("id", "enriched_author");
        Dataset<Row> graphAuthors = spark.read().parquet(workingDir + "/graph_authors");

        Dataset<Relation> coAuthorshipRels = graphAuthors
            .joinWith(matched, graphAuthors.col("id").equalTo(matched.col("id")))
            .flatMap(
                (FlatMapFunction<Tuple2<Row, Row>, Relation>) EnrichExternalDataWithGraphORCID::coAuthorshipRels,
                Encoders.bean(Relation.class));

        Dataset<Relation> allNewRels = redirectedRels.union(coAuthorshipRels);
        EnrichExternalDataWithGraphORCID.mergeOldAndNewRelations(spark, targetPath, allNewRels)
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(stagedRelations);

        // copy the staged relations over the graph ones
        spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(stagedRelations)
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .json(targetPath + "/relation");
    }

    /**
     * Wraps into an {@link OrcidAuthors} bean the {@link OrcidAuthor}s that can be derived from the
     * given authors; authors for which {@code getOrcidAuthor} returns {@code null} are left out.
     *
     * @param authors the authors of a result
     * @return a new bean holding the derived ORCID authors, in input order
     */
    private static OrcidAuthors getOrcidAuthorsList(List<Author> authors) {
        List<OrcidAuthor> resolved = new ArrayList<>();
        for (Author candidate : authors) {
            OrcidAuthor orcidAuthor = EnrichExternalDataWithGraphORCID.getOrcidAuthor(candidate);
            if (orcidAuthor != null) {
                resolved.add(orcidAuthor);
            }
        }
        OrcidAuthors wrapper = new OrcidAuthors();
        wrapper.setOrcidAuthorList(resolved);
        return wrapper;
    }

    /**
     * Converts an author into an {@link OrcidAuthor}.
     *
     * @param a the author to convert
     * @return the ORCID author built from the author's ORCID, surname, name and full name (last
     *         constructor argument {@code null}), or {@code null} when no ORCID is found for the author
     */
    private static OrcidAuthor getOrcidAuthor(Author a) {
        String orcid = EnrichExternalDataWithGraphORCID.getOrcid(a);
        if (orcid == null) {
            return null;
        }
        return new OrcidAuthor(orcid, a.getSurname(), a.getName(), a.getFullname(), null);
    }

    /**
     * Extracts the ORCID of an author represented as a Spark {@link Row}.
     * <p>
     * The row must expose a {@code pid} field; each pid is read through its {@code qualifier.classid}
     * and {@code value} fields. A pid of type {@code orcid} with a non-null value is preferred; when
     * none is available the first {@code orcid_pending} pid is used instead. The type comparison is
     * case-insensitive, consistently with the other ORCID lookups of this class.
     *
     * @param a the author row
     * @return the ORCID value, or {@code null} when the author has no pids or no ORCID pid
     */
    private static String getOrcid(Row a) {
        List<Row> authorPids = a.getList(a.fieldIndex("pid"));
        if (authorPids == null) {
            return null;
        }
        String confirmed = firstPidValueOfType(authorPids, "orcid");
        // the fallback lookup is performed only when it is really needed
        return confirmed != null ? confirmed : firstPidValueOfType(authorPids, "orcid_pending");
    }

    /** Returns the value of the first pid whose qualifier classid matches (ignoring case), or null. */
    private static String firstPidValueOfType(List<Row> pids, String classid) {
        for (Row pid : pids) {
            if (pid == null) {
                continue;
            }
            Row qualifier = pid.getAs("qualifier");
            if (qualifier == null) {
                continue;
            }
            String current = qualifier.getAs("classid");
            if (classid.equalsIgnoreCase(current)) {
                return pid.getAs("value");
            }
        }
        return null;
    }

    /**
     * Extracts the ORCID of an {@link Author}.
     * <p>
     * The value of the first pid of type {@code orcid} is returned; when the author has no such pid,
     * the value of the first {@code orcid_pending} pid is returned. Types are compared ignoring case.
     * Authors without pids, and pids without qualifier or classid, are tolerated: the filter applied to
     * the results only guarantees that <em>some</em> author of a result carries pids, not all of them.
     *
     * @param a the author
     * @return the ORCID value, or {@code null} when none is available
     */
    private static String getOrcid(Author a) {
        if (a == null || a.getPid() == null) {
            return null;
        }
        StructuredProperty pending = null;
        for (StructuredProperty pid : a.getPid()) {
            if (pid == null || pid.getQualifier() == null) {
                continue;
            }
            String classid = pid.getQualifier().getClassid();
            if ("orcid".equalsIgnoreCase(classid)) {
                // a confirmed ORCID always wins over a pending one
                return pid.getValue();
            }
            if (pending == null && "orcid_pending".equalsIgnoreCase(classid)) {
                pending = pid;
            }
        }
        return pending == null ? null : pending.getValue();
    }

    /**
     * Builds the co-authorship relations for one publication.
     * <p>
     * The ORCIDs are collected from the {@code orcid_authors} field of the first row and from the
     * {@code enriched_author} field of the second row, then handed to {@link CoAuthorshipIterator}.
     * The two sources can contain the same ORCID (NOTE(review): enriched authors presumably receive
     * their ORCID from the very same graph authors - confirm), so null values and duplicates are removed
     * before pairing to avoid relating a person with itself. Missing (null) lists are treated as empty.
     *
     * @param t2 pair (graph authors row, matched publication row) sharing the same {@code id}
     * @return an iterator over the generated relations
     */
    private static Iterator<Relation> coAuthorshipRels(Tuple2<Row, Row> t2) {
        Row graphRow = t2._1();
        Row matchedRow = t2._2();
        List<String> orcids = new ArrayList<>();

        List<Row> graphAuthors = graphRow.getList(graphRow.fieldIndex("orcid_authors"));
        if (graphAuthors != null) {
            for (Row graphAuthor : graphAuthors) {
                orcids.add(graphAuthor.<String>getAs("orcid"));
            }
        }

        List<Row> enrichedAuthors = matchedRow.getList(matchedRow.fieldIndex("enriched_author"));
        if (enrichedAuthors != null) {
            for (Row enrichedAuthor : enrichedAuthors) {
                orcids.add(EnrichExternalDataWithGraphORCID.getOrcid(enrichedAuthor));
            }
        }

        // distinct() keeps the encounter order, so the pairing order of the first occurrences is preserved
        List<String> distinctOrcids = orcids.stream()
            .filter(Objects::nonNull)
            .distinct()
            .collect(Collectors.toList());

        List<Relation> relList = new ArrayList<>();
        new CoAuthorshipIterator(distinctOrcids).forEachRemaining(relList::add);
        return relList.iterator();
    }

    /**
     * Unions the relations stored under {@code relationPath + "/relation"} with the given ones and
     * collapses the relations sharing the same source, relClass and target into a single relation by
     * folding them with {@link MergeUtils#mergeRelation}.
     *
     * @param spark the active Spark session
     * @param relationPath base location containing the {@code relation} folder
     * @param redirectedRelations the relations to add
     * @return one relation per distinct (source, relClass, target) key
     */
    private static Dataset<Relation> mergeOldAndNewRelations(SparkSession spark, String relationPath, Dataset<Relation> redirectedRelations) {
        Encoder<Relation> relationEncoder = Encoders.bean(Relation.class);
        Dataset<Relation> existing = spark.read()
            .schema(relationEncoder.schema())
            .json(relationPath + "/relation")
            .as(relationEncoder);

        return existing
            .union(redirectedRelations)
            .groupByKey(
                (MapFunction<Relation, String>) r -> r.getSource() + r.getRelClass() + r.getTarget(),
                Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Relation, Relation>) (key, group) -> {
                // a group always contains at least one element
                Relation merged = group.next();
                while (group.hasNext()) {
                    merged = MergeUtils.mergeRelation(merged, group.next());
                }
                return merged;
            }, relationEncoder);
    }

    /**
     * Redirects the target of the new relations using the given {@code merges} relations.
     * <p>
     * Each new relation is left-joined with the merge relations on {@code target}; when a match exists
     * the relation target is replaced with the {@code source} of the matching merge relation.
     * The previous implementation copied the merge relation's {@code target}, which - because of the
     * join condition - is always equal to the value already present, so nothing was ever redirected.
     * NOTE(review): this assumes that in a {@code merges} relation the source is the representative
     * record and the target the merged one - confirm against the graph model.
     *
     * @param newRelations the relations to redirect
     * @param graph_relations rows exposing the {@code source} and {@code target} of the merge relations
     * @return the relations, redirected where a merge relation exists and unchanged otherwise
     */
    private static Dataset<Relation> redirectNewRelationsOnRepresentatives(Dataset<Relation> newRelations, Dataset<Row> graph_relations) {
        return newRelations
            .joinWith(graph_relations, newRelations.col("target").equalTo(graph_relations.col("target")), "left")
            .map((MapFunction<Tuple2<Relation, Row>, Relation>) pair -> {
                Relation relation = pair._1();
                Row merge = pair._2();
                if (merge != null) {
                    relation.setTarget(merge.<String>getAs("source"));
                }
                return relation;
            }, Encoders.bean(Relation.class));
    }

    /**
     * Loads the relations stored under {@code targetPath + "/relation"} having {@code relClass = 'merges'}.
     *
     * @param spark the active Spark session
     * @param targetPath base location containing the {@code relation} folder
     * @return rows with the two columns {@code source} and {@code target}
     */
    private static Dataset<Row> getMergesRelationships(SparkSession spark, String targetPath) {
        Dataset<Row> relations = spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(targetPath + "/relation");
        return relations
            .where("relClass = 'merges'")
            .select("source", "target");
    }

    /**
     * Reads {@code workingDir + "/publication_matched"} and turns every row ({@code id} renamed to
     * {@code doi}, plus {@code enriched_author}) into the relations produced by {@code getRelationsList}.
     *
     * @param spark the active Spark session
     * @param workingDir location of the intermediate datasets
     * @return the relations generated from the matched publications
     */
    private static Dataset<Relation> getNewRelations(SparkSession spark, String workingDir) {
        Dataset<Row> matchedPublications = spark.read()
            .schema(Encoders.bean(ORCIDAuthorEnricherResult.class).schema())
            .parquet(workingDir + "/publication_matched")
            .selectExpr("id as doi", "enriched_author");
        return matchedPublications.flatMap(
            (FlatMapFunction<Row, Relation>) EnrichExternalDataWithGraphORCID::getRelationsList,
            Encoders.bean(Relation.class));
    }

    /**
     * Creates one relation for every {@code orcid} / {@code orcid_pending} pid (type compared ignoring
     * case) of every author listed in the {@code enriched_author} field of the row. The DOI (field
     * {@code doi}) and the pid value are passed through {@code Constants.removePrefixUrl} before being
     * handed, together with the author's {@code rawAffiliationString}, to {@code getRelations}.
     *
     * @param r a row exposing the fields {@code doi} and {@code enriched_author}
     * @return an iterator over the generated relations
     */
    private static Iterator<Relation> getRelationsList(Row r) {
        List<Relation> relations = new ArrayList<>();
        List<Row> enrichedAuthors = r.getList(r.fieldIndex("enriched_author"));
        for (Row author : enrichedAuthors) {
            List<Row> pids = author.getList(author.fieldIndex("pid"));
            for (Row pid : pids) {
                Row qualifier = pid.getAs("qualifier");
                String classid = qualifier.getAs("classid");
                boolean isOrcid = "orcid".equalsIgnoreCase(classid) || "orcid_pending".equalsIgnoreCase(classid);
                if (!isOrcid) {
                    continue;
                }
                String doi = Constants.removePrefixUrl(r.<String>getAs("doi"));
                List<String> rawAffiliations = author.getList(author.fieldIndex("rawAffiliationString"));
                String orcid = Constants.removePrefixUrl(pid.<String>getAs("value"));
                relations.add(EnrichExternalDataWithGraphORCID.getRelations(doi, rawAffiliations, orcid));
            }
        }
        return relations.iterator();
    }

    /**
     * Creates the relation between a person and a publication and decorates it with the properties
     * serialized in the author's raw affiliation strings.
     * <p>
     * The relation goes from {@code Constants.PERSON_PREFIX + "::" + md5(orcid)} to
     * {@code "50|doi_________::" + md5(doi)} and carries {@link #DATAINFO}. Every raw affiliation string
     * is parsed as a JSON {@link SerializationBean}, from which the following properties are derived:
     * <ul>
     * <li>{@code corresponding}: the corresponding flag, when present;</li>
     * <li>{@code declared_affiliation}: one per affiliation, valued with the ROR when available and with
     * {@code "OpenOrgs: " + openOrgs} otherwise; the affiliation confidence is stored as trust;</li>
     * <li>{@code role}: one per role, valued with {@code schema + " " + value} when both are present and
     * with the role name otherwise.</li>
     * </ul>
     * A null list of raw affiliation strings and a bean without affiliations are tolerated.
     *
     * @param doi the DOI of the publication
     * @param rawAffiliationString JSON serializations of {@link SerializationBean}, may be {@code null}
     * @param orcid the ORCID of the person
     * @return the decorated relation
     * @throws RuntimeException wrapping the {@link IOException} raised when a string cannot be parsed
     */
    private static Relation getRelations(String doi, List<String> rawAffiliationString, String orcid) {
        Relation rel = OafMapperUtils.getRelation(
            Constants.PERSON_PREFIX + "::" + DHPUtils.md5(orcid),
            "50|doi_________::" + DHPUtils.md5(doi),
            "resultPerson", "authorship", "hasAuthored", null, DATAINFO, null);
        if (rawAffiliationString == null) {
            return rel;
        }

        // one mapper per call instead of one per parsed string
        ObjectMapper mapper = new ObjectMapper();
        for (String raf : rawAffiliationString) {
            SerializationBean sb;
            try {
                sb = mapper.readValue(raf, SerializationBean.class);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            List<KeyValue> keyValueList = new ArrayList<>();

            if (Optional.ofNullable(sb.getCorresponding()).isPresent()) {
                KeyValue kv = new KeyValue();
                kv.setKey("corresponding");
                kv.setValue(String.valueOf(sb.getCorresponding()));
                keyValueList.add(kv);
            }

            // affs is guarded like the other optional fields: it is null when absent from the JSON
            if (sb.getAffs() != null) {
                for (SerializationOrg aff : sb.getAffs()) {
                    KeyValue kv = new KeyValue();
                    kv.setKey("declared_affiliation");
                    if (aff.getRor() != null) {
                        kv.setValue(aff.getRor());
                    } else {
                        kv.setValue("OpenOrgs: " + aff.getOpenOrgs());
                    }
                    kv.setDataInfo(OafMapperUtils.dataInfo(
                        false, "openaire:inference", true, false, null, String.valueOf(aff.getConfidence())));
                    keyValueList.add(kv);
                }
            }

            if (sb.getRoles() != null) {
                for (SerializationRoles role : sb.getRoles()) {
                    KeyValue kv = new KeyValue();
                    kv.setKey("role");
                    if (role.getRoleSchema() != null && role.getRoleValue() != null) {
                        kv.setValue(role.getRoleSchema() + " " + role.getRoleValue());
                    } else {
                        kv.setValue(role.getRoleName());
                    }
                    keyValueList.add(kv);
                }
            }

            if (!keyValueList.isEmpty()) {
                if (rel.getProperties() == null) {
                    rel.setProperties(new ArrayList<>());
                }
                rel.getProperties().addAll(keyValueList);
            }
        }
        return rel;
    }

    /**
     * Converts a flattened publisher author row into an {@link Author}.
     * <p>
     * The row is read through the fields {@code firstname}, {@code fullname}, {@code lastname},
     * {@code pids}, {@code affiliations}, {@code roles} and {@code corresponding}. Pids are converted
     * with {@code getPid}. Affiliations whose {@code Status} is {@code active}, roles having at least one
     * of {@code schema}, {@code value} or {@code name}, and the corresponding flag are collected in a
     * {@link SerializationBean}, whose JSON serialization becomes the only raw affiliation string of
     * the author.
     * <p>
     * The {@code pids}, {@code affiliations} and {@code roles} fields may all be null, in which case
     * they are treated as empty; affiliations with a null {@code Status} are skipped.
     *
     * @param a the author row
     * @return the author, never {@code null}
     * @throws JsonProcessingException if the {@link SerializationBean} cannot be serialized
     */
    @NotNull
    private static Author getAuthor(Row a) throws JsonProcessingException {
        Author author = new Author();
        author.setName(a.<String>getAs("firstname"));
        author.setFullname(a.<String>getAs("fullname"));
        author.setSurname(a.<String>getAs("lastname"));

        // persistent identifiers declared by the publisher
        List<StructuredProperty> pids = new ArrayList<>();
        int pidsIdx = a.fieldIndex("pids");
        if (!a.isNullAt(pidsIdx)) {
            List<Row> publisherPids = a.getList(pidsIdx);
            for (Row pid : publisherPids) {
                pids.add(EnrichExternalDataWithGraphORCID.getPid(pid));
            }
        }

        SerializationBean sb = new SerializationBean();

        // only the active affiliations are retained; an empty list is set when none is available
        List<SerializationOrg> affs = new ArrayList<>();
        int affiliationsIdx = a.fieldIndex("affiliations");
        if (!a.isNullAt(affiliationsIdx)) {
            List<Row> affiliations = a.getList(affiliationsIdx);
            for (Row aff : affiliations) {
                if (aff == null || !"active".equals(aff.getAs("Status"))) {
                    continue;
                }
                SerializationOrg so = new SerializationOrg();
                if ("ror".equalsIgnoreCase(aff.<String>getAs("PID"))) {
                    so.setRor(aff.<String>getAs("Value"));
                } else {
                    so.setOpenOrgs(aff.<String>getAs("Value"));
                }
                so.setConfidence(aff.<Double>getAs("Confidence"));
                affs.add(so);
            }
        }
        sb.setAffs(affs);

        // roles are set only when the field is present; roles without any information are dropped
        int rolesIdx = a.fieldIndex("roles");
        if (!a.isNullAt(rolesIdx)) {
            List<Row> roles = a.getList(rolesIdx);
            List<SerializationRoles> serializedRoles = new ArrayList<>();
            for (Row role : roles) {
                String schema = role.getAs("schema");
                String value = role.getAs("value");
                String name = role.getAs("name");
                if (schema == null && value == null && name == null) {
                    continue;
                }
                SerializationRoles sr = new SerializationRoles();
                if (schema != null) {
                    sr.setRoleSchema(schema);
                }
                if (value != null) {
                    sr.setRoleValue(value);
                }
                if (name != null) {
                    sr.setRoleName(name);
                }
                serializedRoles.add(sr);
            }
            sb.setRoles(serializedRoles);
        }

        // String.valueOf accepts both a textual and a boolean column without a hard cast
        Object corresponding = a.getAs("corresponding");
        if (corresponding != null) {
            sb.setCorresponding(Boolean.valueOf(String.valueOf(corresponding)));
        }

        author.setPid(pids);
        author.setRawAffiliationString(Arrays.asList(new ObjectMapper().writeValueAsString(sb)));
        return author;
    }

    /**
     * Converts a publisher pid row into a {@link StructuredProperty}: the {@code value} field is passed
     * through {@code Constants.removePrefixUrl} and the {@code schema} field is used both as class id
     * and class name of a qualifier of scheme {@code dnet:pid_types}. No data info is attached.
     *
     * @param pid a row exposing the fields {@code value} and {@code schema}
     * @return whatever {@code OafMapperUtils.structuredProperty} returns for the given input
     */
    @Nullable
    private static StructuredProperty getPid(Row pid) {
        String cleanedValue = Constants.removePrefixUrl(pid.<String>getAs("value"));
        String classId = pid.getAs("schema");
        String className = pid.getAs("schema");
        Qualifier pidType = OafMapperUtils.qualifier(classId, className, "dnet:pid_types", "dnet:pid_types");
        return OafMapperUtils.structuredProperty(cleanedValue, pidType, null);
    }
}

