/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.graph.raw;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that merges the entities of one graph table found under the claims graph into the
 * entities of the same table found under the raw graph, and writes the outcome as gzip-compressed
 * JSON text lines.
 *
 * <p>Entities are matched on the identifier returned by {@code ModelSupport.idFn()} using a full
 * outer join, so entities present on only one side are kept as they are. When an identifier is
 * present on both sides the raw entity wins; if the entities are {@link Result}s the raw entity
 * additionally receives the union of both context lists.
 */
public class MergeClaimsApplication {
    private static final Logger log = LoggerFactory.getLogger(MergeClaimsApplication.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Classpath location of the JSON document describing the accepted command line arguments. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/graph/merge_claims_parameters.json";

    /**
     * Job entry point.
     *
     * <p>Arguments read from the parser: {@code isSparkSessionManaged} (optional, defaults to
     * {@code true}), {@code rawGraphPath}, {@code claimsGraphPath}, {@code outputRawGaphPath} and
     * {@code graphTableClassName}. The lower-cased simple name of the table class is appended to
     * each of the three paths to locate the table directory.
     *
     * @param args command line arguments, parsed according to the bundled parameters resource
     * @throws Exception if the arguments cannot be parsed, the table class cannot be loaded or is
     *     not an {@link Oaf} subtype, or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // try-with-resources: the resource stream used to be left open; UTF-8 is requested
        // explicitly so that parsing does not depend on the platform default charset.
        try (InputStream in = Objects.requireNonNull(
                MergeClaimsApplication.class.getResourceAsStream(PARAMETERS_RESOURCE),
                "missing classpath resource " + PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String rawGraphPath = parser.get("rawGraphPath");
        log.info("rawGraphPath: {}", rawGraphPath);

        final String claimsGraphPath = parser.get("claimsGraphPath");
        log.info("claimsGraphPath: {}", claimsGraphPath);

        // NOTE: "outputRawGaphPath" (sic) is the argument name read at runtime; it must not be "corrected" here.
        final String outputRawGaphPath = parser.get("outputRawGaphPath");
        log.info("outputRawGaphPath: {}", outputRawGaphPath);

        final String graphTableClassName = parser.get("graphTableClassName");
        log.info("graphTableClassName: {}", graphTableClassName);

        // asSubclass fails fast (ClassCastException) when the configured class is not an Oaf type,
        // instead of relying on an unchecked cast that would only surface inside the Spark job.
        final Class<? extends Oaf> clazz = Class.forName(graphTableClassName).asSubclass(Oaf.class);

        final SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            // Locale.ROOT keeps the directory name independent of the JVM default locale.
            final String type = clazz.getSimpleName().toLowerCase(Locale.ROOT);
            final String rawPath = rawGraphPath + "/" + type;
            final String claimPath = claimsGraphPath + "/" + type;
            final String outPath = outputRawGaphPath + "/" + type;
            removeOutputDir(spark, outPath);
            mergeByType(spark, rawPath, claimPath, outPath, clazz);
        });
    }

    /**
     * Joins the raw and claimed entities of one table on their identifier and writes the merged
     * entities to {@code outPath}, one JSON document per line, gzip-compressed, overwriting any
     * existing content.
     *
     * @param spark active Spark session
     * @param rawPath location of the raw entities (JSON text lines)
     * @param claimPath location of the claimed entities (JSON text lines)
     * @param outPath location the merged entities are written to
     * @param clazz entity type stored in the table
     * @param <T> entity type stored in the table
     */
    private static <T extends Oaf> void mergeByType(SparkSession spark, String rawPath, String claimPath, String outPath, Class<T> clazz) {
        final Dataset<Tuple2<String, T>> raw = keyById(readFromPath(spark, rawPath, clazz), clazz);
        // The claims used to be passed through JavaSparkContext.broadcast(...).getValue(): on the
        // driver that simply returns the very same Dataset handle, so the round trip was dropped.
        final Dataset<Tuple2<String, T>> claim = keyById(readFromPath(spark, claimPath, clazz), clazz);

        raw.joinWith(claim, raw.col("_1").equalTo(claim.col("_1")), "full_outer")
            .map(
                (MapFunction<Tuple2<Tuple2<String, T>, Tuple2<String, T>>, T>) pair ->
                    processClaims(Optional.ofNullable(pair._1()), Optional.ofNullable(pair._2())),
                Encoders.bean(clazz))
            // The explicit cast selects the Java-friendly overload of Dataset.filter.
            .filter((FilterFunction<T>) Objects::nonNull)
            .map((MapFunction<T, String>) OBJECT_MAPPER::writeValueAsString, Encoders.STRING())
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .text(outPath);
    }

    /** Pairs every entity with the identifier computed by {@code ModelSupport.idFn()}. */
    private static <T extends Oaf> Dataset<Tuple2<String, T>> keyById(Dataset<T> entities, Class<T> clazz) {
        return entities.map(
            (MapFunction<T, Tuple2<String, T>>) value -> new Tuple2<>(ModelSupport.idFn().apply(value), value),
            Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
    }

    /**
     * Resolves one row of the full outer join between raw and claimed entities.
     *
     * <ul>
     *   <li>both sides present and the claim is a {@link Result}: the raw entity is returned after
     *       its context list has been replaced (in place) with the merge of both context lists;</li>
     *   <li>otherwise the raw entity is returned when present, else the claimed entity.</li>
     * </ul>
     *
     * @param opRaw the (id, entity) pair from the raw graph, if any
     * @param opClaim the (id, entity) pair from the claims graph, if any
     * @param <T> entity type
     * @return the entity to keep, or {@code null} when neither side is present
     */
    private static <T extends Oaf> T processClaims(Optional<Tuple2<String, T>> opRaw, Optional<Tuple2<String, T>> opClaim) {
        if (opClaim.isPresent() && opRaw.isPresent()) {
            final T oafClaim = opClaim.get()._2();
            if (oafClaim instanceof Result) {
                final T oafRaw = opRaw.get()._2();
                final List<Context> context = mergeContexts((Result) oafClaim, (Result) oafRaw);
                ((Result) oafRaw).setContext(context);
                return oafRaw;
            }
        }
        return opRaw.isPresent() ? opRaw.get()._2() : opClaim.map(Tuple2::_2).orElse(null);
    }

    /**
     * Builds the union of the contexts of two results, with at most one {@link Context} per
     * context id. Claim contexts are visited before raw contexts; a {@code null} context list is
     * treated as empty.
     *
     * @param oafClaim the claimed result
     * @param oafRaw the raw result
     * @return a new list holding the merged contexts
     */
    private static List<Context> mergeContexts(Result oafClaim, Result oafRaw) {
        return new ArrayList<Context>(
            Stream.concat(nullSafeStream(oafClaim.getContext()), nullSafeStream(oafRaw.getContext()))
                .collect(Collectors.toMap(Context::getId, c -> c, MergeClaimsApplication::mergeSameIdContexts))
                .values());
    }

    /**
     * Combines two contexts sharing the same id into a new {@link Context} carrying that id and
     * the {@link DataInfo} entries of both, keeping the first entry met for each provenance action
     * class id (entries without a provenance action share the empty-string key).
     */
    private static Context mergeSameIdContexts(Context c1, Context c2) {
        final Context merged = new Context();
        merged.setId(c1.getId());
        merged.setDataInfo(new ArrayList<DataInfo>(
            Stream.concat(nullSafeStream(c1.getDataInfo()), nullSafeStream(c2.getDataInfo()))
                .collect(Collectors.toMap(
                    d -> Optional.ofNullable(d.getProvenanceaction()).map(Qualifier::getClassid).orElse(""),
                    d -> d,
                    (d1, d2) -> d1))
                .values()));
        return merged;
    }

    /** Streams the given collection, treating {@code null} as an empty collection. */
    private static <E> Stream<E> nullSafeStream(Collection<E> items) {
        return items == null ? Stream.<E>empty() : items.stream();
    }

    /**
     * Reads a table stored as JSON text lines, deserializing every line into {@code clazz} and
     * discarding the entities for which {@code ModelSupport.idFn()} yields {@code null}.
     *
     * @param spark active Spark session
     * @param path location of the text files
     * @param clazz entity type to deserialize into
     * @param <T> entity type
     * @return the dataset of entities having a non-null identifier
     */
    private static <T extends Oaf> Dataset<T> readFromPath(SparkSession spark, String path, Class<T> clazz) {
        return spark.read()
            .textFile(path)
            .map((MapFunction<String, T>) line -> OBJECT_MAPPER.readValue(line, clazz), Encoders.bean(clazz))
            .filter((FilterFunction<T>) entity -> Objects.nonNull(ModelSupport.idFn().apply(entity)));
    }

    /** Removes {@code path} through {@code HdfsSupport}, using the Hadoop configuration of the session. */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }
}

