/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.mapreduce.hbase.dedup;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.mapreduce.hbase.HBaseTableUtils;
import eu.dnetlib.data.mapreduce.hbase.dedup.DedupPersonBean;
import eu.dnetlib.pace.model.PersonComparatorUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FindDedupCandidatePersonsReducer
extends TableReducer<Text, Text, ImmutableBytesWritable> {
    private static final boolean WRITE_TO_WAL = false;
    private static final int LIMIT = 5000;

    protected void setup(Reducer.Context context) throws IOException, InterruptedException {
    }

    protected void reduce(Text key, Iterable<Text> values, Reducer.Context context) throws IOException, InterruptedException {
        System.out.println("\nReducing key: " + key);
        HashSet candidates = Sets.newHashSet();
        HashMap resultIds = Maps.newHashMap();
        Queue<DedupPersonBean> queue = this.prepare(context, key, values);
        while (!queue.isEmpty()) {
            DedupPersonBean pivot = queue.remove();
            for (DedupPersonBean curr : queue) {
                if (!PersonComparatorUtils.areSimilar((String)pivot.getName(), (String)curr.getName())) continue;
                System.out.println("- Similar persons: [" + pivot.getName() + "] - [" + curr.getName() + "]");
                candidates.add(pivot.getId());
                candidates.add(curr.getId());
                this.collectResultIds(resultIds, pivot);
                this.collectResultIds(resultIds, curr);
            }
        }
        this.emitCandidates(context, candidates);
        this.emitResultCandidates(context, resultIds);
    }

    private void collectResultIds(Map<String, Set<String>> resultIds, DedupPersonBean person) {
        if (!resultIds.containsKey(person.getId())) {
            resultIds.put(person.getId(), new HashSet());
        }
        resultIds.get(person.getId()).addAll(person.getResults());
    }

    private Queue<DedupPersonBean> prepare(Reducer.Context context, Text key, Iterable<Text> values) {
        LinkedList<DedupPersonBean> queue = new LinkedList<DedupPersonBean>();
        for (Text i : values) {
            queue.add(DedupPersonBean.fromText(i));
            if (queue.size() <= 5000) continue;
            context.getCounter("Comparison list > 5000", "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1L);
            System.out.println("breaking out after limit (5000) for key '" + key);
            break;
        }
        return queue;
    }

    private void emitCandidates(Reducer.Context context, Set<String> candidates) throws IOException, InterruptedException {
        byte[] cf = Bytes.toBytes((String)HBaseTableUtils.VolatileColumnFamily.dedup.toString());
        byte[] col = Bytes.toBytes((String)"isCandidate");
        byte[] val = Bytes.toBytes((String)"");
        for (String s : candidates) {
            byte[] id = Bytes.toBytes((String)s);
            Put put = new Put(id).add(cf, col, val);
            put.setWriteToWAL(false);
            context.write((Object)new ImmutableBytesWritable(id), (Object)put);
        }
        context.getCounter(((Object)((Object)this)).getClass().getSimpleName(), "N. Put. (persons)").increment((long)candidates.size());
    }

    private void emitResultCandidates(Reducer.Context context, Map<String, Set<String>> resultIds) throws IOException, InterruptedException {
        byte[] cf = Bytes.toBytes((String)HBaseTableUtils.VolatileColumnFamily.dedupPerson.toString());
        byte[] val = Bytes.toBytes((String)"");
        for (String personId : resultIds.keySet()) {
            byte[] col = Bytes.toBytes((String)personId);
            for (String s : resultIds.get(personId)) {
                byte[] id = Bytes.toBytes((String)s);
                Put put = new Put(id).add(cf, col, val);
                put.setWriteToWAL(false);
                context.write((Object)new ImmutableBytesWritable(id), (Object)put);
            }
            context.getCounter(((Object)((Object)this)).getClass().getSimpleName(), "N. Put. (results)").increment((long)resultIds.get(personId).size());
        }
    }
}

