/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.sx.graph;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Function2;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/**
 * Spark job that splits a mixed dump of {@link Oaf} records into one dataset per
 * record type and then merges result records that share the same id.
 *
 * <p>The class has the shape of a decompiled Scala {@code object}: a single instance
 * is created by the static initializer and published through {@link #MODULE$} by the
 * private constructor.
 */
public final class SparkCreateInputGraph$ {
    /** The only instance of this class; assigned by the private constructor. */
    public static SparkCreateInputGraph$ MODULE$;

    static {
        // The constructor stores the new instance in MODULE$ as a side effect.
        new SparkCreateInputGraph$();
    }

    /**
     * Entry point of the job.
     *
     * <p>Reads the arguments {@code master}, {@code sourcePath} and {@code targetPath},
     * loads every dataset found under {@code sourcePath/*} as {@link Oaf}, writes each
     * record type (publication, dataset, software, otherResearchProduct, relation) to
     * {@code targetPath/extracted/<type>}, and finally runs
     * {@link #makeDatasetUnique} for the four result types, writing to
     * {@code targetPath/preprocess/<type>}. Relations are extracted but are not passed
     * through the uniqueness step.
     *
     * @param args command-line arguments, parsed against the bundled
     *             {@code extract_entities_params.json} definition
     */
    public void main(String[] args) {
        Logger log = LoggerFactory.getLogger(this.getClass());
        SparkConf conf = new SparkConf();

        // Read the argument definition with an explicit charset and release the
        // classpath stream as soon as it has been consumed.
        String argumentSpec;
        try (InputStream specStream = this.getClass().getResourceAsStream("/eu/dnetlib/dhp/sx/graph/extract_entities_params.json")) {
            argumentSpec = IOUtils.toString(specStream, "UTF-8");
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(argumentSpec);
        parser.parseArgument(args);
        SparkSession spark = SparkSession$.MODULE$.builder().config(conf).appName(this.getClass().getSimpleName()).master(parser.get("master")).getOrCreate();

        // (output folder name, record class) pairs for the types that are made unique below.
        .colon.colon resultObject = new .colon.colon((Object)new Tuple2((Object)"publication", Publication.class), (List)new .colon.colon((Object)new Tuple2((Object)"dataset", Dataset.class), (List)new .colon.colon((Object)new Tuple2((Object)"software", Software.class), (List)new .colon.colon((Object)new Tuple2((Object)"otherResearchProduct", OtherResearchProduct.class), (List)Nil$.MODULE$))));

        Encoder oafEncoder = Encoders$.MODULE$.kryo(Oaf.class);
        Encoder publicationEncoder = Encoders$.MODULE$.kryo(Publication.class);
        Encoder datasetEncoder = Encoders$.MODULE$.kryo(Dataset.class);
        Encoder softwareEncoder = Encoders$.MODULE$.kryo(Software.class);
        Encoder orpEncoder = Encoders$.MODULE$.kryo(OtherResearchProduct.class);
        Encoder relEncoder = Encoders$.MODULE$.kryo(Relation.class);

        String sourcePath = parser.get("sourcePath");
        log.info("sourcePath  -> " + sourcePath);
        String targetPath = parser.get("targetPath");
        log.info("targetPath  -> " + targetPath);

        org.apache.spark.sql.Dataset oafDs = spark.read().load(sourcePath + "/*").as(oafEncoder);
        String extractedRoot = targetPath + "/extracted/";

        log.info("Extract Publication");
        oafDs.filter((Function1 & Serializable & scala.Serializable)o -> BoxesRunTime.boxToBoolean((boolean)SparkCreateInputGraph$.$anonfun$main$1(o)))
            .map((Function1 & Serializable & scala.Serializable)p -> (Publication)p, publicationEncoder)
            .write().mode(SaveMode.Overwrite).save(extractedRoot + "publication");

        log.info("Extract dataset");
        oafDs.filter((Function1 & Serializable & scala.Serializable)o -> BoxesRunTime.boxToBoolean((boolean)SparkCreateInputGraph$.$anonfun$main$3(o)))
            .map((Function1 & Serializable & scala.Serializable)p -> (Dataset)p, datasetEncoder)
            .write().mode(SaveMode.Overwrite).save(extractedRoot + "dataset");

        log.info("Extract software");
        oafDs.filter((Function1 & Serializable & scala.Serializable)o -> BoxesRunTime.boxToBoolean((boolean)SparkCreateInputGraph$.$anonfun$main$5(o)))
            .map((Function1 & Serializable & scala.Serializable)p -> (Software)p, softwareEncoder)
            .write().mode(SaveMode.Overwrite).save(extractedRoot + "software");

        log.info("Extract otherResearchProduct");
        oafDs.filter((Function1 & Serializable & scala.Serializable)o -> BoxesRunTime.boxToBoolean((boolean)SparkCreateInputGraph$.$anonfun$main$7(o)))
            .map((Function1 & Serializable & scala.Serializable)p -> (OtherResearchProduct)p, orpEncoder)
            .write().mode(SaveMode.Overwrite).save(extractedRoot + "otherResearchProduct");

        log.info("Extract Relation");
        oafDs.filter((Function1 & Serializable & scala.Serializable)o -> BoxesRunTime.boxToBoolean((boolean)SparkCreateInputGraph$.$anonfun$main$9(o)))
            .map((Function1 & Serializable & scala.Serializable)p -> (Relation)p, relEncoder)
            .write().mode(SaveMode.Overwrite).save(extractedRoot + "relation");

        resultObject.foreach((Function1 & Serializable & scala.Serializable)r -> {
            SparkCreateInputGraph$.$anonfun$main$11(log, targetPath, spark, r);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Writes the records of {@code oafDs} that are instances of {@code clazz} to
     * {@code targetPath}, overwriting any existing output.
     *
     * @param oafDs      the mixed dataset to select from
     * @param targetPath the location the selected records are saved to
     * @param clazz      the record type to keep; also used to build the kryo encoder
     * @param log        logger that receives the "Extract ..." progress message
     * @param <T>        the record type to keep
     */
    public <T extends Oaf> void extractEntities(org.apache.spark.sql.Dataset<Oaf> oafDs, String targetPath, Class<T> clazz, Logger log) {
        Encoder resEncoder = Encoders$.MODULE$.kryo(clazz);
        log.info("Extract " + clazz.getSimpleName());
        // The check must go through the runtime Class token: a test against the erased
        // bound (instanceof Oaf) accepts every record and would ignore clazz entirely.
        oafDs.filter((Function1 & Serializable & scala.Serializable)o -> BoxesRunTime.boxToBoolean(clazz.isInstance(o)))
            .map((Function1 & Serializable & scala.Serializable)p -> clazz.cast(p), resEncoder)
            .write().mode(SaveMode.Overwrite).save(targetPath);
    }

    /**
     * Loads the dataset at {@code sourcePath}, groups its records by
     * {@code getId()}, reduces every group to a single record with
     * {@code MergeUtils.mergeGroup}, and saves the outcome to {@code targetPath},
     * overwriting any existing output.
     *
     * @param sourcePath location of the dataset to read
     * @param targetPath location the merged dataset is saved to
     * @param spark      session used for reading and for the string key encoder
     * @param clazz      record type, used to build the kryo encoder
     * @param <T>        the record type
     */
    public <T extends Result> void makeDatasetUnique(String sourcePath, String targetPath, SparkSession spark, Class<T> clazz) {
        Encoder resEncoder = Encoders$.MODULE$.kryo(clazz);
        org.apache.spark.sql.Dataset ds = spark.read().load(sourcePath).as(resEncoder);
        ds.groupByKey((Function1 & Serializable & scala.Serializable)x$1 -> x$1.getId(), spark.implicits().newStringEncoder())
            // mergeGroup takes a java.util.Iterator, so the Scala iterator is converted first.
            .mapGroups((Function2 & Serializable & scala.Serializable)(id, it) -> (Result)MergeUtils.mergeGroup((Iterator)JavaConverters$.MODULE$.asJavaIteratorConverter(it).asJava()), resEncoder)
            .write().mode(SaveMode.Overwrite).save(targetPath);
    }

    /** Filter predicate used by {@code main}: true when {@code o} is a {@link Publication}. */
    public static final /* synthetic */ boolean $anonfun$main$1(Oaf o) {
        return o instanceof Publication;
    }

    /** Filter predicate used by {@code main}: true when {@code o} is a {@link Dataset}. */
    public static final /* synthetic */ boolean $anonfun$main$3(Oaf o) {
        return o instanceof Dataset;
    }

    /** Filter predicate used by {@code main}: true when {@code o} is a {@link Software}. */
    public static final /* synthetic */ boolean $anonfun$main$5(Oaf o) {
        return o instanceof Software;
    }

    /** Filter predicate used by {@code main}: true when {@code o} is an {@link OtherResearchProduct}. */
    public static final /* synthetic */ boolean $anonfun$main$7(Oaf o) {
        return o instanceof OtherResearchProduct;
    }

    /** Filter predicate used by {@code main}: true when {@code o} is a {@link Relation}. */
    public static final /* synthetic */ boolean $anonfun$main$9(Oaf o) {
        return o instanceof Relation;
    }

    /**
     * Body of the per-type loop in {@code main}: logs the step and runs
     * {@link #makeDatasetUnique} from {@code <targetPath>/extracted/<name>} to
     * {@code <targetPath>/preprocess/<name>}.
     *
     * @param log$1        logger captured from {@code main}
     * @param targetPath$1 output root captured from {@code main}
     * @param spark$1      session captured from {@code main}
     * @param r            pair whose first element is the folder name and whose second
     *                     element is the record class
     */
    public static final /* synthetic */ void $anonfun$main$11(Logger log$1, String targetPath$1, SparkSession spark$1, Tuple2 r) {
        log$1.info("Make " + r._1() + " unique");
        String extractedPath = targetPath$1 + "/extracted/" + r._1();
        String preprocessPath = targetPath$1 + "/preprocess/" + r._1();
        MODULE$.makeDatasetUnique(extractedPath, preprocessPath, spark$1, (Class)r._2());
    }

    /**
     * Retained so existing references keep resolving; {@link #extractEntities} no longer
     * uses it. Because the parameter is already typed {@link Oaf}, the result is true
     * for every non-null argument and does not distinguish between subtypes.
     */
    public static final /* synthetic */ boolean $anonfun$extractEntities$1(Oaf o) {
        return o instanceof Oaf;
    }

    /** Publishes the instance under construction as {@link #MODULE$}. */
    private SparkCreateInputGraph$() {
        MODULE$ = this;
    }
}

