/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.mapreduce.hbase.dedup;

import com.google.common.collect.Lists;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.RelTypeProtos;
import eu.dnetlib.data.proto.TypeProtos;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.config.DynConf;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.model.MapDocumentSerializer;
import eu.dnetlib.pace.util.DedupConfig;
import eu.dnetlib.pace.util.DedupConfigLoader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupReducer
extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
    private static final boolean WRITE_TO_WAL = false;
    private Config paceConf;
    private DedupConfig dedupConf;
    private ImmutableBytesWritable ibw;

    protected void setup(Reducer.Context context) throws IOException, InterruptedException {
        this.paceConf = DynConf.load((String)context.getConfiguration().get("dedup.pace.conf"));
        this.dedupConf = DedupConfigLoader.load((String)context.getConfiguration().get("dedup.wf.conf"));
        this.ibw = new ImmutableBytesWritable();
        System.out.println("dedup reduce phase \npace conf: " + this.paceConf.fields() + "\nwf conf: " + this.dedupConf.toString());
    }

    protected void reduce(Text key, Iterable<ImmutableBytesWritable> values, Reducer.Context context) throws IOException, InterruptedException {
        System.out.println("\nReducing key: '" + key + "'");
        Queue<MapDocument> q = this.prepare(context, key, values);
        switch (TypeProtos.Type.valueOf((String)this.dedupConf.getEntityType())) {
            case person: {
                this.process(q, context);
                break;
            }
            case result: {
                this.process(this.simplifyQueue(q, key.toString(), context), context);
                break;
            }
            case organization: {
                this.process(q, context);
                break;
            }
            default: {
                throw new IllegalArgumentException("dedup not implemented for type: " + this.dedupConf.getEntityType());
            }
        }
    }

    private Queue<MapDocument> prepare(Reducer.Context context, Text key, Iterable<ImmutableBytesWritable> values) {
        PriorityQueue<MapDocument> queue = new PriorityQueue<MapDocument>(100, (Comparator<MapDocument>)new MapDocumentComparator(this.dedupConf.getOrderField()));
        HashSet<String> seen = new HashSet<String>();
        for (ImmutableBytesWritable i : values) {
            MapDocument doc = MapDocumentSerializer.decode((byte[])i.copyBytes());
            String id = doc.getIdentifier();
            if (!seen.contains(id)) {
                seen.add(id);
                queue.add(doc);
            }
            if (queue.size() <= this.dedupConf.getQueueMaxSize()) continue;
            context.getCounter("ngram size > " + this.dedupConf.getQueueMaxSize(), "N").increment(1L);
            System.out.println("breaking out after limit (" + this.dedupConf.getQueueMaxSize() + ") for ngram '" + key);
            break;
        }
        return queue;
    }

    private Queue<MapDocument> simplifyQueue(Queue<MapDocument> queue, String ngram, Reducer.Context context) {
        LinkedList<MapDocument> q = new LinkedList<MapDocument>();
        String fieldRef = "";
        ArrayList tempResults = Lists.newArrayList();
        while (!queue.isEmpty()) {
            MapDocument result = queue.remove();
            if (!result.values(this.dedupConf.getOrderField()).isEmpty()) {
                String field = NGramUtils.cleanupForOrdering((String)result.values(this.dedupConf.getOrderField()).stringValue());
                if (field.equals(fieldRef)) {
                    tempResults.add(result);
                    continue;
                }
                this.populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
                tempResults.clear();
                tempResults.add(result);
                fieldRef = field;
                continue;
            }
            context.getCounter(this.dedupConf.getEntityType(), "missing " + this.dedupConf.getOrderField()).increment(1L);
        }
        this.populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
        return q;
    }

    private void populateSimplifiedQueue(Queue<MapDocument> q, List<MapDocument> tempResults, Reducer.Context context, String fieldRef, String ngram) {
        if (tempResults.size() < this.dedupConf.getGroupMaxSize()) {
            q.addAll(tempResults);
        } else {
            context.getCounter(this.dedupConf.getEntityType(), "Skipped records for count(" + this.dedupConf.getOrderField() + ") >= " + this.dedupConf.getGroupMaxSize()).increment((long)tempResults.size());
            System.out.println("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
        }
    }

    private void process(Queue<MapDocument> queue, Reducer.Context context) throws IOException, InterruptedException {
        PaceDocumentDistance algo = new PaceDocumentDistance();
        block0: while (!queue.isEmpty()) {
            MapDocument pivot = queue.remove();
            String idPivot = pivot.getIdentifier();
            FieldList fieldsPivot = pivot.values(this.dedupConf.getOrderField());
            String fieldPivot = fieldsPivot == null || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
            if (fieldPivot == null) continue;
            int i = 0;
            for (MapDocument curr : queue) {
                String fieldCurr;
                String idCurr = curr.getIdentifier();
                if (this.mustSkip(idCurr)) {
                    context.getCounter(this.dedupConf.getEntityType(), "skip list").increment(1L);
                    continue block0;
                }
                if (i > this.dedupConf.getSlidingWindowSize()) continue block0;
                FieldList fieldsCurr = curr.values(this.dedupConf.getOrderField());
                String string = fieldCurr = fieldsCurr == null || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
                if (idCurr.equals(idPivot) || fieldCurr == null) continue;
                double d = algo.between((Object)pivot, (Object)curr, this.paceConf);
                if (d >= this.dedupConf.getThreshold()) {
                    this.writeSimilarity(context, idPivot, idCurr);
                    context.getCounter(this.dedupConf.getEntityType(), RelTypeProtos.SubRelType.dedupSimilarity.toString() + " (x2)").increment(1L);
                } else {
                    context.getCounter(this.dedupConf.getEntityType(), "d < " + this.dedupConf.getThreshold()).increment(1L);
                }
                ++i;
            }
        }
    }

    private boolean mustSkip(String idPivot) {
        return this.dedupConf.getSkipList().contains(this.getNsPrefix(idPivot));
    }

    private String getNsPrefix(String id) {
        return StringUtils.substringBetween((String)id, (String)"|", (String)"::");
    }

    private void writeSimilarity(Reducer.Context context, String idPivot, String id) throws IOException, InterruptedException {
        byte[] rowKey = Bytes.toBytes((String)idPivot);
        byte[] target = Bytes.toBytes((String)id);
        this.emitRel(context, rowKey, target);
        this.emitRel(context, target, rowKey);
    }

    private void emitRel(Reducer.Context context, byte[] from, byte[] to) throws IOException, InterruptedException {
        Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(TypeProtos.Type.valueOf((String)this.dedupConf.getEntityType())), to, Bytes.toBytes((String)""));
        put.setWriteToWAL(false);
        this.ibw.set(from);
        context.write((Object)this.ibw, (Object)put);
    }
}

