/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.doiboost.orcid;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.orcid.Work;
import eu.dnetlib.dhp.schema.orcid.WorkDetail;
import eu.dnetlib.doiboost.orcidnodoi.xml.XMLRecordParserNoDoi;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkUpdateOrcidDatasets {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);

    public static void main(String[] args) throws Exception {
        Logger logger = LoggerFactory.getLogger(SparkUpdateOrcidDatasets.class);
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)SparkUpdateOrcidDatasets.class.getResourceAsStream("/eu/dnetlib/dhp/doiboost/download_orcid_data.json")));
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        String workingPath = parser.get("workingPath");
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            JavaSparkContext sc = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
            LongAccumulator oldAuthorsFoundAcc = spark.sparkContext().longAccumulator("old_authors_found");
            LongAccumulator updatedAuthorsFoundAcc = spark.sparkContext().longAccumulator("updated_authors_found");
            LongAccumulator newAuthorsFoundAcc = spark.sparkContext().longAccumulator("new_authors_found");
            LongAccumulator errorCodeAuthorsFoundAcc = spark.sparkContext().longAccumulator("error_code_authors_found");
            LongAccumulator errorLoadingAuthorsJsonFoundAcc = spark.sparkContext().longAccumulator("error_loading_authors_json_found");
            LongAccumulator errorParsingAuthorsXMLFoundAcc = spark.sparkContext().longAccumulator("error_parsing_authors_xml_found");
            LongAccumulator oldWorksFoundAcc = spark.sparkContext().longAccumulator("old_works_found");
            LongAccumulator updatedWorksFoundAcc = spark.sparkContext().longAccumulator("updated_works_found");
            LongAccumulator newWorksFoundAcc = spark.sparkContext().longAccumulator("new_works_found");
            LongAccumulator errorCodeWorksFoundAcc = spark.sparkContext().longAccumulator("error_code_works_found");
            LongAccumulator errorLoadingWorksJsonFoundAcc = spark.sparkContext().longAccumulator("error_loading_works_json_found");
            LongAccumulator errorParsingWorksXMLFoundAcc = spark.sparkContext().longAccumulator("error_parsing_works_xml_found");
            Function & Serializable retrieveWorkFunction = (Function & Serializable)jsonData -> {
                Work work = new Work();
                JsonElement jElement = new JsonParser().parse(jsonData);
                String statusCode = SparkUpdateOrcidDatasets.getJsonValue(jElement, "statusCode");
                work.setStatusCode(statusCode);
                String downloadDate = SparkUpdateOrcidDatasets.getJsonValue(jElement, "lastModifiedDate");
                work.setDownloadDate("2020-11-18 00:00:05.644768");
                if (statusCode.equals("200")) {
                    String compressedData = SparkUpdateOrcidDatasets.getJsonValue(jElement, "compressedData");
                    if (StringUtils.isEmpty((CharSequence)compressedData)) {
                        errorLoadingWorksJsonFoundAcc.add(1L);
                    } else {
                        String xmlWork = ArgumentApplicationParser.decompressValue((String)compressedData);
                        try {
                            WorkDetail workDetail = XMLRecordParserNoDoi.VTDParseWorkData(xmlWork.getBytes());
                            work.setWorkDetail(workDetail);
                            work.setBase64CompressData(compressedData);
                            return work;
                        }
                        catch (Exception e) {
                            logger.error("parsing xml [" + jsonData + "]", (Throwable)e);
                            errorParsingWorksXMLFoundAcc.add(1L);
                        }
                    }
                } else {
                    errorCodeWorksFoundAcc.add(1L);
                }
                return work;
            };
            Dataset downloadedWorksDS = spark.createDataset(sc.textFile(workingPath + "downloads/updated_works/*").map((Function & Serializable)s -> s.substring(21, s.length() - 1)).map((Function)retrieveWorkFunction).rdd(), Encoders.bean(Work.class));
            Dataset currentWorksDS = spark.createDataset(sc.textFile(workingPath.concat("orcid_dataset/works/*")).map((Function & Serializable)item -> (Work)OBJECT_MAPPER.readValue(item, Work.class)).rdd(), Encoders.bean(Work.class));
            currentWorksDS.joinWith(downloadedWorksDS, currentWorksDS.col("workDetail.id").equalTo((Object)downloadedWorksDS.col("workDetail.id")).and(currentWorksDS.col("workDetail.oid").equalTo((Object)downloadedWorksDS.col("workDetail.oid"))), "full_outer").map((MapFunction & Serializable)value -> {
                Optional<Object> opCurrent = Optional.ofNullable(value._1());
                Optional<Object> opDownloaded = Optional.ofNullable(value._2());
                if (!opCurrent.isPresent()) {
                    newWorksFoundAcc.add(1L);
                    return (Work)opDownloaded.get();
                }
                if (!opDownloaded.isPresent()) {
                    oldWorksFoundAcc.add(1L);
                    return (Work)opCurrent.get();
                }
                if (opCurrent.isPresent() && opDownloaded.isPresent()) {
                    updatedWorksFoundAcc.add(1L);
                    return (Work)opDownloaded.get();
                }
                return null;
            }, Encoders.bean(Work.class)).filter(Objects::nonNull).toJavaRDD().map((Function & Serializable)work -> OBJECT_MAPPER.writeValueAsString(work)).saveAsTextFile(workingPath.concat("orcid_dataset/new_works"), GzipCodec.class);
            logger.info("oldWorksFoundAcc: " + oldWorksFoundAcc.value().toString());
            logger.info("newWorksFoundAcc: " + newWorksFoundAcc.value().toString());
            logger.info("updatedWorksFoundAcc: " + updatedWorksFoundAcc.value().toString());
            logger.info("errorCodeWorksFoundAcc: " + errorCodeWorksFoundAcc.value().toString());
            logger.info("errorLoadingJsonWorksFoundAcc: " + errorLoadingWorksJsonFoundAcc.value().toString());
            logger.info("errorParsingXMLWorksFoundAcc: " + errorParsingWorksXMLFoundAcc.value().toString());
        });
    }

    private static String getJsonValue(JsonElement jElement, String property) {
        if (jElement.getAsJsonObject().has(property)) {
            JsonElement name = null;
            name = jElement.getAsJsonObject().get(property);
            if (name != null && !name.isJsonNull()) {
                return name.getAsString();
            }
        }
        return "";
    }
}

