/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.doiboost.orcid;

import com.esotericsoftware.minlog.Log;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.orcid.AuthorData;
import eu.dnetlib.dhp.schema.orcid.OrcidDOI;
import eu.dnetlib.doiboost.orcid.model.WorkData;
import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser;
import eu.dnetlib.doiboost.orcidnodoi.json.JsonWriter;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkGenerateDoiAuthorList {
    public static void main(String[] args) throws Exception {
        Logger logger = LoggerFactory.getLogger(SparkGenerateDoiAuthorList.class);
        logger.info("[ SparkGenerateDoiAuthorList STARTED]");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)SparkGenerateDoiAuthorList.class.getResourceAsStream("/eu/dnetlib/dhp/doiboost/gen_doi_author_list_orcid_parameters.json")));
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        logger.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String workingPath = parser.get("workingPath");
        logger.info("workingPath: ", (Object)workingPath);
        String outputDoiAuthorListPath = parser.get("outputDoiAuthorListPath");
        logger.info("outputDoiAuthorListPath: ", (Object)outputDoiAuthorListPath);
        String authorsPath = parser.get("authorsPath");
        logger.info("authorsPath: ", (Object)authorsPath);
        String xmlWorksPath = parser.get("xmlWorksPath");
        logger.info("xmlWorksPath: ", (Object)xmlWorksPath);
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            JavaSparkContext sc = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
            JavaPairRDD summariesRDD = sc.sequenceFile(workingPath.concat(authorsPath), Text.class, Text.class);
            Dataset summariesDataset = spark.createDataset(summariesRDD.map((Function & Serializable)seq -> SparkGenerateDoiAuthorList.loadAuthorFromJson((Text)seq._1(), (Text)seq._2())).rdd(), Encoders.bean(AuthorData.class));
            JavaPairRDD xmlWorksRDD = sc.sequenceFile(workingPath.concat(xmlWorksPath), Text.class, Text.class);
            Dataset activitiesDataset = spark.createDataset(xmlWorksRDD.map((Function & Serializable)seq -> XMLRecordParser.VTDParseWorkData(((Text)seq._2()).toString().getBytes())).filter((Function & Serializable)work -> work != null && work.getErrorCode() == null && work.isDoiFound()).rdd(), Encoders.bean(WorkData.class));
            Function & Serializable toAuthorListFunction = (Function & Serializable)data -> {
                try {
                    String doi = (String)data._1();
                    if (doi == null) {
                        return null;
                    }
                    AuthorData author = (AuthorData)data._2();
                    if (author == null) {
                        return null;
                    }
                    List<AuthorData> toAuthorList = Arrays.asList(author);
                    return new Tuple2((Object)doi, toAuthorList);
                }
                catch (Exception e) {
                    Log.error((String)"toAuthorListFunction ERROR", (Throwable)e);
                    return null;
                }
            };
            JavaRDD doisRDD = activitiesDataset.joinWith(summariesDataset, activitiesDataset.col("oid").equalTo((Object)summariesDataset.col("oid")), "inner").map((MapFunction & Serializable)value -> {
                WorkData w = (WorkData)value._1;
                AuthorData a = (AuthorData)value._2;
                return new Tuple2((Object)w.getDoi(), (Object)a);
            }, Encoders.tuple((Encoder)Encoders.STRING(), (Encoder)Encoders.bean(AuthorData.class))).filter(Objects::nonNull).toJavaRDD().map((Function)toAuthorListFunction);
            JavaPairRDD.fromJavaRDD((JavaRDD)doisRDD).reduceByKey((Function2 & Serializable)(d1, d2) -> {
                try {
                    if (d1 != null && d2 != null) {
                        Stream mergedStream = Stream.concat(d1.stream(), d2.stream());
                        List mergedAuthors = mergedStream.collect(Collectors.toList());
                        return mergedAuthors;
                    }
                    if (d1 != null) {
                        return d1;
                    }
                    if (d2 != null) {
                        return d2;
                    }
                }
                catch (Exception e) {
                    Log.error((String)"mergeAuthorsFunction ERROR", (Throwable)e);
                    return null;
                }
                return null;
            }).mapToPair((PairFunction & Serializable)s -> {
                List authorList = (List)s._2();
                HashSet oidsAlreadySeen = new HashSet();
                authorList.removeIf(a -> !oidsAlreadySeen.add(a.getOid()));
                return new Tuple2(s._1(), (Object)authorList);
            }).map((Function & Serializable)s -> {
                OrcidDOI orcidDOI = new OrcidDOI();
                orcidDOI.setDoi((String)s._1());
                orcidDOI.setAuthors((List)s._2());
                return JsonWriter.create(orcidDOI);
            }).saveAsTextFile(workingPath + outputDoiAuthorListPath, GzipCodec.class);
        });
    }

    private static AuthorData loadAuthorFromJson(Text orcidId, Text json) {
        AuthorData authorData = new AuthorData();
        authorData.setOid(orcidId.toString());
        JsonElement jElement = new JsonParser().parse(json.toString());
        authorData.setName(SparkGenerateDoiAuthorList.getJsonValue(jElement, "name"));
        authorData.setSurname(SparkGenerateDoiAuthorList.getJsonValue(jElement, "surname"));
        authorData.setCreditName(SparkGenerateDoiAuthorList.getJsonValue(jElement, "creditname"));
        return authorData;
    }

    private static WorkData loadWorkFromJson(Text orcidId, Text json) {
        WorkData workData = new WorkData();
        workData.setOid(orcidId.toString());
        JsonElement jElement = new JsonParser().parse(json.toString());
        workData.setDoi(SparkGenerateDoiAuthorList.getJsonValue(jElement, "doi"));
        return workData;
    }

    private static String getJsonValue(JsonElement jElement, String property) {
        if (jElement.getAsJsonObject().has(property)) {
            JsonElement name = null;
            name = jElement.getAsJsonObject().get(property);
            if (name != null && !name.isJsonNull()) {
                return name.getAsString();
            }
        }
        return null;
    }
}

