/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.sx.graph;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import java.io.InputStream;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;

public final class SparkCreateInputGraph$ {
    public static final SparkCreateInputGraph$ MODULE$;

    static {
        new SparkCreateInputGraph$();
    }

    public void main(String[] args) {
        Logger log = LoggerFactory.getLogger(this.getClass());
        SparkConf conf = new SparkConf();
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)this.getClass().getResourceAsStream("/eu/dnetlib/dhp/sx/graph/extract_entities_params.json")));
        parser.parseArgument(args);
        SparkSession spark = SparkSession$.MODULE$.builder().config(conf).appName(this.getClass().getSimpleName()).getOrCreate();
        List resultObject = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)"publication", Publication.class), new Tuple2((Object)"dataset", Dataset.class), new Tuple2((Object)"software", Software.class), new Tuple2((Object)"otherResearchProduct", OtherResearchProduct.class)}));
        Encoder oafEncoder = Encoders$.MODULE$.kryo(Oaf.class);
        Encoder publicationEncoder = Encoders$.MODULE$.kryo(Publication.class);
        Encoder datasetEncoder = Encoders$.MODULE$.kryo(Dataset.class);
        Encoder softwareEncoder = Encoders$.MODULE$.kryo(Software.class);
        Encoder orpEncoder = Encoders$.MODULE$.kryo(OtherResearchProduct.class);
        Encoder relEncoder = Encoders$.MODULE$.kryo(Relation.class);
        String sourcePath = parser.get("sourcePath");
        log.info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"sourcePath  -> ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{sourcePath})));
        String targetPath = parser.get("targetPath");
        log.info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"targetPath  -> ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        org.apache.spark.sql.Dataset oafDs = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/*"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{sourcePath}))).as(oafEncoder);
        log.info("Extract Publication");
        oafDs.filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Oaf o) {
                return o instanceof Publication;
            }
        }).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Publication apply(Oaf p) {
                return (Publication)p;
            }
        }, publicationEncoder).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/extracted/publication"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        log.info("Extract dataset");
        oafDs.filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Oaf o) {
                return o instanceof Dataset;
            }
        }).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Dataset apply(Oaf p) {
                return (Dataset)p;
            }
        }, datasetEncoder).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/extracted/dataset"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        log.info("Extract software");
        oafDs.filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Oaf o) {
                return o instanceof Software;
            }
        }).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Software apply(Oaf p) {
                return (Software)p;
            }
        }, softwareEncoder).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/extracted/software"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        log.info("Extract otherResearchProduct");
        oafDs.filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Oaf o) {
                return o instanceof OtherResearchProduct;
            }
        }).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final OtherResearchProduct apply(Oaf p) {
                return (OtherResearchProduct)p;
            }
        }, orpEncoder).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/extracted/otherResearchProduct"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        log.info("Extract Relation");
        oafDs.filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Oaf o) {
                return o instanceof Relation;
            }
        }).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Relation apply(Oaf p) {
                return (Relation)p;
            }
        }, relEncoder).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/extracted/relation"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        resultObject.foreach((Function1)new Serializable(log, spark, targetPath){
            public static final long serialVersionUID = 0L;
            private final Logger log$1;
            private final SparkSession spark$1;
            private final String targetPath$1;

            public final void apply(Tuple2<String, Class<? extends Result>> r) {
                this.log$1.info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Make ", " unique"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{r._1()})));
                SparkCreateInputGraph$.MODULE$.makeDatasetUnique(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/extracted/", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.targetPath$1, r._1()})), new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/preprocess/", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.targetPath$1, r._1()})), this.spark$1, (Class)r._2());
            }
            {
                this.log$1 = log$1;
                this.spark$1 = spark$1;
                this.targetPath$1 = targetPath$1;
            }
        });
    }

    public <T extends Oaf> void extractEntities(org.apache.spark.sql.Dataset<Oaf> oafDs, String targetPath, Class<T> clazz, Logger log) {
        Encoder resEncoder = Encoders$.MODULE$.kryo(clazz);
        log.info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Extract ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{clazz.getSimpleName()})));
        oafDs.filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Oaf o) {
                return o instanceof Oaf;
            }
        }).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final T apply(Oaf p) {
                return (T)p;
            }
        }, resEncoder).write().mode(SaveMode.Overwrite).save(targetPath);
    }

    public <T extends Result> void makeDatasetUnique(String sourcePath, String targetPath, SparkSession spark, Class<T> clazz) {
        Encoder resEncoder = Encoders$.MODULE$.kryo(clazz);
        org.apache.spark.sql.Dataset ds = spark.read().load(sourcePath).as(resEncoder);
        ds.groupByKey((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(T x$1) {
                return x$1.getId();
            }
        }, spark.implicits().newStringEncoder()).mapGroups((Function2)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final T apply(String id, Iterator<T> it) {
                return (T)((Result)MergeUtils.mergeGroup((java.util.Iterator)((java.util.Iterator)JavaConverters$.MODULE$.asJavaIteratorConverter(it).asJava())));
            }
        }, resEncoder).write().mode(SaveMode.Overwrite).save(targetPath);
    }

    private SparkCreateInputGraph$() {
        MODULE$ = this;
    }
}

