/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.doiboost.orcid;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.orcid.AuthorSummary;
import eu.dnetlib.doiboost.orcid.xml.XMLRecordParser;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.functions;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkUpdateOrcidAuthors {
    public static final Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidAuthors.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)SparkUpdateOrcidAuthors.class.getResourceAsStream("/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        String workingPath = parser.get("workingPath");
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            JavaSparkContext sc = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
            LongAccumulator oldAuthorsFoundAcc = spark.sparkContext().longAccumulator("old_authors_found");
            LongAccumulator updatedAuthorsFoundAcc = spark.sparkContext().longAccumulator("updated_authors_found");
            LongAccumulator newAuthorsFoundAcc = spark.sparkContext().longAccumulator("new_authors_found");
            LongAccumulator errorCodeAuthorsFoundAcc = spark.sparkContext().longAccumulator("error_code_authors_found");
            LongAccumulator errorLoadingAuthorsJsonFoundAcc = spark.sparkContext().longAccumulator("error_loading_authors_json_found");
            LongAccumulator errorParsingAuthorsXMLFoundAcc = spark.sparkContext().longAccumulator("error_parsing_authors_xml_found");
            Function & Serializable retrieveAuthorSummaryFunction = (Function & Serializable)data -> {
                AuthorSummary authorSummary = new AuthorSummary();
                String orcidId = ((Text)data._1()).toString();
                String jsonData = ((Text)data._2()).toString();
                JsonElement jElement = new JsonParser().parse(jsonData);
                String statusCode = SparkUpdateOrcidAuthors.getJsonValue(jElement, "statusCode");
                if (statusCode.equals("200")) {
                    String compressedData = SparkUpdateOrcidAuthors.getJsonValue(jElement, "compressedData");
                    if (StringUtils.isEmpty((CharSequence)compressedData)) {
                        errorLoadingAuthorsJsonFoundAcc.add(1L);
                    } else {
                        String xmlAuthor = ArgumentApplicationParser.decompressValue((String)compressedData);
                        try {
                            authorSummary = XMLRecordParser.VTDParseAuthorSummary(xmlAuthor.getBytes());
                            authorSummary.setStatusCode(statusCode);
                            authorSummary.setDownloadDate(Long.toString(System.currentTimeMillis()));
                            authorSummary.setBase64CompressData(compressedData);
                            return authorSummary;
                        }
                        catch (Exception e) {
                            logger.error("parsing xml " + orcidId + " [" + jsonData + "]", (Throwable)e);
                            errorParsingAuthorsXMLFoundAcc.add(1L);
                        }
                    }
                } else {
                    authorSummary.setStatusCode(statusCode);
                    authorSummary.setDownloadDate(Long.toString(System.currentTimeMillis()));
                    errorCodeAuthorsFoundAcc.add(1L);
                }
                return authorSummary;
            };
            Dataset downloadedAuthorSummaryDS = spark.createDataset(sc.sequenceFile(workingPath + "downloads/updated_authors/*", Text.class, Text.class).map((Function)retrieveAuthorSummaryFunction).rdd(), Encoders.bean(AuthorSummary.class));
            Dataset currentAuthorSummaryDS = spark.createDataset(sc.textFile(workingPath.concat("orcid_dataset/authors/*")).map((Function & Serializable)item -> (AuthorSummary)OBJECT_MAPPER.readValue(item, AuthorSummary.class)).rdd(), Encoders.bean(AuthorSummary.class));
            Dataset mergedAuthorSummaryDS = currentAuthorSummaryDS.joinWith(downloadedAuthorSummaryDS, currentAuthorSummaryDS.col("authorData.oid").equalTo((Object)downloadedAuthorSummaryDS.col("authorData.oid")), "full_outer").map((MapFunction & Serializable)value -> {
                Optional<Object> opCurrent = Optional.ofNullable(value._1());
                Optional<Object> opDownloaded = Optional.ofNullable(value._2());
                if (!opCurrent.isPresent()) {
                    newAuthorsFoundAcc.add(1L);
                    return (AuthorSummary)opDownloaded.get();
                }
                if (!opDownloaded.isPresent()) {
                    oldAuthorsFoundAcc.add(1L);
                    return (AuthorSummary)opCurrent.get();
                }
                if (opCurrent.isPresent() && opDownloaded.isPresent()) {
                    updatedAuthorsFoundAcc.add(1L);
                    return (AuthorSummary)opDownloaded.get();
                }
                return null;
            }, Encoders.bean(AuthorSummary.class)).filter(Objects::nonNull);
            long mergedCount = mergedAuthorSummaryDS.count();
            Dataset base64DedupedDS = mergedAuthorSummaryDS.dropDuplicates("base64CompressData", new String[0]);
            List dupOids = base64DedupedDS.groupBy("authorData.oid", new String[0]).agg(functions.count((String)"authorData.oid").alias("oidOccurrenceCount"), new Column[0]).where("oidOccurrenceCount > 1").select("oid", new String[0]).toJavaRDD().map((Function & Serializable)row -> row.get(0).toString()).collect();
            JavaRDD dupAuthors = base64DedupedDS.toJavaRDD().filter((Function & Serializable)authorSummary -> Objects.nonNull(authorSummary.getAuthorData()) && Objects.nonNull(authorSummary.getAuthorData().getOid())).filter((Function & Serializable)authorSummary -> dupOids.contains(authorSummary.getAuthorData().getOid()));
            Dataset dupAuthorSummaryDS = spark.createDataset(dupAuthors.rdd(), Encoders.bean(AuthorSummary.class));
            List lastModifiedAuthors = dupAuthorSummaryDS.groupBy("authorData.oid", new String[0]).agg(functions.array_max((Column)functions.collect_list((String)"downloadDate")), new Column[0]).map((MapFunction & Serializable)row -> new Tuple2((Object)row.get(0).toString(), (Object)row.get(1).toString()), Encoders.tuple((Encoder)Encoders.STRING(), (Encoder)Encoders.STRING())).toJavaRDD().collect();
            JavaRDD lastDownloadedAuthors = base64DedupedDS.toJavaRDD().filter((Function & Serializable)authorSummary -> Objects.nonNull(authorSummary.getAuthorData()) && Objects.nonNull(authorSummary.getAuthorData().getOid())).filter((Function & Serializable)authorSummary -> {
                boolean oidFound = lastModifiedAuthors.stream().filter(a -> ((String)a._1()).equals(authorSummary.getAuthorData().getOid())).count() == 1L;
                boolean tsFound = lastModifiedAuthors.stream().filter(a -> ((String)a._1()).equals(authorSummary.getAuthorData().getOid()) && ((String)a._2()).equals(authorSummary.getDownloadDate())).count() == 1L;
                return !oidFound || tsFound;
            });
            Dataset cleanedDS = spark.createDataset(lastDownloadedAuthors.rdd(), Encoders.bean(AuthorSummary.class)).dropDuplicates("downloadDate", new String[]{"authorData"});
            cleanedDS.toJavaRDD().map(arg_0 -> ((ObjectMapper)OBJECT_MAPPER).writeValueAsString(arg_0)).saveAsTextFile(workingPath.concat("orcid_dataset/new_authors"), GzipCodec.class);
            long cleanedDSCount = cleanedDS.count();
            logger.info("report_oldAuthorsFoundAcc: {}", (Object)oldAuthorsFoundAcc.value());
            logger.info("report_newAuthorsFoundAcc: {}", (Object)newAuthorsFoundAcc.value());
            logger.info("report_updatedAuthorsFoundAcc: {}", (Object)updatedAuthorsFoundAcc.value());
            logger.info("report_errorCodeFoundAcc: {}", (Object)errorCodeAuthorsFoundAcc.value());
            logger.info("report_errorLoadingJsonFoundAcc: {}", (Object)errorLoadingAuthorsJsonFoundAcc.value());
            logger.info("report_errorParsingXMLFoundAcc: {}", (Object)errorParsingAuthorsXMLFoundAcc.value());
            logger.info("report_merged_count: {}", (Object)mergedCount);
            logger.info("report_cleaned_count: {}", (Object)cleanedDSCount);
        });
    }

    private static String getJsonValue(JsonElement jElement, String property) {
        JsonElement name;
        if (jElement.getAsJsonObject().has(property) && (name = jElement.getAsJsonObject().get(property)) != null && !name.isJsonNull()) {
            return name.getAsString();
        }
        return "";
    }
}

