/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.mapreduce.hbase.dedup;

import com.google.common.collect.Lists;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.RelTypeProtos;
import eu.dnetlib.data.proto.TypeProtos;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.model.MapDocumentSerializer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce phase of the deduplication job.
 *
 * <p>For every grouping key (logged as an "ngram") the reducer decodes the received
 * {@link MapDocument}s, orders them by the configured order field and compares each document
 * with the ones that follow it inside a sliding window. Every pair whose similarity score
 * reaches the configured threshold is written as two symmetric {@link Put}s (pivot to candidate
 * and candidate to pivot) into the similarity column family of the configured entity type.
 */
public class DedupReducer
extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
    private static final Log log = LogFactory.getLog(DedupReducer.class);

    /** Deduplication configuration, loaded in {@link #setup} from the {@code dedup.conf} job property. */
    private DedupConfig dedupConf;

    /** Output key reused for every write; it is re-pointed at the row key before each write. */
    private ImmutableBytesWritable ibw;

    /**
     * Loads the deduplication configuration and allocates the reusable output key.
     *
     * <p>The parameter uses the inherited {@code Context} member type (not the raw
     * {@code Reducer.Context}) so that this is checked as a real override of
     * {@link Reducer#setup}.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.dedupConf = DedupConfig.load(context.getConfiguration().get("dedup.conf"));
        this.ibw = new ImmutableBytesWritable();
        log.info("dedup reduce phase \npace conf: " + this.dedupConf.toString());
    }

    /**
     * Deduplicates the documents grouped under {@code key}.
     *
     * <p>A key with at most one distinct document only increments the
     * "records per hash key = 1" counter. Otherwise result entities go through
     * {@link #simplifyQueue} before being compared, and organization entities are compared
     * directly.
     *
     * <p>The {@code Context} parameter must be the inherited generic member type. With the raw
     * {@code Reducer.Context} this method would merely overload {@link Reducer#reduce}, and the
     * framework would keep invoking the inherited implementation.
     *
     * @throws IllegalArgumentException if the configured entity type is not supported
     */
    @Override
    protected void reduce(Text key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
        Queue<MapDocument> q = this.prepare(context, key, values);
        if (q.size() <= 1) {
            context.getCounter(this.dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1L);
            return;
        }
        log.info("reducing key: '" + key + "' records: " + q.size());
        switch (TypeProtos.Type.valueOf(this.dedupConf.getWf().getEntityType())) {
            case result:
                this.process(this.simplifyQueue(q, key.toString(), context), context);
                break;
            case organization:
                this.process(q, context);
                break;
            default:
                throw new IllegalArgumentException("process not implemented for type: " + this.dedupConf.getWf().getEntityType());
        }
    }

    /**
     * Decodes the grouped values into a priority queue ordered by {@link MapDocumentComparator}
     * on the configured order field, keeping only the first document seen for each identifier.
     *
     * <p>Reading stops at the first value that arrives once the queue already holds more than
     * {@code queueMaxSize} documents. The queue can therefore end up with
     * {@code queueMaxSize + 1} entries. NOTE(review): confirm whether that extra slot is intended.
     * When the limit is hit, the "ngram size > N" counter is incremented once and a log line
     * is written.
     */
    private Queue<MapDocument> prepare(Context context, Text key, Iterable<ImmutableBytesWritable> values) {
        PriorityQueue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(this.dedupConf.getWf().getOrderField()));
        HashSet<String> seen = new HashSet<String>();
        int queueMaxSize = this.dedupConf.getWf().getQueueMaxSize();
        for (ImmutableBytesWritable i : values) {
            if (queue.size() > queueMaxSize) {
                // The queue never shrinks here, so every remaining value would be ignored anyway.
                context.getCounter("ngram size > " + queueMaxSize, "N").increment(1L);
                log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
                break;
            }
            MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
            // HashSet.add returns false for identifiers that were already queued.
            if (seen.add(doc.getIdentifier())) {
                queue.add(doc);
            }
        }
        return queue;
    }

    /**
     * Drains {@code queue} in removal order and drops oversized groups.
     *
     * <p>Consecutive documents whose order-field value is equal after
     * {@link NGramUtils#cleanupForOrdering} form a group. Each group is forwarded through
     * {@link #populateSimplifiedQueue}. Documents lacking an order-field value are dropped and
     * counted under "missing &lt;orderField&gt;".
     *
     * @return a FIFO queue holding the retained documents in the order they were removed
     */
    private Queue<MapDocument> simplifyQueue(Queue<MapDocument> queue, String ngram, Context context) {
        String orderField = this.dedupConf.getWf().getOrderField();
        Queue<MapDocument> q = new LinkedList<MapDocument>();
        String fieldRef = "";
        List<MapDocument> tempResults = Lists.newArrayList();
        while (!queue.isEmpty()) {
            MapDocument result = queue.remove();
            String value = orderFieldString(result, orderField);
            if (value == null) {
                context.getCounter(this.dedupConf.getWf().getEntityType(), "missing " + orderField).increment(1L);
                continue;
            }
            String field = NGramUtils.cleanupForOrdering(value);
            if (!field.equals(fieldRef)) {
                // A new group starts: flush the previous one.
                this.populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
                tempResults.clear();
                fieldRef = field;
            }
            tempResults.add(result);
        }
        this.populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
        return q;
    }

    /**
     * Appends one group of equal-ordering documents to {@code q} when it is smaller than the
     * configured group max size. Otherwise it counts and logs the whole group as skipped.
     * Empty groups are ignored.
     */
    private void populateSimplifiedQueue(Queue<MapDocument> q, List<MapDocument> tempResults, Context context, String fieldRef, String ngram) {
        if (tempResults.isEmpty()) {
            return;
        }
        if (tempResults.size() < this.dedupConf.getWf().getGroupMaxSize()) {
            q.addAll(tempResults);
        } else {
            context.getCounter(this.dedupConf.getWf().getEntityType(), "Skipped records for count(" + this.dedupConf.getWf().getOrderField() + ") >= " + this.dedupConf.getWf().getGroupMaxSize()).increment((long) tempResults.size());
            log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
        }
    }

    /**
     * Compares every document with the documents that follow it, using a sliding window.
     *
     * <p>Pivots are taken in removal order. Each pivot is compared with the following candidates
     * until one of three things happens: a candidate's namespace prefix is in the skip list,
     * more than {@code slidingWindowSize} comparisons have been made (so up to
     * {@code slidingWindowSize + 1} comparisons run), or the queue ends. Pivots and candidates
     * without an order-field value are not compared. The input queue is emptied.
     */
    private void process(Queue<MapDocument> queue, Context context) throws IOException, InterruptedException {
        PaceDocumentDistance algo = new PaceDocumentDistance();
        String orderField = this.dedupConf.getWf().getOrderField();
        // The window must follow the queue's ordering. A PriorityQueue iterator does not,
        // so iterate over an ordered copy instead.
        Queue<MapDocument> ordered = inRemovalOrder(queue);
        while (!ordered.isEmpty()) {
            MapDocument pivot = ordered.remove();
            String idPivot = pivot.getIdentifier();
            if (orderFieldString(pivot, orderField) == null) {
                continue;
            }
            int compared = 0;
            for (MapDocument curr : ordered) {
                String idCurr = curr.getIdentifier();
                if (this.mustSkip(idCurr)) {
                    // NOTE(review): this ends the whole window for the pivot, not only the
                    // skip-listed candidate. Confirm that this is intended.
                    context.getCounter(this.dedupConf.getWf().getEntityType(), "skip list").increment(1L);
                    break;
                }
                if (compared > this.dedupConf.getWf().getSlidingWindowSize()) {
                    break;
                }
                if (idCurr.equals(idPivot) || orderFieldString(curr, orderField) == null) {
                    continue;
                }
                ScoreResult sr = this.similarity(algo, pivot, curr);
                this.emitOutput(sr, idPivot, idCurr, context);
                ++compared;
            }
        }
    }

    /**
     * Moves the elements of {@code queue} into a new {@link LinkedList} in removal order,
     * leaving {@code queue} empty. Iterating the returned list follows that same order.
     */
    private static Queue<MapDocument> inRemovalOrder(Queue<MapDocument> queue) {
        Queue<MapDocument> ordered = new LinkedList<MapDocument>();
        while (!queue.isEmpty()) {
            ordered.add(queue.remove());
        }
        return ordered;
    }

    /** Returns the string value of {@code orderField} in {@code doc}, or null when it is absent or empty. */
    private static String orderFieldString(MapDocument doc, String orderField) {
        Field values = doc.values(orderField);
        return values == null || values.isEmpty() ? null : values.stringValue();
    }

    /**
     * Writes the pair as a similarity relation when its score reaches the configured threshold,
     * and updates the matching counter in either case.
     */
    private void emitOutput(ScoreResult sr, String idPivot, String idCurr, Context context) throws IOException, InterruptedException {
        double d = sr.getScore();
        if (d >= this.dedupConf.getWf().getThreshold()) {
            this.writeSimilarity(context, idPivot, idCurr, d);
            context.getCounter(this.dedupConf.getWf().getEntityType(), RelTypeProtos.SubRelType.dedupSimilarity.toString() + " (x2)").increment(1L);
        } else {
            context.getCounter(this.dedupConf.getWf().getEntityType(), "d < " + this.dedupConf.getWf().getThreshold()).increment(1L);
        }
    }

    /**
     * Scores two documents with the configured distance.
     *
     * <p>Exceptions are logged together with both documents and rethrown as
     * {@link IllegalArgumentException}. {@link Error}s are no longer caught: wrapping an
     * OutOfMemoryError or StackOverflowError as an argument problem would hide the real failure.
     */
    private ScoreResult similarity(PaceDocumentDistance algo, MapDocument a, MapDocument b) {
        try {
            return algo.between(a, b, this.dedupConf);
        } catch (Exception e) {
            log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Tells whether the namespace prefix of {@code id} (see {@link #getNsPrefix}) is in the
     * configured skip list. Identifiers without a prefix are never skipped.
     */
    private boolean mustSkip(String id) {
        String prefix = this.getNsPrefix(id);
        return prefix != null && this.dedupConf.getWf().getSkipList().contains(prefix);
    }

    /** Returns the text between the first "|" and the following "::" in {@code id}, or null if not found. */
    private String getNsPrefix(String id) {
        return StringUtils.substringBetween(id, "|", "::");
    }

    /** Emits the similarity relation in both directions: idPivot to id, and id to idPivot. */
    private void writeSimilarity(Context context, String idPivot, String id, double d) throws IOException, InterruptedException {
        byte[] rowKey = Bytes.toBytes(idPivot);
        byte[] target = Bytes.toBytes(id);
        this.emitRel(context, rowKey, target, d);
        this.emitRel(context, target, rowKey, d);
    }

    /**
     * Writes a {@link Put} on row {@code from} with qualifier {@code to} in the entity type's
     * similarity column family.
     *
     * <p>The cell value is an empty byte array. The score {@code d} is currently not persisted.
     */
    private void emitRel(Context context, byte[] from, byte[] to, double d) throws IOException, InterruptedException {
        TypeProtos.Type type = TypeProtos.Type.valueOf(this.dedupConf.getWf().getEntityType());
        Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, Bytes.toBytes(""));
        put.setWriteToWAL(true);
        this.ibw.set(from);
        context.write(this.ibw, put);
    }
}

