/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib;

import com.google.common.hash.Hashing;
import eu.dnetlib.graph.GraphProcessor;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.util.BlockProcessorForTesting;
import eu.dnetlib.pace.util.MapDocumentUtil;
import eu.dnetlib.pace.util.Reporter;
import eu.dnetlib.pace.utils.Utility;
import eu.dnetlib.reporter.SparkReporter;
import eu.dnetlib.support.Block;
import eu.dnetlib.support.ConnectedComponent;
import eu.dnetlib.support.Relation;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.graphx.Edge;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;

/**
 * Static Spark steps of the deduplication workflow: blocking of documents, computation of
 * similarity relations ("simRel"), grouping into connected components ("mergeRel") and
 * materialisation of the resulting groups.
 *
 * <p>All methods are static and hold no state; the lambdas below only capture method
 * parameters and locals. Spark's Java function interfaces are themselves serializable, so no
 * explicit {@code Serializable} casts are required on the lambdas.
 */
public class Deduper implements scala.Serializable {
    private static final Log log = LogFactory.getLog(Deduper.class);

    /**
     * Groups documents into blocks keyed by their grouping keys.
     *
     * @param mapDocs documents keyed by identifier
     * @param config  dedup configuration; supplies the grouping keys (through
     *                {@code Utility.getGroupingKeys}), the order field and the group max size
     * @return one {@link Block} per grouping key
     */
    public static JavaPairRDD<String, Block> createSortedBlocks(JavaPairRDD<String, MapDocument> mapDocs, DedupConfig config) {
        final String orderField = config.getWf().getOrderField();
        final int maxQueueSize = config.getWf().getGroupMaxSize();

        return mapDocs
                // keep a single document per identifier (an arbitrary one when ids repeat)
                .reduceByKey((a, b) -> a)
                .values()
                // one single-document block for every grouping key of the document
                .flatMap(doc -> Utility.getGroupingKeys(config, doc)
                        .stream()
                        .map(key -> Block.from(key, doc))
                        .collect(Collectors.toList())
                        .iterator())
                .mapToPair(block -> new Tuple2<>(block.getKey(), block))
                // blocks sharing a key are combined pairwise by Block.from
                .reduceByKey((b1, b2) -> Block.from(b1, b2, orderField, maxQueueSize));
    }

    /**
     * Expands a connected component into {@code (ccId, docId)} pairs, one per member document.
     *
     * @param cc        the connected component to expand
     * @param dedupConf not used by this method; kept for signature compatibility
     * @return iterator over the {@code (ccId, docId)} pairs
     */
    public static Iterator<Tuple2<String, String>> ccToMergeRel(ConnectedComponent cc, DedupConfig dedupConf) {
        return cc.getDocs()
                .stream()
                .map(id -> new Tuple2<String, String>(cc.getCcId(), id))
                .iterator();
    }

    /**
     * Computes a 64-bit murmur3_128-based hash of an identifier; used in this class to build
     * the vertex ids handed to {@code GraphProcessor.findCCs}.
     *
     * @param id the identifier to hash
     * @return the hash as a {@code long}
     */
    public static long hash(String id) {
        // NOTE(review): the platform default charset makes the result depend on JVM settings for
        // non-ASCII ids. Left unchanged because the edge ids produced by Relation.toEdgeRdd
        // (not visible here) presumably must match these values - confirm before switching
        // to a fixed charset such as UTF-8.
        return Hashing.murmur3_128().hashString(id, Charset.defaultCharset()).asLong();
    }

    /**
     * Builds a connected component with the given id whose docs are the distinct values
     * yielded by the iterator.
     *
     * @param key    id assigned to the connected component
     * @param values member values; fully consumed by this call
     * @return the populated connected component
     */
    public static ConnectedComponent entityMerger(String key, Iterator<String> values) {
        final HashSet<String> docs = new HashSet<>();
        values.forEachRemaining(docs::add);

        final ConnectedComponent cc = new ConnectedComponent();
        cc.setCcId(key);
        cc.setDocs(docs);
        return cc;
    }

    /**
     * Processes every block and returns the distinct "simRel" relations reported for it.
     *
     * <p>Duplicates are removed on the {@code (source, target)} pair itself. A concatenated
     * {@code source + target} string is not a safe key, since different pairs can concatenate
     * to the same string and one of them would then be dropped.
     *
     * @param context Spark context used to register the accumulators
     * @param blocks  blocks keyed by grouping key
     * @param config  dedup configuration
     * @param useTree forwarded to {@code BlockProcessorForTesting.processSortedBlock}
     * @return one {@link Relation} of type "simRel" per distinct {@code (source, target)} pair
     */
    public static JavaRDD<Relation> computeRelations(JavaSparkContext context, JavaPairRDD<String, Block> blocks, DedupConfig config, boolean useTree) {
        final Map<String, LongAccumulator> accumulators = Utility.constructAccumulator(config, context.sc());

        return blocks
                .flatMapToPair(it -> {
                    // a fresh reporter per block collects the relations emitted for that block
                    final SparkReporter reporter = new SparkReporter(accumulators);
                    new BlockProcessorForTesting(config)
                            .processSortedBlock(it._1(), it._2().getDocuments(), reporter, useTree);
                    return reporter.getRelations().iterator();
                })
                .distinct()
                .map(pair -> new Relation(pair._1(), pair._2(), "simRel"));
    }

    /**
     * Reads the entities, computes their similarity relations and writes them to
     * {@code simRelsPath}, overwriting existing data.
     *
     * @param dedupConf    dedup configuration
     * @param spark        active Spark session
     * @param entitiesPath text input, one serialized entity per line
     * @param simRelsPath  output location of the similarity relations
     * @param useTree      forwarded to {@link #computeRelations}
     */
    public static void createSimRels(DedupConfig dedupConf, SparkSession spark, String entitiesPath, String simRelsPath, boolean useTree) {
        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        final JavaPairRDD<String, MapDocument> mapDocuments = sc
                .textFile(entitiesPath)
                .mapToPair((PairFunction<String, String, MapDocument>) s -> {
                    final MapDocument d = MapDocumentUtil.asMapDocumentWithJPath(dedupConf, s);
                    return new Tuple2<>(d.getIdentifier(), d);
                });

        final JavaPairRDD<String, Block> blocks = Deduper.createSortedBlocks(mapDocuments, dedupConf);
        final JavaRDD<Relation> relations = Deduper.computeRelations(sc, blocks, dedupConf, useTree);

        // written with the session's default data source format; createMergeRels reads it back
        // with the matching format-less load()
        spark.createDataset(relations.rdd(), Encoders.bean(Relation.class))
                .write()
                .mode(SaveMode.Overwrite)
                .save(simRelsPath);
    }

    /**
     * Builds a graph whose vertices are the entity ids and whose edges come from the stored
     * similarity relations, finds its connected components and writes one "mergeRel" relation
     * per member of every component holding more than one document.
     *
     * @param dedupConf     dedup configuration; supplies the id path and the max iterations
     * @param entitiesPath  text input, one serialized entity per line
     * @param mergeRelsPath parquet output location, overwritten if present
     * @param simRelsPath   location of the similarity relations written by {@link #createSimRels}
     * @param spark         active Spark session
     */
    public static void createMergeRels(DedupConfig dedupConf, String entitiesPath, String mergeRelsPath, String simRelsPath, SparkSession spark) {
        final int maxIterations = dedupConf.getWf().getMaxIterations();
        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // vertices: (hash(id), id)
        final JavaPairRDD<Object, String> vertexes = sc
                .textFile(entitiesPath)
                .map(s -> MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), s))
                .mapToPair((PairFunction<String, Object, String>) s -> new Tuple2<Object, String>(Deduper.hash(s), s));

        final RDD<Edge<String>> edgeRdd = spark
                .read()
                .load(simRelsPath)
                .as(Encoders.bean(Relation.class))
                .javaRDD()
                .map(Relation::toEdgeRdd)
                .rdd();

        final JavaRDD<ConnectedComponent> ccs = GraphProcessor
                .findCCs(vertexes.rdd(), edgeRdd, maxIterations)
                .toJavaRDD();

        final JavaRDD<Relation> mergeRel = ccs
                // components with a single document produce no merge relation
                .filter(k -> k.getDocs().size() > 1)
                .flatMap(cc -> Deduper.ccToMergeRel(cc, dedupConf))
                .map(it -> new Relation(it._1(), it._2(), "mergeRel"));

        final Dataset<Relation> mergeRels = spark.createDataset(mergeRel.rdd(), Encoders.bean(Relation.class));
        mergeRels.write().mode(SaveMode.Overwrite).parquet(mergeRelsPath);
    }

    /**
     * Joins the merge relations with the entities on the relation target, groups the joined
     * entity lines by relation source and saves one {@link ConnectedComponent} per source as text.
     *
     * @param dedupConf       dedup configuration; supplies the id path
     * @param mergeRelsPath   location of the merge relations written by {@link #createMergeRels}
     * @param entitiesPath    text input, one serialized entity per line
     * @param spark           active Spark session
     * @param dedupEntityPath text output location
     */
    public static void createDedupEntity(DedupConfig dedupConf, String mergeRelsPath, String entitiesPath, SparkSession spark, String dedupEntityPath) {
        // (entity id, raw entity line)
        final JavaPairRDD<String, String> entities = spark
                .read()
                .textFile(entitiesPath)
                .map((MapFunction<String, Tuple2<String, String>>) it ->
                                new Tuple2<>(MapDocumentUtil.getJPathString(dedupConf.getWf().getIdPath(), it), it),
                        Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                .toJavaRDD()
                .mapToPair(t -> t);

        // (relation target, relation)
        final JavaPairRDD<String, Relation> mergeRels = spark
                .read()
                .load(mergeRelsPath)
                .as(Encoders.bean(Relation.class))
                .toJavaRDD()
                .mapToPair(r -> new Tuple2<>(r.getTarget(), r));

        final JavaRDD<ConnectedComponent> dedupEntities = mergeRels
                .join(entities)
                // re-key every joined entity line by the source of its merge relation
                .mapToPair(t -> new Tuple2<>(t._2()._1().getSource(), t._2()._2()))
                .groupByKey()
                .map(t -> Deduper.entityMerger(t._1(), t._2().iterator()));

        dedupEntities.saveAsTextFile(dedupEntityPath);
    }
}

