/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.provision;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.Tuple2;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that collapses {@link EntityRelEntity} rows into one {@link JoinedEntity} per
 * source entity id, attaching at most {@link #MAX_LINKS} (relation, target) pairs to each.
 *
 * <p>The job reads its command line according to the JSON parameter specification bundled on the
 * classpath, removes any previous content at the output path and writes the result as parquet.
 */
public class AdjacencyListBuilderJob {
    private static final Logger log = LoggerFactory.getLogger(AdjacencyListBuilderJob.class);

    /** Upper bound on the number of links kept for a single entity. */
    public static final int MAX_LINKS = 100;

    /** Classpath location of the JSON document describing the accepted command line parameters. */
    private static final String INPUT_PARAMS_RESOURCE =
            "/eu/dnetlib/dhp/oa/provision/input_params_build_adjacency_lists.json";

    /**
     * Job entry point.
     *
     * <p>Reads the parameters {@code isSparkSessionManaged} (optional, defaults to {@code true}),
     * {@code inputPath} and {@code outputPath}, configures Kryo serialization for the OAF model
     * classes and then runs the adjacency list construction inside a Spark session.
     *
     * @param args command line arguments, parsed against the bundled parameter specification
     * @throws Exception if the parameter specification cannot be loaded, the arguments cannot be
     *     parsed, or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // try-with-resources: the original left the classpath stream open.
        try (InputStream in = AdjacencyListBuilderJob.class.getResourceAsStream(INPUT_PARAMS_RESOURCE)) {
            if (in == null) {
                // Fail with a readable message instead of an NPE from inside IOUtils.
                throw new IllegalStateException("Missing classpath resource: " + INPUT_PARAMS_RESOURCE);
            }
            // Decode explicitly as UTF-8 rather than with the platform default charset.
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputPath = parser.get("inputPath");
        log.info("inputPath: {}", inputPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            createAdjacencyLists(spark, inputPath, outputPath);
        });
    }

    /**
     * Groups the {@link EntityRelEntity} rows found at {@code inputPath} by the id of their entity
     * and writes one {@link JoinedEntity} per group to {@code outputPath} as parquet, overwriting
     * existing data.
     *
     * <p>For each group the entity is taken from the first row returned by the group iterator and
     * the links are the (relation, target) pairs of the first {@link #MAX_LINKS} rows; any further
     * rows of the group are ignored. Which rows come first is determined by the iterator Spark
     * hands to the function.
     *
     * @param spark the active Spark session
     * @param inputPath location of the input dataset, read with Spark's default data source
     * @param outputPath destination of the parquet output
     */
    private static void createAdjacencyLists(SparkSession spark, String inputPath, String outputPath) {
        log.info("Reading joined entities from: {}", inputPath);
        spark.read()
                .load(inputPath)
                .as(Encoders.bean(EntityRelEntity.class))
                // Explicit generic target types: with raw functional interfaces the lambda
                // parameters would be typed as Object and the body would not compile.
                .groupByKey(
                        (MapFunction<EntityRelEntity, String>) value -> value.getEntity().getId(),
                        Encoders.STRING())
                .mapGroups(
                        (MapGroupsFunction<String, EntityRelEntity, JoinedEntity>) (key, values) -> {
                            JoinedEntity joined = new JoinedEntity();
                            List<Tuple2> links = new ArrayList<>();
                            // Stop pulling rows once the cap is reached.
                            while (values.hasNext() && links.size() < MAX_LINKS) {
                                EntityRelEntity curr = values.next();
                                if (joined.getEntity() == null) {
                                    joined.setEntity(curr.getEntity());
                                }
                                links.add(new Tuple2(curr.getRelation(), curr.getTarget()));
                            }
                            joined.setLinks(links);
                            return joined;
                        },
                        Encoders.bean(JoinedEntity.class))
                .write()
                .mode(SaveMode.Overwrite)
                .parquet(outputPath);
    }

    /**
     * Removes {@code path} through {@link HdfsSupport}, using the Hadoop configuration of the
     * given Spark session.
     *
     * @param spark the active Spark session providing the Hadoop configuration
     * @param path the path to remove
     */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }
}

