/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.provision;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class PrepareRelationsJob {
    private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    public static final int MAX_RELS = 100;

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)PrepareRelationsJob.class.getResourceAsStream("/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String inputRelationsPath = parser.get("inputRelationsPath");
        log.info("inputRelationsPath: {}", (Object)inputRelationsPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            PrepareRelationsJob.removeOutputDir(spark, outputPath);
            PrepareRelationsJob.prepareRelationsFromPaths(spark, inputRelationsPath, outputPath);
        });
    }

    private static void prepareRelationsFromPaths(SparkSession spark, String inputRelationsPath, String outputPath) {
        PrepareRelationsJob.readPathRelation(spark, inputRelationsPath).filter("dataInfo.deletedbyinference == false").groupByKey((MapFunction & Serializable)value -> value.getSource(), Encoders.STRING()).flatMapGroups((FlatMapGroupsFunction & Serializable)(key, values) -> Iterators.limit((Iterator)values, (int)100), Encoders.bean(SortableRelation.class)).write().mode(SaveMode.Overwrite).parquet(outputPath);
    }

    private static Dataset<SortableRelation> readPathRelation(SparkSession spark, String inputPath) {
        return spark.read().textFile(inputPath).map((MapFunction & Serializable)value -> (SortableRelation)OBJECT_MAPPER.readValue(value, SortableRelation.class), Encoders.bean(SortableRelation.class));
    }

    private static void prepareRelationsRDDFromPaths(SparkSession spark, String inputRelationsPath, String outputPath, int numPartitions) {
        JavaRDD rels = PrepareRelationsJob.readPathRelationRDD(spark, inputRelationsPath).repartition(numPartitions);
        RDD d = rels.filter((Function & Serializable)rel -> rel.getDataInfo().getDeletedbyinference() == false).mapToPair((PairFunction & Serializable)rel -> new Tuple2(rel, rel)).groupByKey((Partitioner)new RelationPartitioner(rels.getNumPartitions())).map((Function & Serializable)p -> Iterables.limit((Iterable)((Iterable)p._2()), (int)100)).flatMap((FlatMapFunction & Serializable)p -> p.iterator()).rdd();
        spark.createDataset(d, Encoders.bean(SortableRelation.class)).write().mode(SaveMode.Overwrite).parquet(outputPath);
    }

    private static JavaRDD<SortableRelation> readPathRelationRDD(SparkSession spark, String inputPath) {
        JavaSparkContext sc = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
        return sc.textFile(inputPath).map((Function & Serializable)s -> (SortableRelation)OBJECT_MAPPER.readValue(s, SortableRelation.class));
    }

    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove((String)path, (Configuration)spark.sparkContext().hadoopConfiguration());
    }
}

