/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.hbase.broker.AbstractEventFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.OAVersionEventFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.PIDEventFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.ProjectEventFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.PublicationDateEventFactory;
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
import eu.dnetlib.data.proto.FieldTypeProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.TypeProtos;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.dom4j.DocumentException;

public class PrepareEnrichementReducer
extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
    private static final int LIMIT = 1000;
    private Map<String, String> dsTypeMap = Maps.newHashMap();
    private Set<String> dsWhitelist = Sets.newHashSet();
    private Set<String> dsBlacklist = Sets.newHashSet();
    private Set<String> untrustedOaDsList = Sets.newHashSet();
    private Set<String> dsTypeWhitelist = Sets.newHashSet();

    protected void setup(Reducer.Context context) throws IOException, InterruptedException {
        super.setup(context);
        System.out.println("LIMIT: 1000");
        this.dsWhitelist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.id.whitelist"));
        this.dsBlacklist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.id.blacklist"));
        this.dsTypeWhitelist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.type.whitelist"));
        this.untrustedOaDsList.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.untrusted.oa.list"));
        this.dsTypeMap = this.getDsTypeMap(context, this.dsTypeWhitelist);
        System.out.println("datasource whitelist: " + this.dsWhitelist);
        System.out.println("datasource blacklist: " + this.dsBlacklist);
        System.out.println("datasource OA list: " + this.untrustedOaDsList);
        System.out.println("datasource type whitelist: " + this.dsTypeWhitelist);
    }

    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Reducer.Context context) throws IOException, InterruptedException {
        HashMap projects = Maps.newHashMap();
        ArrayList pubs = Lists.newArrayList();
        for (OafProtos.Oaf oaf : Iterables.transform((Iterable)Iterables.limit(values, (int)1000), this.oafDeserialiser())) {
            if (oaf.getEntity().getType() == TypeProtos.Type.project) {
                projects.put(oaf.getEntity().getId(), oaf);
                continue;
            }
            pubs.add(oaf);
        }
        try {
            this.generateEvents(pubs, projects, context);
        }
        catch (DocumentException e) {
            throw new RuntimeException(e);
        }
    }

    private Map<String, String> getDsTypeMap(Reducer.Context context, Set<String> dsTypeWhitelist) throws IOException {
        System.out.println("loading datasource typology mapping");
        HashMap dsTypeMap = Maps.newHashMap();
        Scan scan = new Scan();
        FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        fl.addFilter((Filter)new PrefixFilter(Bytes.toBytes((String)"10")));
        scan.setFilter((Filter)fl);
        scan.addFamily(Bytes.toBytes((String)"datasource"));
        String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
        System.out.println(String.format("table name: '%s'", tableName));
        try (HTable table = new HTable(context.getConfiguration(), tableName);
             ResultScanner res = table.getScanner(scan);){
            for (Result r : res) {
                byte[] b = r.getValue(Bytes.toBytes((String)"datasource"), Bytes.toBytes((String)"body"));
                if (b == null) continue;
                OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom((byte[])b);
                String dsId = StringUtils.substringAfter((String)oaf.getEntity().getId(), (String)"|");
                String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();
                if (!dsTypeWhitelist.contains(dsType)) continue;
                System.out.println(String.format("dsId '%s', dsType '%s'", dsId, dsType));
                dsTypeMap.put(dsId, dsType);
            }
        }
        System.out.println("datasource type map size: " + dsTypeMap.size());
        return dsTypeMap;
    }

    private void generateEvents(List<OafProtos.Oaf> pubs, Map<String, OafProtos.Oaf> projects, Reducer.Context context) throws IOException, InterruptedException, DocumentException {
        for (OafProtos.Oaf current : pubs) {
            String currentId = current.getEntity().getId();
            String currentDsId = StringUtils.substringAfter((String)OafHbaseUtils.getKey(current.getEntity().getCollectedfromList()), (String)"|");
            String currentDsType = this.dsTypeMap.get(currentDsId);
            if (StringUtils.isBlank((String)currentDsType) && !this.dsWhitelist.contains(currentDsId)) {
                context.getCounter("events skipped", "datasource type excluded").increment(1L);
                continue;
            }
            if (this.dsBlacklist.contains(currentDsId)) {
                context.getCounter("events skipped", "datasource blacklisted").increment(1L);
                continue;
            }
            for (OafProtos.Oaf other : pubs) {
                String otherId = other.getEntity().getId();
                if (currentId.equals(otherId)) continue;
                OafProtos.Oaf enrichedCurrent = this.addProjects(current, projects, Sets.newHashSet((Object[])new String[]{"sysimport:crosswalk:repository", "iis"}));
                PIDEventFactory.process(context, enrichedCurrent, other, RandomUtils.nextFloat());
                OAVersionEventFactory.process(context, enrichedCurrent, other, RandomUtils.nextFloat(), this.untrustedOaDsList);
                AbstractEventFactory.process(context, enrichedCurrent, other, RandomUtils.nextFloat());
                PublicationDateEventFactory.process(context, enrichedCurrent, other, RandomUtils.nextFloat());
                OafProtos.Oaf enrichedOther = this.addProjects(other, projects, Sets.newHashSet((Object[])new String[]{"sysimport:crosswalk:repository", "iis"}));
                ProjectEventFactory.process(context, enrichedCurrent, enrichedOther, RandomUtils.nextFloat());
            }
            OafProtos.Oaf enrichedCurrent = this.addProjects(current, projects, Sets.newHashSet((Object[])new String[]{"iis"}));
            ProjectEventFactory.process(context, enrichedCurrent, null, RandomUtils.nextFloat());
        }
    }

    private OafProtos.Oaf addProjects(OafProtos.Oaf current, Map<String, OafProtos.Oaf> projects, Set<String> allowedProvenance) {
        OafProtos.Oaf.Builder oafBuilder = OafProtos.Oaf.newBuilder((OafProtos.Oaf)current);
        for (OafProtos.OafRel.Builder rel : oafBuilder.getEntityBuilder().getCachedRelBuilderList()) {
            String provenance = ((FieldTypeProtos.KeyValue.Builder)Iterables.getOnlyElement((Iterable)rel.getCollectedfromBuilderList())).getKey();
            if (!(projects.containsKey(rel.getTarget()) & allowedProvenance.contains(provenance))) continue;
            rel.setCachedTarget(projects.get(rel.getTarget()).getEntity());
        }
        return oafBuilder.build();
    }

    private Function<ImmutableBytesWritable, OafProtos.Oaf> oafDeserialiser() {
        return new Function<ImmutableBytesWritable, OafProtos.Oaf>(){

            public OafProtos.Oaf apply(ImmutableBytesWritable input) {
                try {
                    return OafProtos.Oaf.parseFrom((byte[])input.copyBytes());
                }
                catch (InvalidProtocolBufferException e) {
                    throw new IllegalArgumentException(e);
                }
            }
        };
    }
}

