/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.provision;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelation;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import java.io.InputStream;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that reads relations serialized as one JSON document per line, discards the ones
 * flagged as deleted by inference or whose relation class is listed in the configured filter,
 * keeps at most {@code maxRelations} relations per group and stores the result as parquet.
 *
 * <p>Job parameters (resolved by {@link ArgumentApplicationParser} in {@link #main(String[])}):
 * <ul>
 *   <li>{@code isSparkSessionManaged} - optional, defaults to {@code true};</li>
 *   <li>{@code inputRelationsPath} - location of the JSON-lines relations;</li>
 *   <li>{@code outputPath} - parquet destination, removed before the job runs;</li>
 *   <li>{@code relPartitions} - optional, defaults to {@link #DEFAULT_NUM_PARTITIONS};</li>
 *   <li>{@code relationFilter} - optional comma separated list of relation classes to exclude,
 *       defaults to the empty set;</li>
 *   <li>{@code maxRelations} - optional, defaults to {@link #MAX_RELS}.</li>
 * </ul>
 */
public class PrepareRelationsJob {
    private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /** Default upper bound on the relations kept per group when {@code maxRelations} is not given. */
    public static final int MAX_RELS = 100;
    /** Default number of partitions used when {@code relPartitions} is not given. */
    public static final int DEFAULT_NUM_PARTITIONS = 3000;

    /**
     * Job entry point: parses the arguments, logs the effective configuration, removes the output
     * directory and runs the RDD based preparation within a Spark session.
     *
     * @param args command line arguments, interpreted according to the bundled
     *             {@code input_params_prepare_relations.json} descriptor
     * @throws IllegalStateException if the parameter descriptor is not available on the classpath
     * @throws Exception if argument parsing or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String paramsResource = "/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json";
        String jsonConfiguration;
        // Close the classpath stream deterministically and decode it with an explicit charset
        // instead of relying on the platform default.
        try (InputStream paramsStream = PrepareRelationsJob.class.getResourceAsStream(paramsResource)) {
            if (paramsStream == null) {
                throw new IllegalStateException("missing classpath resource: " + paramsResource);
            }
            jsonConfiguration = IOUtils.toString(paramsStream, "UTF-8");
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputRelationsPath = parser.get("inputRelationsPath");
        log.info("inputRelationsPath: {}", inputRelationsPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        int relPartitions = Optional
            .ofNullable(parser.get("relPartitions"))
            .map(Integer::valueOf)
            .orElse(DEFAULT_NUM_PARTITIONS);
        log.info("relPartitions: {}", relPartitions);

        // Comma separated relation classes; tokens are used verbatim (no trimming).
        Set<String> relationFilter = Optional
            .ofNullable(parser.get("relationFilter"))
            .map(s -> Sets.newHashSet(Splitter.on(",").split(s)))
            .orElseGet(HashSet::new);
        log.info("relationFilter: {}", relationFilter);

        int maxRelations = Optional
            .ofNullable(parser.get("maxRelations"))
            .map(Integer::valueOf)
            .orElse(MAX_RELS);
        log.info("maxRelations: {}", maxRelations);

        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            prepareRelationsRDD(spark, inputRelationsPath, outputPath, relationFilter, relPartitions, maxRelations);
        });
    }

    /**
     * Dataset based variant of the preparation: relations are grouped by their source identifier
     * and at most {@code maxRelations} of them are kept for each source.
     *
     * <p>NOTE(review): this method is not invoked by {@link #main(String[])}, which relies on
     * {@link #prepareRelationsRDD} instead.
     *
     * @param spark the active Spark session
     * @param inputRelationsPath path of the JSON-lines relations
     * @param outputPath parquet destination, overwritten if present
     * @param relationFilter relation classes to exclude
     * @param maxRelations maximum number of relations kept per source
     */
    private static void prepareRelations(SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations) {
        readPathRelation(spark, inputRelationsPath)
            .filter("dataInfo.deletedbyinference == false")
            .filter((FilterFunction<SortableRelation>) rel -> !relationFilter.contains(rel.getRelClass()))
            .groupByKey((MapFunction<SortableRelation, String>) rel -> rel.getSource(), Encoders.STRING())
            .flatMapGroups(
                (FlatMapGroupsFunction<String, SortableRelation, SortableRelation>) (key, values) -> Iterators
                    .limit(values, maxRelations),
                Encoders.bean(SortableRelation.class))
            .write()
            .mode(SaveMode.Overwrite)
            .parquet(outputPath);
    }

    /**
     * RDD based variant of the preparation, the one used by {@link #main(String[])}. Relations
     * are repartitioned, filtered, keyed by the relation object itself, grouped using a
     * {@link RelationPartitioner} and each group is truncated to {@code maxRelations} elements
     * before being written as parquet.
     *
     * <p>NOTE(review): how relations end up in the same group depends on the equality contract
     * of {@link SortableRelation} and on {@link RelationPartitioner}, neither of which is visible
     * from this class - confirm against their implementation.
     *
     * @param spark the active Spark session
     * @param inputRelationsPath path of the JSON-lines relations
     * @param outputPath parquet destination, overwritten if present
     * @param relationFilter relation classes to exclude
     * @param relPartitions number of partitions the input is repartitioned into
     * @param maxRelations maximum number of relations kept per group
     */
    private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int relPartitions, int maxRelations) {
        JavaRDD<SortableRelation> rels = readPathRelationRDD(spark, inputRelationsPath).repartition(relPartitions);
        RelationPartitioner partitioner = new RelationPartitioner(rels.getNumPartitions());

        // Only consider relations that are not virtually deleted. As before, a relation lacking
        // dataInfo makes the task fail here rather than being silently dropped.
        RDD<SortableRelation> selected = rels
            .filter(rel -> !rel.getDataInfo().getDeletedbyinference())
            .filter(rel -> !relationFilter.contains(rel.getRelClass()))
            .mapToPair(rel -> new Tuple2<>(rel, rel))
            .groupByKey(partitioner)
            .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
            .rdd();

        spark
            .createDataset(selected, Encoders.bean(SortableRelation.class))
            .write()
            .mode(SaveMode.Overwrite)
            .parquet(outputPath);
    }

    /**
     * Reads the text file(s) at the given path as a Dataset, parsing every line as a JSON
     * encoded {@link SortableRelation}.
     *
     * @param spark the active Spark session
     * @param inputPath path of the JSON-lines relations
     * @return the parsed relations
     */
    private static Dataset<SortableRelation> readPathRelation(SparkSession spark, String inputPath) {
        return spark
            .read()
            .textFile(inputPath)
            .map(
                (MapFunction<String, SortableRelation>) value -> OBJECT_MAPPER.readValue(value, SortableRelation.class),
                Encoders.bean(SortableRelation.class));
    }

    /**
     * Reads the text file(s) at the given path as an RDD, parsing every line as a JSON encoded
     * {@link SortableRelation}.
     *
     * @param spark the active Spark session
     * @param inputPath path of the JSON-lines relations
     * @return the parsed relations
     */
    private static JavaRDD<SortableRelation> readPathRelationRDD(SparkSession spark, String inputPath) {
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        return sc.textFile(inputPath).map(s -> OBJECT_MAPPER.readValue(s, SortableRelation.class));
    }

    /**
     * Removes the given path through {@link HdfsSupport}, using the Hadoop configuration of the
     * Spark context.
     *
     * @param spark the active Spark session
     * @param path the path to remove
     */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }
}

