/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.provision;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.provision.RelationComparator;
import eu.dnetlib.dhp.oa.provision.RelationList;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.SortableRelationKey;
import eu.dnetlib.dhp.oa.provision.utils.RelationPartitioner;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.io.InputStream;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class PrepareRelationsJob {
    /** Logger used to report the resolved job parameters. */
    private static final Logger log = LoggerFactory.getLogger(PrepareRelationsJob.class);
    /** Shared Jackson mapper used to parse each input text line into a {@link Relation}. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /** Same value as the fallback applied in {@code main} when sourceMaxRelations / targetMaxRelations are not given. */
    public static final int MAX_RELS = 100;
    /** Same value as the fallback applied in {@code main} when relPartitions is not given. */
    public static final int DEFAULT_NUM_PARTITIONS = 3000;

    /**
     * Job entry point: parses the command line, configures Spark and runs the relation preparation.
     *
     * <p>Parameters read from the parser:
     * <ul>
     *   <li>{@code isSparkSessionManaged} - optional, defaults to {@code true}</li>
     *   <li>{@code inputRelationsPath} - location of the input relations (one JSON record per line)</li>
     *   <li>{@code outputPath} - location of the parquet output; removed before the job writes to it</li>
     *   <li>{@code relPartitions} - optional, defaults to {@link #DEFAULT_NUM_PARTITIONS}</li>
     *   <li>{@code relationFilter} - optional comma separated list of relation classes to drop;
     *       lower-cased, trimmed, empty tokens ignored</li>
     *   <li>{@code sourceMaxRelations} / {@code targetMaxRelations} - optional, default to {@link #MAX_RELS}</li>
     * </ul>
     *
     * @param args command line arguments, interpreted by {@link ArgumentApplicationParser}
     * @throws IllegalStateException if the parameter specification resource is not on the classpath
     * @throws Exception on argument parsing or job execution failure
     */
    public static void main(String[] args) throws Exception {
        final String paramsResource = "/eu/dnetlib/dhp/oa/provision/input_params_prepare_relations.json";
        final String jsonConfiguration;
        // Close the resource stream deterministically and fail with a readable message when it is missing.
        try (InputStream in = PrepareRelationsJob.class.getResourceAsStream(paramsResource)) {
            if (in == null) {
                throw new IllegalStateException("missing classpath resource: " + paramsResource);
            }
            // Decode explicitly as UTF-8 rather than relying on the platform default charset.
            jsonConfiguration = IOUtils.toString(in, "UTF-8");
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputRelationsPath = parser.get("inputRelationsPath");
        log.info("inputRelationsPath: {}", inputRelationsPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        int relPartitions = Optional
            .ofNullable(parser.get("relPartitions"))
            .map(Integer::valueOf)
            .orElse(DEFAULT_NUM_PARTITIONS);
        log.info("relPartitions: {}", relPartitions);

        // Lower-cased here because the RDD filter compares against the lower-cased relation class.
        // Tokens are trimmed and empty ones dropped so that "a, b" or a trailing comma behave as expected.
        Set<String> relationFilter = Optional
            .ofNullable(parser.get("relationFilter"))
            .map(String::toLowerCase)
            .map(s -> Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(s)))
            .orElseGet(HashSet::new);
        log.info("relationFilter: {}", relationFilter);

        int sourceMaxRelations = Optional
            .ofNullable(parser.get("sourceMaxRelations"))
            .map(Integer::valueOf)
            .orElse(MAX_RELS);
        log.info("sourceMaxRelations: {}", sourceMaxRelations);

        int targetMaxRelations = Optional
            .ofNullable(parser.get("targetMaxRelations"))
            .map(Integer::valueOf)
            .orElse(MAX_RELS);
        log.info("targetMaxRelations: {}", targetMaxRelations);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            PrepareRelationsJob.removeOutputDir(spark, outputPath);
            PrepareRelationsJob.prepareRelationsRDD(
                spark, inputRelationsPath, outputPath, relationFilter, sourceMaxRelations, targetMaxRelations, relPartitions);
        });
    }

    /**
     * RDD based preparation: reads the relations, drops the unwanted ones, caps the number of
     * relations first by source and then by target, and stores the result as parquet.
     *
     * @param spark the Spark session
     * @param inputRelationsPath path of the input relations
     * @param outputPath path the parquet output is written to (overwritten)
     * @param relationFilter relation classes to exclude; compared against the lower-cased relClass
     * @param sourceMaxRelations cap handed to the pruning pass keyed by {@code Relation::getSource}
     * @param targetMaxRelations cap handed to the pruning pass keyed by {@code Relation::getTarget}
     * @param relPartitions number of partitions used for pruning and for the output
     */
    private static void prepareRelationsRDD(SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int sourceMaxRelations, int targetMaxRelations, int relPartitions) {
        // Single predicate, evaluated left to right: unresolved endpoints, deleted-by-inference flag, class filter.
        JavaRDD<Relation> accepted = PrepareRelationsJob
            .readPathRelationRDD(spark, inputRelationsPath)
            .filter(r -> !r.getSource().startsWith("unresolved")
                && !r.getTarget().startsWith("unresolved")
                && !r.getDataInfo().getDeletedbyinference()
                && !relationFilter.contains(StringUtils.lowerCase(r.getRelClass())));

        JavaRDD<Relation> cappedBySource = PrepareRelationsJob
            .pruneRels(accepted, sourceMaxRelations, relPartitions, (Function<Relation, String>) Relation::getSource);
        JavaRDD<Relation> cappedByTarget = PrepareRelationsJob
            .pruneRels(cappedBySource, targetMaxRelations, relPartitions, (Function<Relation, String>) Relation::getTarget);

        spark
            .createDataset(cappedByTarget.rdd(), Encoders.bean(Relation.class))
            .repartition(relPartitions)
            .write()
            .mode(SaveMode.Overwrite)
            .parquet(outputPath);
    }

    /**
     * Caps the number of relations kept per group.
     *
     * <p>Each relation is keyed by a {@link SortableRelationKey} built from the relation and the id
     * returned by {@code idFn}; the pairs are repartitioned and sorted with {@link RelationPartitioner},
     * grouped by key, and at most {@code maxRelations} entries of every group are retained.
     *
     * @param rels relations to prune
     * @param maxRelations maximum number of entries kept from each group
     * @param relPartitions number of partitions given to the partitioner
     * @param idFn extracts the id (e.g. source or target) used to build the sort key
     * @return the retained relations
     */
    private static JavaRDD<Relation> pruneRels(JavaRDD<Relation> rels, int maxRelations, int relPartitions, Function<Relation, String> idFn) {
        return rels
            .mapToPair(rel -> new Tuple2<>(SortableRelationKey.create(rel, idFn.call(rel)), rel))
            .repartitionAndSortWithinPartitions(new RelationPartitioner(relPartitions))
            .groupBy(Tuple2::_1)
            // take the grouped pairs, keep the first maxRelations of them, and flatten
            .flatMap(group -> Iterables.limit(group._2(), maxRelations).iterator())
            .map(Tuple2::_2);
    }

    /**
     * Dataset based variant of the preparation (not invoked from {@code main} in this file).
     *
     * <p>Reads the relations as text, parses them, drops those deleted by inference or whose class
     * is in {@code relationFilter}, groups them by source, aggregates every group with
     * {@link RelationAggregator} and writes at most {@code maxRelations} relations per source as parquet.
     *
     * @param spark the Spark session
     * @param inputRelationsPath path of the input relations (one JSON record per line)
     * @param outputPath path the parquet output is written to (overwritten)
     * @param relationFilter lower-cased relation classes to exclude
     * @param maxRelations maximum number of relations kept per source
     * @param relPartitions number of partitions for the input and for the output
     */
    private static void prepareRelationsDataset(SparkSession spark, String inputRelationsPath, String outputPath, Set<String> relationFilter, int maxRelations, int relPartitions) {
        spark
            .read()
            .textFile(inputRelationsPath)
            .repartition(relPartitions)
            .map(
                (MapFunction<String, Relation>) s -> OBJECT_MAPPER.readValue(s, Relation.class),
                Encoders.kryo(Relation.class))
            .filter((FilterFunction<Relation>) rel -> !rel.getDataInfo().getDeletedbyinference())
            // The filter set is lower-cased by main, so the relation class must be lower-cased as well
            // (same comparison as in prepareRelationsRDD); otherwise mixed-case classes are never filtered.
            .filter((FilterFunction<Relation>) rel -> !relationFilter.contains(StringUtils.lowerCase(rel.getRelClass())))
            .groupByKey((MapFunction<Relation, String>) Relation::getSource, Encoders.STRING())
            .agg(new RelationAggregator(maxRelations).toColumn())
            .flatMap(
                (FlatMapFunction<Tuple2<String, RelationList>, Relation>) t -> Iterables
                    .limit(t._2().getRelations(), maxRelations)
                    .iterator(),
                Encoders.bean(Relation.class))
            .repartition(relPartitions)
            .write()
            .mode(SaveMode.Overwrite)
            .parquet(outputPath);
    }

    /**
     * Loads the text file(s) at {@code inputPath} and parses every line into a {@link Relation}
     * with the shared {@code OBJECT_MAPPER}.
     *
     * @param spark the Spark session providing the context
     * @param inputPath path of the text input
     * @return the parsed relations
     */
    private static JavaRDD<Relation> readPathRelationRDD(SparkSession spark, String inputPath) {
        JavaSparkContext javaContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaRDD<String> lines = javaContext.textFile(inputPath);
        return lines.map(line -> OBJECT_MAPPER.readValue(line, Relation.class));
    }

    /**
     * Removes {@code path} through {@link HdfsSupport}, using the Hadoop configuration of the session.
     *
     * @param spark the Spark session whose Hadoop configuration is used
     * @param path the location to remove
     */
    private static void removeOutputDir(SparkSession spark, String path) {
        Configuration hadoopConf = spark.sparkContext().hadoopConfiguration();
        HdfsSupport.remove(path, hadoopConf);
    }

    public static class RelationAggregator
    extends Aggregator<Relation, RelationList, RelationList> {
        private final int maxRelations;

        public RelationAggregator(int maxRelations) {
            this.maxRelations = maxRelations;
        }

        public RelationList zero() {
            return new RelationList();
        }

        public RelationList reduce(RelationList b, Relation a) {
            b.getRelations().add(a);
            return this.getSortableRelationList(b);
        }

        public RelationList merge(RelationList b1, RelationList b2) {
            b1.getRelations().addAll(b2.getRelations());
            return this.getSortableRelationList(b1);
        }

        public RelationList finish(RelationList r) {
            return this.getSortableRelationList(r);
        }

        private RelationList getSortableRelationList(RelationList b1) {
            RelationList sr = new RelationList();
            sr.setRelations(b1.getRelations().stream().limit(this.maxRelations).collect(Collectors.toCollection(() -> new PriorityQueue<Relation>(new RelationComparator()))));
            return sr;
        }

        public Encoder<RelationList> bufferEncoder() {
            return Encoders.kryo(RelationList.class);
        }

        public Encoder<RelationList> outputEncoder() {
            return Encoders.kryo(RelationList.class);
        }
    }
}

