/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.mapreduce.hbase.dedup;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.DedupProtos;
import eu.dnetlib.data.proto.KindProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.TypeProtos;
import eu.dnetlib.data.transform.OafUtils;
import eu.dnetlib.data.transform.xml.AbstractDNetOafXsltFunctions;
import eu.dnetlib.pace.util.DedupConfig;
import eu.dnetlib.pace.util.DedupConfigLoader;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer that takes a group of person records sharing the same key and, when the
 * group holds more than one record, flags every record of the group as deleted by
 * inference and writes the {@code merges} / {@code isMergedIn} relations between
 * each record and a root id derived from the smallest record id of the group.
 *
 * <p>Groups are truncated to {@link #MAX_Q_SIZE} records; a counter is incremented
 * when the truncation happens.
 */
public class DedupPersonReducer
extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
    /** Whether the emitted {@link Put}s are written to the HBase write-ahead log. */
    private static final boolean WRITE_TO_WAL = false;
    /** Maximum number of records collected for a single key. */
    private static final int MAX_Q_SIZE = 3000;
    /** Dedup configuration, loaded in {@link #setup(Reducer.Context)}. */
    private DedupConfig dedupConf;

    /**
     * Loads the dedup configuration from the {@code dedup.wf.conf} job property.
     *
     * @param context the reducer context giving access to the job configuration
     */
    protected void setup(Reducer.Context context) throws IOException, InterruptedException {
        this.dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
    }

    /**
     * Processes one group of records.
     *
     * <p>A group with at most one record only increments the {@code "1"} counter.
     * For larger groups the smallest id is turned into the root id via
     * {@link DedupUtils#newId} and every record of the group is passed to
     * {@link #markDuplicate}.
     *
     * <p>Failures are handled best-effort: they are printed, counted under the
     * exception class name and the group is abandoned. The only exception is
     * {@link InterruptedException}, which is propagated so that the interruption
     * of the task is not silently lost.
     *
     * <p>NOTE(review): the parameter uses the raw {@code Reducer.Context} type as
     * produced by the decompiler; confirm that with this signature the method
     * still overrides {@code Reducer.reduce} when the file is recompiled.
     *
     * @param key     the grouping key
     * @param values  the serialized records of the group
     * @param context the reducer context used for counters and output
     */
    protected void reduce(Text key, Iterable<ImmutableBytesWritable> values, Reducer.Context context) throws IOException, InterruptedException {
        try {
            Queue<OafDecoder> q = this.prepare(key, values, context);
            if (q.size() <= 1) {
                context.getCounter(this.dedupConf.getEntityType(), "1").increment(1L);
                return;
            }
            String sizeGroup = this.dedupConf.getEntityType() + " root group size";
            // Sizes below 20 get their own (padded) counter; the "> 20" bucket
            // collects everything else, i.e. groups of 20 records or more.
            String sizeLabel = q.size() < 20 ? this.lpad(q.size()) : "> 20";
            context.getCounter(sizeGroup, sizeLabel).increment(1L);

            String min = DedupPersonReducer.findMin(Iterables.transform(q, (Function)OafUtils.idDecoder()));
            if (min == null) {
                context.getCounter(this.dedupConf.getEntityType(), "unable to find min").increment(1L);
                return;
            }
            String rootId = DedupUtils.newId(min, this.dedupConf.getDedupRun());
            while (!q.isEmpty()) {
                this.markDuplicate(context, rootId, q.remove());
            }
        }
        catch (InterruptedException e) {
            // Never swallow an interruption: let the framework see it.
            throw e;
        }
        catch (Throwable e) {
            System.out.println("GOT EX " + e);
            e.printStackTrace(System.err);
            context.getCounter(this.dedupConf.getEntityType(), e.getClass().toString()).increment(1L);
        }
    }

    /**
     * Decodes the incoming values into a queue, stopping once {@link #MAX_Q_SIZE}
     * records have been collected.
     *
     * @param key     the grouping key, used to name the overflow counter
     * @param values  the serialized records of the group
     * @param context the reducer context used for the overflow counter
     * @return the decoded records, at most {@link #MAX_Q_SIZE} of them
     */
    private Queue<OafDecoder> prepare(Text key, Iterable<ImmutableBytesWritable> values, Reducer.Context context) {
        LinkedList<OafDecoder> q = Lists.newLinkedList();
        for (OafDecoder decoder : Iterables.transform(values, OafHbaseUtils.decoder())) {
            q.add(decoder);
            if (q.size() >= MAX_Q_SIZE) {
                context.getCounter("[" + key.toString() + "]", "size > " + MAX_Q_SIZE).increment(1L);
                break;
            }
        }
        return q;
    }

    /**
     * Returns the smallest string of {@code keys} according to
     * {@link String#compareTo(String)}.
     *
     * <p>{@code null} elements are ignored instead of triggering a
     * {@link NullPointerException}.
     *
     * @param keys the candidate strings
     * @return the smallest non-null element, or {@code null} when there is none
     */
    public static String findMin(Iterable<String> keys) {
        String min = null;
        for (String candidate : keys) {
            if (candidate == null) {
                continue;
            }
            if (min == null || candidate.compareTo(min) < 0) {
                min = candidate;
            }
        }
        return min;
    }

    /**
     * Flags one record as deleted by inference and links it to the root id.
     *
     * <p>Three puts are emitted: the updated record body on the record row, a
     * {@code merges} relation on the root row and an {@code isMergedIn} relation
     * on the record row. The matching counters are incremented.
     *
     * @param context the reducer context used for output and counters
     * @param rootId  the id of the root the record is merged into
     * @param decoder the record to flag
     */
    private void markDuplicate(Reducer.Context context, String rootId, OafDecoder decoder) throws InvalidProtocolBufferException, IOException, InterruptedException {
        OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder((OafProtos.Oaf)decoder.getOaf());
        builder.getDataInfoBuilder().setDeletedbyinference(true).setInferenceprovenance("dedup person");
        OafProtos.Oaf oaf = builder.build();

        byte[] oafId = Bytes.toBytes((String)oaf.getEntity().getId());
        String entityName = this.dedupConf.getEntityType();
        this.emit(context, oafId, entityName, DedupUtils.BODY_B, oaf.toByteArray());
        context.getCounter(entityName, "marked as deleted").increment(1L);

        TypeProtos.Type entityType = TypeProtos.Type.valueOf(entityName);
        byte[] rowkey = Bytes.toBytes(rootId);

        String merges = DedupUtils.getDedupCF_merges(entityType);
        this.emit(context, rowkey, merges, oafId, this.buildRel(rowkey, oafId, DedupProtos.Dedup.RelName.merges));

        String mergedIn = DedupUtils.getDedupCF_mergedIn(entityType);
        this.emit(context, oafId, mergedIn, rowkey, this.buildRel(oafId, rowkey, DedupProtos.Dedup.RelName.isMergedIn));

        context.getCounter(entityName, merges).increment(1L);
        context.getCounter(entityName, mergedIn).increment(1L);
    }

    /**
     * Writes a single cell to the output.
     *
     * <p>The row of the {@link Put} is the key extracted from {@code rowkey} by
     * {@link OafRowKeyDecoder}; it is encoded through {@link Bytes} so that the
     * charset matches the other id conversions of this class rather than the
     * platform default.
     *
     * @param context   the reducer context the put is written to
     * @param rowkey    the encoded row key
     * @param family    the column family name
     * @param qualifier the column qualifier
     * @param value     the cell value
     */
    private void emit(Reducer.Context context, byte[] rowkey, String family, byte[] qualifier, byte[] value) throws IOException, InterruptedException {
        Put put = new Put(Bytes.toBytes(OafRowKeyDecoder.decode(rowkey).getKey()));
        put.setWriteToWAL(WRITE_TO_WAL);
        put.add(Bytes.toBytes(family), qualifier, value);
        context.write(new ImmutableBytesWritable(rowkey), put);
    }

    /**
     * Builds the serialized relation between two ids.
     *
     * <p>The ids are decoded through {@link Bytes}, mirroring the way they were
     * encoded, instead of relying on the platform default charset.
     *
     * @param from     the encoded source id
     * @param to       the encoded target id
     * @param relClass the dedup relation to create
     * @return the serialized {@code Oaf} message holding the relation
     */
    private byte[] buildRel(byte[] from, byte[] to, DedupProtos.Dedup.RelName relClass) {
        OafProtos.OafRel.Builder oafRel = DedupUtils.getDedup(this.dedupConf, Bytes.toString(from), Bytes.toString(to), relClass);
        OafProtos.Oaf oaf = OafProtos.Oaf.newBuilder()
            .setKind(KindProtos.Kind.relation)
            .setTimestamp(System.currentTimeMillis())
            .setDataInfo(AbstractDNetOafXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance("dedup"))
            .setRel(oafRel)
            .build();
        return oaf.toByteArray();
    }

    /**
     * Left-pads a group size with spaces to the number of digits of
     * {@link #MAX_Q_SIZE}.
     *
     * @param s the group size
     * @return the padded decimal representation of {@code s}
     */
    private String lpad(int s) {
        return StringUtils.leftPad(String.valueOf(s), String.valueOf(MAX_Q_SIZE).length());
    }
}

