/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib;

import eu.dnetlib.DocumentsBlock;
import eu.dnetlib.Utility;
import eu.dnetlib.graph.GraphProcessor;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.utils.PaceUtils;
import eu.dnetlib.reporter.SparkBlockProcessor;
import eu.dnetlib.reporter.SparkReporter;
import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;

/**
 * Command-line driver that runs a deduplication job on Spark.
 *
 * <p>The job reads the input records and the dedup configuration through
 * {@link Utility}, groups the records into blocks by their grouping keys,
 * lets {@link SparkBlockProcessor} report related record pairs inside each
 * block, and asks {@link GraphProcessor#findCCs} for the connected components
 * of the resulting graph. Blocks and connected components are written as text
 * files, and summary counts are printed to standard output.
 *
 * <p>NOTE(review): the raw types and intersection casts in the lambdas come
 * from the decompiler output; restoring the generic signatures requires the
 * project sources.
 */
public class SparkTest {
    /**
     * Runs the deduplication job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the dedup
     *             configuration path, and {@code args[2]} the base output path;
     *             the suffixes {@code _groups} and {@code _output} are appended
     *             to the latter for the two result directories
     * @throws IOException declared by the original entry point
     * @throws IllegalArgumentException if fewer than three arguments are given
     */
    public static void main(String[] args) throws IOException {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 3) {
            throw new IllegalArgumentException("Usage: SparkTest <inputSpacePath> <dedupConfigPath> <outputBasePath>");
        }
        String inputSpacePath = args[0];
        String dedupConfigPath = args[1];
        String groupsPath = args[2] + "_groups";
        String outputPath = args[2] + "_output";
        SparkSession spark = SparkSession.builder().appName("Deduplication").master("yarn").getOrCreate();
        try {
            JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
            JavaRDD<String> dataRDD = Utility.loadDataFromHDFS(inputSpacePath, context);
            DedupConfig config = Utility.loadConfigFromHDFS(dedupConfigPath);
            Map<String, LongAccumulator> accumulators = Utility.constructAccumulator(config, context.sc());

            // (identifier, document) pairs parsed from the raw input records.
            JavaPairRDD mapDocs = dataRDD.mapToPair((PairFunction & Serializable)it -> {
                MapDocument mapDocument = PaceUtils.asMapDocument(config, it);
                return new Tuple2((Object)mapDocument.getIdentifier(), (Object)mapDocument);
            });

            // Graph vertices keyed by the identifier's hash code. The id is boxed as a Long
            // so that it has the same type as the long ids used for the edges below
            // (GraphX vertex ids are Long values).
            RDD vertexes = mapDocs.mapToPair((PairFunction & Serializable)t -> new Tuple2((Object)Long.valueOf(((String)t._1()).hashCode()), t._2())).rdd();

            // Keep one document per identifier, emit it once per grouping key and group by key.
            // Cached because the result feeds both the groups dump and the block processing.
            JavaPairRDD blocks = mapDocs.reduceByKey((Function2 & Serializable)(a, b) -> a).flatMapToPair((PairFlatMapFunction & Serializable)a -> {
                MapDocument currentDocument = (MapDocument)a._2();
                return Utility.getGroupingKeys(config, currentDocument).stream().map(it -> new Tuple2(it, (Object)currentDocument)).collect(Collectors.toList()).iterator();
            }).groupByKey().cache();

            Utility.deleteIfExists(groupsPath);
            blocks.map((Function & Serializable)group -> new DocumentsBlock((String)group._1(), (Iterable)group._2())).saveAsTextFile(groupsPath);

            // Pairs reported by the block processor for each block.
            JavaPairRDD relationRDD = blocks.flatMapToPair((PairFlatMapFunction & Serializable)it -> {
                SparkReporter reporter = new SparkReporter();
                new SparkBlockProcessor(config).process((String)it._1(), (Iterable)it._2(), reporter, accumulators);
                return reporter.getReport().iterator();
            });
            RDD edgeRdd = relationRDD.map((Function & Serializable)it -> new Edge((long)((String)it._1()).hashCode(), (long)((String)it._2()).hashCode(), (Object)"similarTo")).rdd();

            // NOTE(review): 20 is presumably an iteration limit -- confirm against GraphProcessor.
            // Cached because four actions below consume this RDD; without caching every action
            // re-runs the lineage, which can also re-apply the accumulator updates made inside
            // the block processing transformation and inflate the reported counters.
            JavaRDD ccs = GraphProcessor.findCCs((RDD<Tuple2<Object, MapDocument>>)vertexes, (RDD<Edge<String>>)edgeRdd, 20).toJavaRDD().cache();
            Utility.deleteIfExists(outputPath);
            ccs.saveAsTextFile(outputPath);

            // Components with more than one document versus components with exactly one.
            JavaRDD connectedComponents = ccs.filter((Function & Serializable)cc -> cc.getDocs().size() > 1);
            JavaRDD nonDeduplicated = ccs.filter((Function & Serializable)cc -> cc.getDocs().size() == 1);
            System.out.println("Non duplicates: " + nonDeduplicated.count());
            System.out.println("Duplicates: " + connectedComponents.flatMap((FlatMapFunction & Serializable)cc -> cc.getDocs().iterator()).count());
            System.out.println("Connected Components: " + connectedComponents.count());
            accumulators.forEach((name, acc) -> System.out.println(name + " -> " + acc.value()));
        } finally {
            // Stop the session (and its underlying context) even when the job fails.
            spark.stop();
        }
    }
}

