/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.GeneratedMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.Algorithms;
import eu.dnetlib.data.mapreduce.hbase.broker.model.Event;
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public abstract class AbstractEnrichmentReducer
extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
    protected DedupConfig dedupConf;
    protected static final int LIMIT = 1000;
    protected static final int SCORE_DECIMALS = 2;
    protected Map<String, String> dsTypeMap = Maps.newHashMap();
    protected Set<String> dsWhitelist = Sets.newHashSet();
    protected Set<String> dsBlacklist = Sets.newHashSet();
    protected Set<String> untrustedOaDsList = Sets.newHashSet();
    protected Set<String> dsTypeWhitelist = Sets.newHashSet();
    protected Text tKey = new Text("");
    protected double scaleLB;

    protected abstract String counterGroup();

    protected void setup(Reducer.Context context) throws IOException, InterruptedException {
        super.setup(context);
        System.out.println("LIMIT: 1000");
        this.dsWhitelist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.id.whitelist"));
        this.dsBlacklist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.id.blacklist"));
        this.dsTypeWhitelist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.type.whitelist"));
        this.untrustedOaDsList.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.untrusted.oa.list"));
        this.dsTypeMap = this.getDsTypeMap(context, this.dsTypeWhitelist);
        System.out.println("datasource whitelist: " + this.dsWhitelist);
        System.out.println("datasource blacklist: " + this.dsBlacklist);
        System.out.println("datasource OA list: " + this.untrustedOaDsList);
        System.out.println("datasource type whitelist: " + this.dsTypeWhitelist);
        String dedupConfJson = context.getConfiguration().get("dedup.conf");
        System.out.println("got dedup conf: " + dedupConfJson);
        this.dedupConf = DedupConfig.load((String)dedupConfJson);
        System.out.println("parsed dedup conf: " + this.dedupConf.toString());
        this.scaleLB = this.dedupConf.getWf().getThreshold() - 0.01;
    }

    protected Map<String, String> getDsTypeMap(Reducer.Context context, Set<String> dsTypeWhitelist) throws IOException {
        System.out.println("loading datasource typology mapping");
        HashMap dsTypeMap = Maps.newHashMap();
        Scan scan = new Scan();
        FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        fl.addFilter((Filter)new PrefixFilter(Bytes.toBytes((String)"10")));
        scan.setFilter((Filter)fl);
        scan.addFamily(Bytes.toBytes((String)"datasource"));
        String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
        System.out.println(String.format("table name: '%s'", tableName));
        try (HTable table = new HTable(context.getConfiguration(), tableName);
             ResultScanner res = table.getScanner(scan);){
            for (Result r : res) {
                byte[] b = r.getValue(Bytes.toBytes((String)"datasource"), Bytes.toBytes((String)"body"));
                if (b == null) continue;
                OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom((byte[])b);
                String dsId = StringUtils.substringAfter((String)oaf.getEntity().getId(), (String)"|");
                String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();
                if (!dsTypeWhitelist.contains(dsType)) continue;
                dsTypeMap.put(dsId, dsType);
            }
        }
        System.out.println("datasource type map size: " + dsTypeMap.size());
        return dsTypeMap;
    }

    protected void emit(List<EventWrapper> events, Reducer.Context context) {
        events.stream().filter(Objects::nonNull).forEach(eventWrapper -> {
            try {
                Event event = eventWrapper.asBrokerEvent();
                String json = event.toJson();
                Text valueout = new Text(json);
                context.write((Object)this.tKey, (Object)valueout);
                context.getCounter(this.counterGroup(), eventWrapper.getCounterName()).increment(1L);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    protected float similarity(OafProtos.Oaf oa, OafProtos.Oaf ob) {
        MapDocument a = ProtoDocumentBuilder.newInstance((String)oa.getEntity().getId(), (GeneratedMessage)oa.getEntity(), (List)this.dedupConf.getPace().getModel());
        MapDocument b = ProtoDocumentBuilder.newInstance((String)ob.getEntity().getId(), (GeneratedMessage)ob.getEntity(), (List)this.dedupConf.getPace().getModel());
        ScoreResult sr = new PaceDocumentDistance().between((Object)a, (Object)b, (Config)this.dedupConf);
        float score = (float)Algorithms.scale(sr.getScore(), this.scaleLB, 1.0, 0.0, 1.0);
        return MathUtils.round((float)score, (int)2, (int)5);
    }

    public static Function<ImmutableBytesWritable, OafProtos.Oaf> oafDeserialiser() {
        return p -> {
            try {
                return OafProtos.Oaf.parseFrom((byte[])p.copyBytes());
            }
            catch (InvalidProtocolBufferException e) {
                throw new IllegalArgumentException(e);
            }
        };
    }

    public static OafProtos.Oaf toOaf(ImmutableBytesWritable p) {
        try {
            return OafProtos.Oaf.parseFrom((byte[])p.copyBytes());
        }
        catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
    }
}

