/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.blacklist;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that removes blacklisted relations from a set of relations.
 *
 * <p>The blacklist read from {@code <hdfsPath>/blacklist} is first re-pointed through the
 * relations read from {@code mergesPath}. Then every input relation that equals a re-pointed
 * blacklist entry is dropped. The remaining relations are written as gzip-compressed JSON to
 * {@code outputPath}.
 */
public class SparkRemoveBlacklistedRelationJob {
    private static final Logger log = LoggerFactory.getLogger(SparkRemoveBlacklistedRelationJob.class);

    /** Classpath resource describing the command-line arguments accepted by this job. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/blacklist/sparkblacklist_parameters.json";

    /**
     * Entry point: parses the job arguments, clears the output directory and runs the filter.
     *
     * @param args command-line arguments, interpreted according to {@link #PARAMETERS_RESOURCE}
     * @throws Exception if the parameter spec cannot be read or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // Close the resource stream and decode it explicitly as UTF-8 instead of the platform charset.
        try (InputStream in = SparkRemoveBlacklistedRelationJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            Objects.requireNonNull(in, "missing classpath resource " + PARAMETERS_RESOURCE);
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        // Defaults to a managed session when the argument is absent.
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);
        String blacklistPath = parser.get("hdfsPath");
        log.info("blacklistPath: {}", blacklistPath);
        String mergesPath = parser.get("mergesPath");
        log.info("mergesPath: {}", mergesPath);

        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            removeBlacklistedRelations(spark, blacklistPath, inputPath, outputPath, mergesPath);
        });
    }

    /**
     * Drops from {@code inputPath} every relation equal to a (re-pointed) blacklisted relation.
     *
     * @param spark         active Spark session
     * @param blacklistPath base directory; the blacklist is read from {@code blacklistPath + "/blacklist"}
     * @param inputPath     relations to filter
     * @param outputPath    destination of the surviving relations (gzip-compressed JSON, overwritten)
     * @param mergesPath    relations used to re-point blacklist identifiers: a blacklist id equal to a
     *                      merges relation's {@code target} is replaced by that relation's {@code source}
     *                      (presumably the dedup root id — confirm)
     */
    private static void removeBlacklistedRelations(SparkSession spark, String blacklistPath, String inputPath, String outputPath, String mergesPath) {
        Dataset<Relation> blackListed = readRelations(spark, blacklistPath + "/blacklist");
        Dataset<Relation> inputRelation = readRelations(spark, inputPath);
        Dataset<Relation> mergesRelation = readRelations(spark, mergesPath);

        // Step 1: re-point blacklist sources that appear as the target of a merges relation.
        Dataset<Relation> dedupSource = blackListed
            .joinWith(mergesRelation, blackListed.col("source").equalTo(mergesRelation.col("target")), "left_outer")
            .map((MapFunction<Tuple2<Relation, Relation>, Relation>) t -> {
                Relation blacklisted = t._1();
                Relation merge = t._2(); // null when the left-outer join found no match
                if (merge != null) {
                    blacklisted.setSource(merge.getSource());
                }
                return blacklisted;
            }, Encoders.bean(Relation.class));

        // Step 2: same re-pointing for the blacklist targets.
        Dataset<Relation> dedupBL = dedupSource
            .joinWith(mergesRelation, dedupSource.col("target").equalTo(mergesRelation.col("target")), "left_outer")
            .map((MapFunction<Tuple2<Relation, Relation>, Relation>) t -> {
                Relation blacklisted = t._1();
                Relation merge = t._2(); // null when the left-outer join found no match
                if (merge != null) {
                    blacklisted.setTarget(merge.getSource());
                }
                return blacklisted;
            }, Encoders.bean(Relation.class));

        // Step 3: keep an input relation unless it met a blacklist entry (same source and target)
        // that is also Relation#equals to it. Filtering the tuples directly avoids emitting null beans.
        // NOTE(review): the join yields one row per (input, blacklist) match. An input relation
        // matching several blacklist rows on (source, target) is therefore written once per
        // non-equal match. It can also survive even when one match is equal. Confirm whether the
        // blacklist can contain such rows (e.g. duplicates created by re-pointing).
        inputRelation
            .joinWith(dedupBL,
                inputRelation.col("source").equalTo(dedupBL.col("source"))
                    .and(inputRelation.col("target").equalTo(dedupBL.col("target"))),
                "left_outer")
            .filter((FilterFunction<Tuple2<Relation, Relation>>) t -> t._2() == null || !t._1().equals(t._2()))
            .map((MapFunction<Tuple2<Relation, Relation>, Relation>) t -> t._1(), Encoders.bean(Relation.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }

    /**
     * Reads JSON relations from {@code inputPath}, applying the {@link Relation} bean schema explicitly.
     *
     * @param spark     active Spark session
     * @param inputPath path of the JSON relation records
     * @return a typed dataset of relations
     */
    public static Dataset<Relation> readRelations(SparkSession spark, String inputPath) {
        return spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(inputPath)
            .as(Encoders.bean(Relation.class));
    }

    /**
     * Removes {@code path} via {@link HdfsSupport#remove} using the session's Hadoop configuration.
     */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }
}

