/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib;

import eu.dnetlib.graph.GraphProcessor;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.BlockProcessor;
import eu.dnetlib.pace.util.MapDocumentUtil;
import eu.dnetlib.pace.util.Reporter;
import eu.dnetlib.pace.utils.Utility;
import eu.dnetlib.reporter.SparkReporter;
import eu.dnetlib.support.ConnectedComponent;
import java.io.Serializable;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;

/**
 * Spark driver helpers that turn JSON entities into {@link MapDocument}s, group them into blocks,
 * compute pairwise "similar" relations inside each block and finally resolve the relations into
 * connected components via {@link GraphProcessor#findCCs}.
 */
public class Deduper
implements scala.Serializable {
    private static final Log log = LogFactory.getLog(Deduper.class);

    /**
     * Runs the full deduplication pipeline and returns the connected components of the similarity graph.
     *
     * @param context the Spark context used to register accumulators
     * @param entities the input records; each one is parsed with {@link MapDocumentUtil#asMapDocumentWithJPath}
     * @param config the dedup configuration (model, grouping and workflow settings)
     * @return the connected components computed by {@link GraphProcessor#findCCs}
     */
    public static JavaRDD<ConnectedComponent> dedup(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
        // Created once and shared with the relation computation below, so the values logged at the end
        // are the accumulators that the block processing actually updated.
        final Map<String, LongAccumulator> accumulators = Utility.constructAccumulator(config, context.sc());

        final JavaPairRDD<String, MapDocument> mapDocs = Deduper.mapToVertexes(context, entities, config);
        // GraphX vertex ids are longs, but the graph API takes them through an Object-typed tuple slot.
        final RDD<Tuple2<Object, MapDocument>> vertexes = mapDocs
                .<Object, MapDocument>mapToPair(doc -> new Tuple2<>(Utility.getHashcode(doc._1()), doc._2()))
                .rdd();

        final JavaPairRDD<String, Iterable<MapDocument>> blocks = Deduper.createBlocks(context, mapDocs, config);
        // Cached because it is evaluated twice (the count below and the graph computation). Caching avoids
        // re-running block processing and re-applying accumulator updates made inside the transformation.
        final JavaPairRDD<String, String> relationRDD = Deduper.computeRelations(blocks, config, accumulators).cache();

        // This action materializes relationRDD, so the accumulators are populated before they are logged.
        log.info("Number of relations = " + relationRDD.distinct().count());

        final RDD<Edge<String>> edgeRdd = relationRDD
                .map(rel -> new Edge<String>(
                        Utility.getHashcode(rel._1()), Utility.getHashcode(rel._2()), "isSimilarTo"))
                .rdd();

        accumulators.forEach((name, acc) -> log.info(name + " -> " + acc.value()));
        return GraphProcessor.findCCs(vertexes, edgeRdd, config.getWf().getMaxIterations()).toJavaRDD();
    }

    /**
     * Computes the relations emitted by {@link BlockProcessor} for every block, using a fresh set of
     * accumulators registered on {@code context}.
     *
     * @param context the Spark context used to register accumulators
     * @param blocks documents grouped by grouping key
     * @param config the dedup configuration
     * @return the (source id, target id) pairs collected by the {@link SparkReporter}
     */
    public static JavaPairRDD<String, String> computeRelations(JavaSparkContext context, JavaPairRDD<String, Iterable<MapDocument>> blocks, DedupConfig config) {
        return Deduper.computeRelations(blocks, config, Utility.constructAccumulator(config, context.sc()));
    }

    /** Runs {@link BlockProcessor} on each block, reporting into the given accumulators. */
    private static JavaPairRDD<String, String> computeRelations(JavaPairRDD<String, Iterable<MapDocument>> blocks, DedupConfig config, Map<String, LongAccumulator> accumulators) {
        return blocks.flatMapToPair(block -> {
            // One reporter per block; it collects the relations produced while processing that block.
            final SparkReporter reporter = new SparkReporter(accumulators);
            new BlockProcessor(config).process(block._1(), block._2(), reporter);
            return reporter.getRelations().iterator();
        });
    }

    /**
     * Groups documents into blocks keyed by their grouping keys.
     *
     * <p>Documents sharing an identifier are first collapsed to a single one (which one survives is
     * up to Spark's reduce order). Each remaining document is emitted once per key returned by
     * {@link Utility#getGroupingKeys}, then the pairs are grouped by key.
     *
     * @param context unused; kept for API compatibility
     * @param mapDocs documents keyed by identifier
     * @param config the dedup configuration supplying the grouping keys
     * @return documents grouped by grouping key
     */
    public static JavaPairRDD<String, Iterable<MapDocument>> createBlocks(JavaSparkContext context, JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
        return mapDocs
                .reduceByKey((first, duplicate) -> first)
                .flatMapToPair(entry -> {
                    final MapDocument currentDocument = entry._2();
                    return Utility.getGroupingKeys(config, currentDocument).stream()
                            .map(key -> new Tuple2<String, MapDocument>(key, currentDocument))
                            .collect(Collectors.toList())
                            .iterator();
                })
                .groupByKey();
    }

    /**
     * Parses each input record into a {@link MapDocument} keyed by its identifier.
     *
     * @param context unused; kept for API compatibility
     * @param entities the input records
     * @param config the dedup configuration driving the JSONPath extraction
     * @return (identifier, document) pairs
     */
    public static JavaPairRDD<String, MapDocument> mapToVertexes(JavaSparkContext context, JavaRDD<String> entities, DedupConfig config) {
        return entities.mapToPair(json -> {
            final MapDocument mapDocument = MapDocumentUtil.asMapDocumentWithJPath(config, json);
            return new Tuple2<>(mapDocument.getIdentifier(), mapDocument);
        });
    }
}

