/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.countrypropagation;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.countrypropagation.CountrySbs;
import eu.dnetlib.dhp.countrypropagation.ResultCountrySet;
import eu.dnetlib.dhp.schema.oaf.Country;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that enriches a table of results with the countries listed in a
 * previously prepared {@link ResultCountrySet} dataset.
 *
 * <p>Each result is left-outer-joined with the prepared information on
 * {@code id == resultId}; countries whose class id is not already present on
 * the result are appended, and the outcome is written as gzip-compressed JSON.
 */
public class SparkCountryPropagationJob {
    private static final Logger log = LoggerFactory.getLogger(SparkCountryPropagationJob.class);

    /** Classpath location of the JSON file describing the accepted command-line arguments. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/countrypropagation/input_countrypropagation_parameters.json";

    /**
     * Entry point: parses the arguments, resolves the result class and runs the propagation.
     *
     * @param args command-line arguments, parsed according to the bundled parameter description
     * @throws Exception if the arguments cannot be parsed, the result class cannot be loaded
     *                   (or is not a {@link Result} subtype), or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // Close the resource stream deterministically and decode it with an explicit charset
        // instead of relying on the platform default.
        try (InputStream in = SparkCountryPropagationJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            Objects.requireNonNull(in, "Missing classpath resource: " + PARAMETERS_RESOURCE);
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String sourcePath = parser.get("sourcePath");
        log.info("sourcePath: {}", sourcePath);

        String preparedInfoPath = parser.get("preparedInfoPath");
        log.info("preparedInfoPath: {}", preparedInfoPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        String resultClassName = parser.get("resultTableName");
        log.info("resultTableName: {}", resultClassName);

        // asSubclass performs a checked narrowing: a class that is not a Result fails here,
        // before any Spark work is started, rather than deep inside the job.
        Class<? extends Result> resultClazz = Class.forName(resultClassName).asSubclass(Result.class);

        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            PropagationConstant.removeOutputDir(spark, outputPath);
            execPropagation(spark, sourcePath, preparedInfoPath, outputPath, resultClazz);
        });
    }

    /**
     * Joins the results with the prepared country information and writes the merged results.
     *
     * @param spark            active Spark session
     * @param sourcePath       path the result table is read from
     * @param preparedInfoPath path of the JSON-serialized {@link ResultCountrySet} records
     * @param outputPath       path the merged results are written to (overwritten)
     * @param resultClazz      concrete result type used for reading and encoding
     * @param <R>              concrete result type
     */
    private static <R extends Result> void execPropagation(
            SparkSession spark,
            String sourcePath,
            String preparedInfoPath,
            String outputPath,
            Class<R> resultClazz) {
        log.info("Reading Graph table from: {}", sourcePath);
        Dataset<R> res = PropagationConstant.readPath(spark, sourcePath, resultClazz);

        log.info("Reading prepared info: {}", preparedInfoPath);
        Dataset<ResultCountrySet> prepared =
            spark.read().json(preparedInfoPath).as(Encoders.bean(ResultCountrySet.class));

        MapFunction<Tuple2<R, ResultCountrySet>, R> mergeFn = getCountryMergeFn();

        // left_outer keeps every result; the right side is null when nothing was prepared for it.
        res.joinWith(prepared, res.col("id").equalTo(prepared.col("resultId")), "left_outer")
            .map(mergeFn, Encoders.bean(resultClazz))
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .json(outputPath);
    }

    /**
     * Builds the function applied to each joined pair.
     *
     * <p>When the pair carries prepared information, the countries not yet present on the
     * result are added to it. A result whose country list is {@code null} receives a new list
     * instead of triggering a {@link NullPointerException}.
     *
     * @param <R> concrete result type
     * @return function returning the (possibly enriched) left element of the pair
     */
    private static <R extends Result> MapFunction<Tuple2<R, ResultCountrySet>, R> getCountryMergeFn() {
        return pair -> {
            R result = pair._1();
            ResultCountrySet preparedInfo = pair._2();
            if (preparedInfo != null) {
                List<Country> current = result.getCountry();
                List<Country> toAdd = merge(current, preparedInfo.getCountrySet());
                if (current == null) {
                    result.setCountry(toAdd);
                } else {
                    current.addAll(toAdd);
                }
            }
            return result;
        };
    }

    /**
     * Computes the countries of {@code c2} whose class id does not occur in {@code c1}.
     *
     * @param c1 countries already associated with the result; {@code null} is treated as empty
     * @param c2 candidate countries to propagate; {@code null} is treated as empty
     * @return a new mutable list holding the countries to add, never {@code null}
     */
    private static List<Country> merge(List<Country> c1, List<CountrySbs> c2) {
        if (c2 == null || c2.isEmpty()) {
            return new ArrayList<>();
        }
        Set<String> knownClassIds = c1 == null
            ? new HashSet<>()
            : c1.stream().map(Qualifier::getClassid).collect(Collectors.toCollection(HashSet::new));
        return c2.stream()
            .filter(c -> !knownClassIds.contains(c.getClassid()))
            .map(c -> PropagationConstant.getCountry(c.getClassid(), c.getClassname()))
            .collect(Collectors.toCollection(ArrayList::new));
    }
}

