/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection.crossref;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.AbstractScalaApplication;
import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf$;
import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf$TransformationType$;
import eu.dnetlib.dhp.collection.crossref.SparkMapDumpIntoOAF$;
import eu.dnetlib.dhp.collection.crossref.UnpayWall;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BooleanType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.reflect.ScalaSignature;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;

@ScalaSignature(bytes="\u0006\u0001a4A!\u0001\u0002\u0001\u001b\t\u00192\u000b]1sW6\u000b\u0007\u000fR;na&sGo\\(B\r*\u00111\u0001B\u0001\tGJ|7o\u001d:fM*\u0011QAB\u0001\u000bG>dG.Z2uS>t'BA\u0004\t\u0003\r!\u0007\u000e\u001d\u0006\u0003\u0013)\tq\u0001\u001a8fi2L'MC\u0001\f\u0003\t)Wo\u0001\u0001\u0014\u0005\u0001q\u0001CA\b\u0013\u001b\u0005\u0001\"BA\t\u0007\u0003-\t\u0007\u000f\u001d7jG\u0006$\u0018n\u001c8\n\u0005M\u0001\"\u0001G!cgR\u0014\u0018m\u0019;TG\u0006d\u0017-\u00119qY&\u001c\u0017\r^5p]\"IQ\u0003\u0001B\u0001B\u0003%a\u0003I\u0001\raJ|\u0007/\u001a:usB\u000bG\u000f\u001b\t\u0003/uq!\u0001G\u000e\u000e\u0003eQ\u0011AG\u0001\u0006g\u000e\fG.Y\u0005\u00039e\ta\u0001\u0015:fI\u00164\u0017B\u0001\u0010 \u0005\u0019\u0019FO]5oO*\u0011A$G\u0005\u0003+IA\u0011B\t\u0001\u0003\u0002\u0003\u0006Ia\t\u0014\u0002\t\u0005\u0014xm\u001d\t\u00041\u00112\u0012BA\u0013\u001a\u0005\u0015\t%O]1z\u0013\t\u0011#\u0003\u0003\u0005)\u0001\t\u0005\t\u0015!\u0003*\u0003\rawn\u001a\t\u0003U=j\u0011a\u000b\u0006\u0003Y5\nQa\u001d7gi)T\u0011AL\u0001\u0004_J<\u0017B\u0001\u0019,\u0005\u0019aunZ4fe\")!\u0007\u0001C\u0001g\u00051A(\u001b8jiz\"B\u0001\u000e\u001c8qA\u0011Q\u0007A\u0007\u0002\u0005!)Q#\ra\u0001-!)!%\ra\u0001G!)\u0001&\ra\u0001S!)!\b\u0001C!w\u0005\u0019!/\u001e8\u0015\u0003q\u0002\"\u0001G\u001f\n\u0005yJ\"\u0001B+oSRDQ\u0001\u0011\u0001\u0005\u0002\u0005\u000b!\u0003\u001e:b]N4wN]7V]B\f\u0017pV1mYR!!iT*V!\r\u0019%\nT\u0007\u0002\t*\u0011QIR\u0001\u0004gFd'BA$I\u0003\u0015\u0019\b/\u0019:l\u0015\tIU&\u0001\u0004ba\u0006\u001c\u0007.Z\u0005\u0003\u0017\u0012\u0013q\u0001R1uCN,G\u000f\u0005\u00026\u001b&\u0011aJ\u0001\u0002\n+:\u0004\u0018-_,bY2DQaR 
A\u0002A\u0003\"aQ)\n\u0005I#%\u0001D*qCJ\\7+Z:tS>t\u0007\"\u0002+@\u0001\u00041\u0012!D;oa\u0006Lx/\u00197m!\u0006$\b\u000eC\u0003W\u007f\u0001\u0007a#\u0001\u0007de>\u001c8O]3g!\u0006$\b\u000eC\u0003Y\u0001\u0011\u0005\u0011,A\tue\u0006t7OZ8s[\u000e\u0013xn]:sK\u001a$b\u0001\u0010.\\;~\u0003\u0007\"B$X\u0001\u0004\u0001\u0006\"\u0002/X\u0001\u00041\u0012AC:pkJ\u001cW\rU1uQ\")al\u0016a\u0001-\u0005QA/\u0019:hKR\u0004\u0016\r\u001e5\t\u000bQ;\u0006\u0019\u0001\f\t\u000b\u0005<\u0006\u0019\u00012\u0002\u0019Y|7-\u00192vY\u0006\u0014\u0018.Z:\u0011\u0005\rDW\"\u00013\u000b\u0005\u00154\u0017A\u0003<pG\u0006\u0014W\u000f\\1ss*\u0011qMB\u0001\u0007G>lWn\u001c8\n\u0005%$'a\u0004,pG\u0006\u0014W\u000f\\1ss\u001e\u0013x.\u001e9\b\u000b-\u0014\u0001\u0012\u00017\u0002'M\u0003\u0018M]6NCB$U/\u001c9J]R|w*\u0011$\u0011\u0005Ujg!B\u0001\u0003\u0011\u0003q7CA7p!\tA\u0002/\u0003\u0002r3\t1\u0011I\\=SK\u001aDQAM7\u0005\u0002M$\u0012\u0001\u001c\u0005\u0006k6$\tA^\u0001\u0005[\u0006Lg\u000e\u0006\u0002=o\")!\u0005\u001ea\u0001G\u0001")
public class SparkMapDumpIntoOAF
extends AbstractScalaApplication {
    private final Logger log;

    /**
     * JVM entry point. Forwards the command-line arguments unchanged to the
     * {@code main} method of the Scala companion singleton ({@code SparkMapDumpIntoOAF$.MODULE$}).
     *
     * @param stringArray the raw command-line arguments
     */
    public static void main(String[] stringArray) {
        final SparkMapDumpIntoOAF$ companion = SparkMapDumpIntoOAF$.MODULE$;
        companion.main(stringArray);
    }

    /**
     * Runs the job: reads the arguments from the parser, loads the vocabularies, derives the
     * output location from the MDStore version and starts the Crossref transformation.
     *
     * <p>Arguments read: {@code sourcePath}, {@code unpaywallPath}, {@code isLookupUrl} and
     * {@code mdstoreOutputVersion}; the latter is parsed as a JSON-serialized
     * {@link MDStoreVersion}. The records are written to {@code <hdfsPath>/store}, after which
     * {@code reportTotalSize(targetPath, outputBasePath)} is invoked.
     */
    public void run() {
        final String sourcePath = this.parser().get("sourcePath");
        this.log.info("sourcePath: {}", sourcePath);

        final String unpaywallPath = this.parser().get("unpaywallPath");
        this.log.info("unpaywallPath: {}", unpaywallPath);

        final String isLookupUrl = this.parser().get("isLookupUrl");
        this.log.info("isLookupUrl: {}", isLookupUrl);

        // Vocabularies are fetched from the information service; the job cannot proceed without them.
        final ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService(isLookupUrl);
        final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(isLookupService);
        Predef$.MODULE$.require(vocabularies != null);

        final String mdstoreOutputVersion = this.parser().get("mdstoreOutputVersion");
        this.log.info("mdstoreOutputVersion is '" + mdstoreOutputVersion + "'");

        // The MDStore version arrives as JSON; only its HDFS path is needed here.
        final ObjectMapper mapper = new ObjectMapper();
        final MDStoreVersion cleanedMdStoreVersion = mapper.readValue(mdstoreOutputVersion, MDStoreVersion.class);
        final String outputBasePath = cleanedMdStoreVersion.getHdfsPath();
        this.log.info("outputBasePath is '" + outputBasePath + "'");

        final String targetPath = outputBasePath + "/store";
        this.log.info("targetPath is '" + targetPath + "'");

        this.transformCrossref(this.spark(), sourcePath, targetPath, unpaywallPath, vocabularies);
        this.reportTotalSize(targetPath, outputBasePath);
    }

    /**
     * Loads the Unpaywall dump and keeps only the open-access records whose DOI also appears in
     * the Crossref dump.
     *
     * <p>Both sides are read as JSON with an explicit schema and their DOI is lower-cased before
     * the join, so the match is case-insensitive. The result is cached.
     *
     * @param spark the Spark session used to read both inputs
     * @param unpaywallPath location of the Unpaywall JSON dump
     * @param crossrefPath location of the Crossref JSON dump (only its {@code DOI} field is read)
     * @return the cached dataset of matching Unpaywall records
     */
    public Dataset<UnpayWall> transformUnpayWall(SparkSession spark, String unpaywallPath, String crossrefPath) {
        // Unpaywall schema: doi, is_oa, best_oa_location{host_type, license, url}, oa_status.
        // The apply$default$N() calls are scalac-generated accessors for StructField's default arguments.
        StructType schema = new StructType().add(new StructField("doi", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())).add(new StructField("is_oa", (DataType)BooleanType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())).add(new StructField("best_oa_location", (DataType)new StructType().add("host_type", (DataType)StringType$.MODULE$).add("license", (DataType)StringType$.MODULE$).add("url", (DataType)StringType$.MODULE$), StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())).add("oa_status", (DataType)StringType$.MODULE$);
        // Crossref side: read only the DOI field and add a lower-cased "doi" column as join key.
        Dataset cId = spark.read().schema(new StructType().add("DOI", (DataType)StringType$.MODULE$)).json(crossrefPath).withColumn("doi", functions$.MODULE$.lower(functions$.MODULE$.col("DOI")));
        // Unpaywall side: lower-case the doi and keep only open-access rows that carry a location URL.
        Dataset uw = spark.read().schema(schema).json(unpaywallPath).withColumn("doi", functions$.MODULE$.lower(functions$.MODULE$.col("doi"))).where("is_oa = true and best_oa_location.url is not null");
        // Compiler-generated reflection plumbing: builds the TypeTag for UnpayWall that
        // newProductEncoder needs below.
        JavaUniverse $u = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(SparkMapDumpIntoOAF.class.getClassLoader());
        public final class Eu_dnetlib_dhp_collection_crossref_SparkMapDumpIntoOAF$$typecreator4$1
        extends TypeCreator {
            // Resolves the UnpayWall class through the supplied mirror and returns its type constructor.
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $m.staticClass("eu.dnetlib.dhp.collection.crossref.UnpayWall").asType().toTypeConstructor();
            }

            // The enclosing instance is accepted but not stored.
            public Eu_dnetlib_dhp_collection_crossref_SparkMapDumpIntoOAF$$typecreator4$1(SparkMapDumpIntoOAF $outer) {
            }
        }
        // Left-semi join on doi: only Unpaywall rows with a Crossref match are kept; the rows are
        // then typed as UnpayWall and cached.
        return uw.join(cId, uw.apply("doi").$eq$eq$eq((Object)cId.apply("doi")), "leftsemi").as(spark.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Eu_dnetlib_dhp_collection_crossref_SparkMapDumpIntoOAF$$typecreator4$1(this)))).cache();
    }

    /**
     * Converts the Crossref dump into OAF records serialized as JSON and writes them, gzip
     * compressed, as text under {@code targetPath}.
     *
     * <p>Three writes target the same path:
     * <ol>
     *   <li>records from {@code Crossref2Oaf.convert} with {@code OnlyRelation}
     *       ({@code SaveMode.Overwrite}, so any previous content is replaced);</li>
     *   <li>records from {@code Crossref2Oaf.convert} with {@code OnlyResult}, each passed through
     *       {@code Crossref2Oaf.mergeUnpayWall} with its matching Unpaywall row, if any
     *       ({@code SaveMode.Append});</li>
     *   <li>strings produced by {@code Crossref2Oaf.generateAffliation} from author affiliation
     *       identifiers whose id-type matches {@code ROR} ({@code SaveMode.Append}).</li>
     * </ol>
     *
     * @param spark the Spark session
     * @param sourcePath location of the Crossref dump, read both as text lines and as JSON
     * @param targetPath output directory for the three writes
     * @param unpaywallPath location of the Unpaywall dump, passed to {@code transformUnpayWall}
     * @param vocabularies vocabularies handed to {@code Crossref2Oaf.convert}
     */
    public void transformCrossref(SparkSession spark, String sourcePath, String targetPath, String unpaywallPath, VocabularyGroup vocabularies) {
        ObjectMapper mapper = new ObjectMapper();
        // Oaf and Result are handled with kryo encoders.
        Encoder oafEncoder = Encoders$.MODULE$.kryo(Oaf.class);
        Encoder resultEncoder = Encoders$.MODULE$.kryo(Result.class);
        // One string per line of the source dump.
        Dataset dump = spark.read().text(sourcePath).as(spark.implicits().newStringEncoder());
        // Write 1: relations only, serialized to JSON; Overwrite replaces existing output.
        // The anonymous Serializable classes in this method are decompiled Scala closures; the
        // constructor arguments shown (this, vocabularies / mapper) are their captured values.
        dump.flatMap((Function1)new Serializable(this, vocabularies){
            public static final long serialVersionUID = 0L;
            private final VocabularyGroup vocabularies$1;

            public final List<Oaf> apply(String s) {
                return Crossref2Oaf$.MODULE$.convert(s, this.vocabularies$1, Crossref2Oaf$TransformationType$.MODULE$.OnlyRelation());
            }
            {
                this.vocabularies$1 = vocabularies$1;
            }
        }, oafEncoder).as(oafEncoder).map((Function1)new Serializable(this, mapper){
            public static final long serialVersionUID = 0L;
            private final ObjectMapper mapper$1;

            public final String apply(Oaf r) {
                return this.mapper$1.writeValueAsString((Object)r);
            }
            {
                this.mapper$1 = mapper$1;
            }
        }, spark.implicits().newStringEncoder()).write().mode(SaveMode.Overwrite).option("compression", "gzip").text(targetPath);
        // Unpaywall rows restricted to DOIs present in the Crossref dump.
        Dataset<UnpayWall> uw = this.transformUnpayWall(spark, unpaywallPath, sourcePath);
        // Results only, cast to Result and keyed by the value of their first pid; that key is
        // matched against the Unpaywall "doi" column in the join below.
        Dataset resultCrossref = dump.flatMap((Function1)new Serializable(this, vocabularies){
            public static final long serialVersionUID = 0L;
            private final VocabularyGroup vocabularies$1;

            public final List<Oaf> apply(String s) {
                return Crossref2Oaf$.MODULE$.convert(s, this.vocabularies$1, Crossref2Oaf$TransformationType$.MODULE$.OnlyResult());
            }
            {
                this.vocabularies$1 = vocabularies$1;
            }
        }, oafEncoder).as(oafEncoder).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Result apply(Oaf r) {
                return (Result)r;
            }
        }, resultEncoder).map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Tuple2<String, Result> apply(Result r) {
                return new Tuple2((Object)((StructuredProperty)r.getPid().get(0)).getValue(), (Object)r);
            }
        }, Encoders$.MODULE$.tuple(Encoders$.MODULE$.STRING(), resultEncoder));
        // Write 2: left join keeps every result; mergeUnpayWall receives the matching Unpaywall
        // row as its second argument. The merged results are serialized to JSON and appended.
        // NOTE(review): the rows are already JSON strings when .as(resultEncoder) is applied in the
        // last step of this chain, so that re-typing looks redundant — confirm before changing it.
        resultCrossref.joinWith(uw, resultCrossref.apply("_1").equalTo((Object)uw.apply("doi")), "left").map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final Result apply(Tuple2<Tuple2<String, Result>, UnpayWall> k) {
                return Crossref2Oaf$.MODULE$.mergeUnpayWall((Result)((Tuple2)k._1())._2(), (UnpayWall)k._2());
            }
        }, resultEncoder).map((Function1)new Serializable(this, mapper){
            public static final long serialVersionUID = 0L;
            private final ObjectMapper mapper$1;

            public final String apply(Result r) {
                return this.mapper$1.writeValueAsString((Object)r);
            }
            {
                this.mapper$1 = mapper$1;
            }
        }, spark.implicits().newStringEncoder()).as(resultEncoder).write().mode(SaveMode.Append).option("compression", "gzip").text(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        // Write 3: re-read the dump as JSON and flatten author.affiliation -> affiliations.id ->
        // individual id entries, keeping (DOI, id, idType). Rows are filtered with
        // idType LIKE 'ROR' (the pattern contains no wildcard), then turned into strings by
        // generateAffliation and appended to the same target path.
        spark.read().json(sourcePath).select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("DOI"), functions$.MODULE$.explode(functions$.MODULE$.col("author.affiliation")).alias("affiliations")})).select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("DOI"), functions$.MODULE$.explode(functions$.MODULE$.col("affiliations.id")).alias("aids")})).where("aids is not null").select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("DOI"), functions$.MODULE$.explode(functions$.MODULE$.col("aids")).alias("aff")})).select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("DOI"), functions$.MODULE$.col("aff.id").alias("id"), functions$.MODULE$.col("aff.id-type").alias("idType")})).where(functions$.MODULE$.col("idType").like("ROR")).flatMap((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final List<String> apply(Row r) {
                return Crossref2Oaf$.MODULE$.generateAffliation(r);
            }
        }, spark.implicits().newStringEncoder()).write().mode(SaveMode.Append).option("compression", "gzip").text(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
    }

    /**
     * Creates the application.
     *
     * @param propertyPath forwarded unchanged to the {@code AbstractScalaApplication} constructor
     * @param args forwarded unchanged to the {@code AbstractScalaApplication} constructor
     * @param log logger forwarded to the superclass and also kept by this instance for its own
     *     log statements
     */
    public SparkMapDumpIntoOAF(String propertyPath, String[] args, Logger log) {
        // The explicit superclass constructor call must be the first statement (Java versions
        // without flexible constructor bodies reject the decompiled "assign field, then super"
        // order). The superclass receives the logger as its own argument, so it does not need
        // this.log to be set beforehand.
        super(propertyPath, args, log);
        this.log = log;
    }
}

