/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.graph.raw;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.graph.raw.MigrateMongoMdstoresApplication;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that merges a claims graph table into the corresponding raw graph table.
 *
 * <p>For the model class named by {@code graphTableClassName}, it reads JSON lines from
 * {@code <rawGraphPath>/<type>} and {@code <claimsGraphPath>/<type>}, where {@code <type>} is the
 * lower-cased simple class name. The two inputs are full-outer-joined on the id computed by
 * {@code ModelSupport.idFn()}, and the result is written as gzip-compressed JSON lines to
 * {@code <outputRawGaphPath>/<type>}. When an id is present on both sides the raw record is kept;
 * a claim record is emitted only when no raw record has its id.
 */
public class MergeClaimsApplication {
    private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Classpath location of the argument specification read by {@link ArgumentApplicationParser}. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json";

    /**
     * Entry point: parses the job arguments and runs the merge for a single graph table.
     *
     * @param args command-line arguments, validated against {@link #PARAMETERS_RESOURCE}
     * @throws Exception if argument parsing, class loading or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(readParameterSpec());
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String rawGraphPath = parser.get("rawGraphPath");
        log.info("rawGraphPath: {}", rawGraphPath);

        String claimsGraphPath = parser.get("claimsGraphPath");
        log.info("claimsGraphPath: {}", claimsGraphPath);

        String outputRawGaphPath = parser.get("outputRawGaphPath");
        log.info("outputRawGaphPath: {}", outputRawGaphPath);

        String graphTableClassName = parser.get("graphTableClassName");
        log.info("graphTableClassName: {}", graphTableClassName);

        // asSubclass yields the Oaf-bounded type that mergeByType requires, and fails fast with a
        // ClassCastException if the configured class is not an Oaf model class.
        Class<? extends Oaf> clazz = Class.forName(graphTableClassName).asSubclass(Oaf.class);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            // Locale.ROOT keeps the directory name independent of the JVM default locale.
            String type = clazz.getSimpleName().toLowerCase(Locale.ROOT);
            String rawPath = rawGraphPath + "/" + type;
            String claimPath = claimsGraphPath + "/" + type;
            String outPath = outputRawGaphPath + "/" + type;

            removeOutputDir(spark, outPath);
            mergeByType(spark, rawPath, claimPath, outPath, clazz);
        });
    }

    /**
     * Loads the JSON argument specification from the classpath.
     *
     * @return the specification, decoded as UTF-8
     * @throws IOException if the resource cannot be read
     * @throws NullPointerException if the resource is not on the classpath
     */
    private static String readParameterSpec() throws IOException {
        try (InputStream in = MergeClaimsApplication.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            Objects.requireNonNull(in, "resource not found: " + PARAMETERS_RESOURCE);
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /**
     * Full-outer-joins raw and claim records of one type by id and writes the merged records.
     *
     * <p>The raw record is kept when both sides share an id. Otherwise the claim record is kept.
     *
     * @param spark     active Spark session
     * @param rawPath   input path of the raw records (JSON lines)
     * @param claimPath input path of the claim records (JSON lines)
     * @param outPath   output path; written with {@link SaveMode#Overwrite} as gzip-compressed text
     * @param clazz     model class of the records
     */
    private static <T extends Oaf> void mergeByType(
        SparkSession spark, String rawPath, String claimPath, String outPath, Class<T> clazz) {
        Dataset<Tuple2<String, T>> raw = keyById(readFromPath(spark, rawPath, clazz), clazz);
        // Used directly: wrapping a Dataset in SparkContext.broadcast only ships its query plan
        // object, not its rows, and does not influence how the join below is executed.
        Dataset<Tuple2<String, T>> claim = keyById(readFromPath(spark, claimPath, clazz), clazz);

        // NOTE(review): if one side holds several records with the same id, the join yields one
        // row per match and the surviving record is written once per row. Confirm ids are unique
        // on each side.
        raw.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
            .map(
                (MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) pair -> {
                    Tuple2<String, T> fromRaw = pair._1();
                    if (fromRaw != null) {
                        return fromRaw._2();
                    }
                    Tuple2<String, T> fromClaim = pair._2();
                    return fromClaim != null ? fromClaim._2() : null;
                },
                Encoders.bean(clazz))
            .filter((FilterFunction<T>) Objects::nonNull)
            // A lambda (not OBJECT_MAPPER::writeValueAsString) so the static mapper is read on the
            // executor instead of being captured and serialized with the closure.
            .map((MapFunction<T, String>) value -> OBJECT_MAPPER.writeValueAsString(value), Encoders.STRING())
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .text(outPath);
    }

    /**
     * Pairs every record with its id, as computed by {@code ModelSupport.idFn()}.
     *
     * @param records records to key
     * @param clazz   model class, used for the Kryo encoder of the record half of the tuple
     * @return dataset of {@code (id, record)} tuples, with columns {@code _1} and {@code _2}
     */
    private static <T extends Oaf> Dataset<Tuple2<String, T>> keyById(Dataset<T> records, Class<T> clazz) {
        return records.map(
            (MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
            Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
    }

    /**
     * Reads JSON-lines records of the given class, dropping records whose computed id is null.
     *
     * @param spark active Spark session
     * @param path  input path (text files, one JSON document per line)
     * @param clazz model class to deserialize into
     * @return dataset of deserialized records that have a non-null id
     */
    private static <T extends Oaf> Dataset<T> readFromPath(SparkSession spark, String path, Class<T> clazz) {
        return spark.read()
            .textFile(path)
            .map((MapFunction<String, T>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz))
            .filter((FilterFunction<T>) value -> Objects.nonNull(ModelSupport.idFn().apply(value)));
    }

    /**
     * Deletes the given output path using the Hadoop configuration of the Spark context.
     *
     * @param spark active Spark session
     * @param path  path to remove
     */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }
}

