/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.doiboost.orcidnodoi;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.orcid.AuthorData;
import eu.dnetlib.dhp.schema.orcid.AuthorSummary;
import eu.dnetlib.dhp.schema.orcid.Contributor;
import eu.dnetlib.dhp.schema.orcid.Work;
import eu.dnetlib.dhp.schema.orcid.WorkDetail;
import eu.dnetlib.doiboost.orcid.json.JsonHelper;
import eu.dnetlib.doiboost.orcid.util.HDFSUtil;
import eu.dnetlib.doiboost.orcidnodoi.oaf.PublicationToOaf;
import eu.dnetlib.doiboost.orcidnodoi.similarity.AuthorMatcher;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark driver that enriches ORCID works lacking a DOI with the data of their owning author and
 * stores the resulting publications as atomic actions.
 *
 * <p>Steps performed by {@link #main(String[])}:
 * <ol>
 *   <li>read {@code last_update.txt} from the working path and keep its first 10 characters as the
 *       date of collection;</li>
 *   <li>load author summaries from {@code <workingPath><orcidDataFolder>/authors/*} and works from
 *       {@code <workingPath><orcidDataFolder>/works/*} (one JSON document per line);</li>
 *   <li>keep only works that have a work detail, no error code and no external id of type
 *       {@code doi};</li>
 *   <li>inner-join works and authors on {@code oid}, enriching the contributors of each work;</li>
 *   <li>convert every enriched work with {@link PublicationToOaf} and write the non-null results
 *       to a compressed sequence file of {@code (Text, Text)} pairs.</li>
 * </ol>
 */
public class SparkGenEnrichedOrcidWorks {
    static final Logger logger = LoggerFactory.getLogger(SparkGenEnrichedOrcidWorks.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Classpath location of the JSON file describing the accepted command-line parameters. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/doiboost/gen_orcid-no-doi_params.json";

    /** Number of leading characters of {@code last_update.txt} used as the date of collection. */
    private static final int COLLECTION_DATE_LENGTH = 10;

    /**
     * Job entry point.
     *
     * @param args command-line arguments, parsed according to the parameter definition found at
     *     {@link #PARAMETERS_RESOURCE}; the values read are {@code isSparkSessionManaged}
     *     (defaults to {@code true} when absent), {@code hdfsServerUri}, {@code workingPath},
     *     {@code outputEnrichedWorksPath} and {@code orcidDataFolder}
     * @throws Exception if the parameters cannot be loaded or parsed, or if the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String parameterSpec;
        // Close the resource stream deterministically and decode it with an explicit charset
        // instead of relying on the platform default.
        try (InputStream in = SparkGenEnrichedOrcidWorks.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("parameter definition not found on classpath: " + PARAMETERS_RESOURCE);
            }
            parameterSpec = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(parameterSpec);
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        final String hdfsServerUri = parser.get("hdfsServerUri");
        final String workingPath = parser.get("workingPath");
        final String outputEnrichedWorksPath = parser.get("outputEnrichedWorksPath");
        final String orcidDataFolder = parser.get("orcidDataFolder");

        final SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            final String dateOfCollection = extractCollectionDate(
                HDFSUtil.readFromTextFile(hdfsServerUri, workingPath, "last_update.txt"));
            final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
            final String orcidDataPath = workingPath.concat(orcidDataFolder);

            final Dataset<AuthorData> authorDataset = spark.createDataset(
                sc.textFile(orcidDataPath.concat("/authors/*"))
                    .map(item -> OBJECT_MAPPER.readValue(item, AuthorSummary.class))
                    .filter(authorSummary -> authorSummary.getAuthorData() != null)
                    .map(AuthorSummary::getAuthorData)
                    .rdd(),
                Encoders.bean(AuthorData.class));
            logger.info("Authors data loaded: {}", authorDataset.count());

            final Dataset<WorkDetail> workDataset = spark.createDataset(
                sc.textFile(orcidDataPath.concat("/works/*"))
                    .map(item -> OBJECT_MAPPER.readValue(item, Work.class))
                    .filter(work -> work.getWorkDetail() != null)
                    .map(Work::getWorkDetail)
                    .filter(work -> work.getErrorCode() == null)
                    .filter(SparkGenEnrichedOrcidWorks::hasNoDoi)
                    .rdd(),
                Encoders.bean(WorkDetail.class));
            logger.info("Works data loaded: {}", workDataset.count());

            final LongAccumulator warnNotFoundContributors = spark.sparkContext().longAccumulator("warnNotFoundContributors");
            // The explicit MapFunction cast selects the Java overload of Dataset.map.
            final JavaRDD<Tuple2<String, String>> enrichedWorksRDD = workDataset
                .joinWith(authorDataset, workDataset.col("oid").equalTo(authorDataset.col("oid")), "inner")
                .map(
                    (MapFunction<Tuple2<WorkDetail, AuthorData>, Tuple2<String, String>>) pair ->
                        enrichWork(pair, warnNotFoundContributors),
                    Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                .filter(Objects::nonNull)
                .toJavaRDD();
            logger.info("Enriched works RDD ready.");

            final LongAccumulator parsedPublications = spark.sparkContext().longAccumulator("parsedPublications");
            final LongAccumulator enrichedPublications = spark.sparkContext().longAccumulator("enrichedPublications");
            final LongAccumulator errorsGeneric = spark.sparkContext().longAccumulator("errorsGeneric");
            final LongAccumulator errorsInvalidTitle = spark.sparkContext().longAccumulator("errorsInvalidTitle");
            final LongAccumulator errorsNotFoundAuthors = spark.sparkContext().longAccumulator("errorsNotFoundAuthors");
            final LongAccumulator errorsInvalidType = spark.sparkContext().longAccumulator("errorsInvalidType");
            final LongAccumulator otherTypeFound = spark.sparkContext().longAccumulator("otherTypeFound");
            final LongAccumulator deactivatedAcc = spark.sparkContext().longAccumulator("deactivated_found");
            final LongAccumulator titleNotProvidedAcc = spark.sparkContext().longAccumulator("Title_not_provided_found");
            final LongAccumulator noUrlAcc = spark.sparkContext().longAccumulator("no_url_found");

            final PublicationToOaf publicationToOaf = new PublicationToOaf(
                parsedPublications, enrichedPublications, errorsGeneric, errorsInvalidTitle,
                errorsNotFoundAuthors, errorsInvalidType, otherTypeFound, deactivatedAcc,
                titleNotProvidedAcc, noUrlAcc, dateOfCollection);

            // Works the converter maps to null are dropped before writing.
            final JavaRDD<Publication> oafPublicationRDD = enrichedWorksRDD
                .map(e -> (Publication) publicationToOaf.generatePublicationActionsFromJson(e._2()))
                .filter(Objects::nonNull);

            sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
            // Key: runtime class of the payload; value: the JSON-serialized atomic action.
            oafPublicationRDD
                .mapToPair(p -> new Tuple2<>(
                    new Text(p.getClass().toString()),
                    new Text(OBJECT_MAPPER.writeValueAsString(new AtomicAction<>(Publication.class, p)))))
                .saveAsNewAPIHadoopFile(
                    outputEnrichedWorksPath, Text.class, Text.class, SequenceFileOutputFormat.class,
                    sc.hadoopConfiguration());

            // Accumulator values are read only after the save action above has run.
            logCounter("parsedPublications", parsedPublications);
            logCounter("enrichedPublications", enrichedPublications);
            logCounter("warnNotFoundContributors", warnNotFoundContributors);
            logCounter("errorsGeneric", errorsGeneric);
            logCounter("errorsInvalidTitle", errorsInvalidTitle);
            logCounter("errorsNotFoundAuthors", errorsNotFoundAuthors);
            logCounter("errorsInvalidType", errorsInvalidType);
            logCounter("otherTypeFound", otherTypeFound);
            logCounter("deactivatedAcc", deactivatedAcc);
            logCounter("titleNotProvidedAcc", titleNotProvidedAcc);
            logCounter("noUrlAcc", noUrlAcc);
        });
    }

    /**
     * Validates the content of {@code last_update.txt} and returns its leading
     * {@link #COLLECTION_DATE_LENGTH} characters.
     *
     * @param lastUpdate raw content read from the file
     * @return the first 10 characters of {@code lastUpdate}
     * @throws RuntimeException if the content is blank or shorter than 10 characters
     */
    private static String extractCollectionDate(String lastUpdate) {
        if (StringUtils.isBlank(lastUpdate)) {
            throw new RuntimeException("last update info not found");
        }
        // Fail with a descriptive message rather than a StringIndexOutOfBoundsException.
        if (lastUpdate.length() < COLLECTION_DATE_LENGTH) {
            throw new RuntimeException(
                "last update info too short, expected at least " + COLLECTION_DATE_LENGTH + " characters: " + lastUpdate);
        }
        return lastUpdate.substring(0, COLLECTION_DATE_LENGTH);
    }

    /**
     * Tells whether a work carries no external identifier of type {@code doi} (case-insensitive).
     * Identifiers with a null type are ignored; a work without an external-id list cannot carry
     * a DOI and is therefore accepted.
     */
    private static boolean hasNoDoi(WorkDetail work) {
        if (work.getExtIds() == null) {
            return true;
        }
        // "doi".equalsIgnoreCase(null) is false, so null types need no separate filter.
        return work.getExtIds().stream().noneMatch(extId -> "doi".equalsIgnoreCase(extId.getType()));
    }

    /**
     * Enriches the contributors of a work with the data of the author it was joined with.
     *
     * <p>When the work has no contributors, a single contributor is built from the author and the
     * accumulator is incremented; otherwise {@link AuthorMatcher#match} is invoked on the author
     * and the existing contributors.
     *
     * @param pair the joined (work, author) couple
     * @param warnNotFoundContributors counter of works that had no contributors
     * @return the author's oid paired with the output of {@link JsonHelper#createOidWork}
     * @throws Exception mirrors the contract of {@code MapFunction.call}, since the signatures of
     *     the project helpers invoked here are not visible from this file
     */
    private static Tuple2<String, String> enrichWork(
        Tuple2<WorkDetail, AuthorData> pair, LongAccumulator warnNotFoundContributors) throws Exception {
        final WorkDetail work = pair._1();
        final AuthorData author = pair._2();
        final List<Contributor> contributors = work.getContributors();
        if (contributors == null || contributors.isEmpty()) {
            final Contributor owner = new Contributor();
            owner.setName(author.getName());
            owner.setSurname(author.getSurname());
            owner.setCreditName(author.getCreditName());
            owner.setOid(author.getOid());
            work.setContributors(Arrays.asList(owner));
            warnNotFoundContributors.add(1L);
        } else {
            AuthorMatcher.match(author, contributors);
        }
        return new Tuple2<>(author.getOid(), JsonHelper.createOidWork(work));
    }

    /** Logs the current value of an accumulator as {@code "<label>: <value>"}. */
    private static void logCounter(String label, LongAccumulator counter) {
        logger.info("{}: {}", label, counter.value());
    }
}

