/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import eu.dnetlib.dhp.oa.dedup.IdentifierComparator;
import eu.dnetlib.dhp.oa.dedup.model.Identifier;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class DedupRecordFactory {
    /**
     * Threshold on the number of distinct acceptance dates collected for one dedup group.
     * {@code createDedupRecord} applies the same value (20): once a group's date set reaches it,
     * no further dates are accumulated and the group produces no output records.
     */
    private static final int MAX_ACCEPTANCE_DATE = 20;

    /** Not instantiable: this class only exposes static members. */
    private DedupRecordFactory() {
    }

    /**
     * Builds the dedup records for one entity type.
     *
     * <p>Entities are read as JSON from {@code entitiesInputPath} using the bean schema of
     * {@code clazz}. Merge relations are read from {@code mergeRelsInputPath} with Spark's default
     * data source and restricted to {@code relClass == 'merges'}; their {@code source} is the dedup
     * id and their {@code target} the id of a merged entity. Relations are left-joined with the
     * entities, grouped by dedup id and reduced into a single {@link DedupRecordReduceState}.
     *
     * <p>For every group the result contains one copy of the merged entity carrying the dedup id
     * and one copy (flagged {@code deletedbyinference}) for each alias, i.e. each target id for
     * which the join found no entity. Groups whose set of distinct acceptance dates reaches
     * {@link #MAX_ACCEPTANCE_DATE} produce no records at all.
     *
     * @param spark              active Spark session
     * @param dataInfo           provenance info set on every produced record
     * @param mergeRelsInputPath location of the merge relations
     * @param entitiesInputPath  location of the JSON-serialized entities
     * @param clazz              concrete entity class used to derive the bean and kryo encoders
     * @return the dataset of dedup records and alias records
     */
    public static Dataset<OafEntity> createDedupRecord(SparkSession spark, DataInfo dataInfo, String mergeRelsInputPath, String entitiesInputPath, Class<OafEntity> clazz) {
        // A single timestamp is shared by all records produced by this run.
        final long ts = System.currentTimeMillis();
        final Encoder<OafEntity> beanEncoder = Encoders.bean(clazz);
        final Encoder<OafEntity> kryoEncoder = Encoders.kryo(clazz);

        // <id, kryo-serialized entity>
        final Dataset<Row> entities = spark.read()
            .schema(beanEncoder.schema())
            .json(entitiesInputPath)
            .as(beanEncoder)
            .map(
                (MapFunction<OafEntity, Tuple2<String, OafEntity>>) entity -> new Tuple2<>(entity.getId(), entity),
                Encoders.tuple(Encoders.STRING(), kryoEncoder))
            .selectExpr("_1 AS id", "_2 AS kryoObject");

        // <dedupId, id>: source is the dedup id, target is the id of the merged entity
        final Dataset<Row> mergeRels = spark.read()
            .load(mergeRelsInputPath)
            .where("relClass == 'merges'")
            .selectExpr("source as dedupId", "target as id");

        // The explicit functional-interface casts select the Java overloads of map/groupByKey/
        // reduceGroups/flatMap and give the lambdas their parameter types.
        return mergeRels
            .join(entities, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
            .select("dedupId", "id", "kryoObject")
            .as(Encoders.tuple(Encoders.STRING(), Encoders.STRING(), kryoEncoder))
            .map(
                (MapFunction<Tuple3<String, String, OafEntity>, DedupRecordReduceState>) t ->
                    new DedupRecordReduceState(t._1(), t._2(), t._3()),
                Encoders.kryo(DedupRecordReduceState.class))
            .groupByKey(
                (MapFunction<DedupRecordReduceState, String>) DedupRecordReduceState::getDedupId,
                Encoders.STRING())
            .reduceGroups((ReduceFunction<DedupRecordReduceState>) DedupRecordFactory::mergeStates)
            .flatMap(
                (FlatMapFunction<Tuple2<String, DedupRecordReduceState>, OafEntity>) t ->
                    toDedupEntities(t._2(), dataInfo, ts),
                beanEncoder);
    }

    /**
     * Combines two partial states of the same dedup group. One of the two arguments is mutated
     * and returned; no new state object is created.
     */
    private static DedupRecordReduceState mergeStates(DedupRecordReduceState left, DedupRecordReduceState right) {
        if (left.entity == null) {
            // Left carries only aliases: fold them into the right-hand state.
            right.aliases.addAll(left.aliases);
            return right;
        }
        // Stop collecting dates once the threshold is reached; the group is discarded later anyway.
        if (left.acceptanceDate.size() < MAX_ACCEPTANCE_DATE) {
            left.acceptanceDate.addAll(right.acceptanceDate);
        }
        left.aliases.addAll(right.aliases);
        left.entity = reduceEntity(left.entity, right.entity);
        return left;
    }

    /**
     * Expands a fully reduced group into its output records: the dedup record followed by one
     * record per alias, or nothing when the group reached {@link #MAX_ACCEPTANCE_DATE} dates.
     */
    private static Iterator<OafEntity> toDedupEntities(DedupRecordReduceState agg, DataInfo dataInfo, long ts) {
        if (agg.acceptanceDate.size() >= MAX_ACCEPTANCE_DATE) {
            return Collections.emptyIterator();
        }
        // NOTE(review): if no entity was found for any member of the group, agg.entity is null
        // here and cloning it fails with a RuntimeException - confirm this is the intended outcome.
        return Stream
            .concat(
                Stream.of(agg.getDedupId()).map(id -> createDedupOafEntity(id, agg.entity, dataInfo, ts)),
                agg.aliases.stream().map(id -> createMergedDedupAliasOafEntity(id, agg.entity, dataInfo, ts)))
            .iterator();
    }

    /**
     * Creates a bean-level copy of {@code base} (via {@code BeanUtils.cloneBean}) and stamps it
     * with the given id, data info and last-update timestamp.
     *
     * @param id       identifier assigned to the copy
     * @param base     entity to copy
     * @param dataInfo data info assigned to the copy
     * @param ts       last-update timestamp assigned to the copy
     * @return the stamped copy
     * @throws RuntimeException wrapping any exception raised while cloning or stamping
     */
    private static OafEntity createDedupOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
        try {
            final OafEntity dedupRecord = (OafEntity) BeanUtils.cloneBean(base);
            dedupRecord.setId(id);
            dedupRecord.setDataInfo(dataInfo);
            dedupRecord.setLastupdatetimestamp(ts);
            return dedupRecord;
        } catch (Exception cloneFailure) {
            throw new RuntimeException(cloneFailure);
        }
    }

    /**
     * Creates the record emitted for an alias id: the same copy that
     * {@code createDedupOafEntity} produces, but with its own clone of {@code dataInfo} whose
     * {@code deletedbyinference} flag is set to {@code true}.
     *
     * @param id       alias identifier assigned to the copy
     * @param base     entity to copy
     * @param dataInfo data info to clone and flag
     * @param ts       last-update timestamp assigned to the copy
     * @return the alias record
     * @throws RuntimeException wrapping any exception raised while building the record
     */
    private static OafEntity createMergedDedupAliasOafEntity(String id, OafEntity base, DataInfo dataInfo, long ts) {
        try {
            final OafEntity alias = createDedupOafEntity(id, base, dataInfo, ts);
            // Clone so that flagging the alias does not touch the caller's DataInfo instance.
            final DataInfo aliasInfo = (DataInfo) BeanUtils.cloneBean(dataInfo);
            aliasInfo.setDeletedbyinference(true);
            alias.setDataInfo(aliasInfo);
            return alias;
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Merges two entities of the same dedup group into one.
     *
     * <p>{@link IdentifierComparator} decides which of the two is the merge target: when it ranks
     * {@code entity} after {@code duplicate} the roles are swapped. The target is mutated in place
     * through {@code mergeFrom} and returned. When the merged-in entity is a {@link Result}, the
     * author lists of both sides are additionally combined with {@code AuthorMerger.merge}.
     *
     * @param entity    first entity, may be {@code null}
     * @param duplicate second entity, may be {@code null}
     * @return the merge target, or the only non-null argument when the other one is {@code null}
     */
    private static OafEntity reduceEntity(OafEntity entity, OafEntity duplicate) {
        if (duplicate == null) {
            return entity;
        }
        if (entity == null) {
            // Mirror of the guard above: nothing to merge into, so the duplicate is the result.
            return duplicate;
        }

        OafEntity target = entity;
        OafEntity source = duplicate;
        int compare = new IdentifierComparator<OafEntity>().compare(Identifier.newInstance(entity), Identifier.newInstance(duplicate));
        if (compare > 0) {
            target = duplicate;
            source = entity;
        }

        target.mergeFrom(source);

        if (ModelSupport.isSubClass(source, Result.class)) {
            Result mergedResult = (Result) target;
            Result sourceResult = (Result) source;
            List<List<Author>> authors = new ArrayList<>();
            if (mergedResult.getAuthor() != null) {
                authors.add(mergedResult.getAuthor());
            }
            if (sourceResult.getAuthor() != null) {
                authors.add(sourceResult.getAuthor());
            }
            mergedResult.setAuthor(AuthorMerger.merge(authors));
        }
        return target;
    }

    /**
     * Merges all entities supplied by the iterator into a single record and stamps it with the
     * given id, data info and timestamp.
     *
     * <p>{@code null} entities are skipped wherever they occur in the sequence, including the
     * first position. The returned object is one of the input entities, mutated in place.
     *
     * @param id       identifier assigned to the merged record
     * @param entities pairs whose second component is an entity to merge; must yield at least one element
     * @param ts       last-update timestamp assigned to the merged record
     * @param dataInfo data info assigned to the merged record
     * @param clazz    entity class; not used by the implementation
     * @param <T>      entity type
     * @return the merged record
     * @throws java.util.NoSuchElementException if {@code entities} is empty
     * @throws IllegalArgumentException         if every supplied entity is {@code null}
     */
    public static <T extends OafEntity> T entityMerger(String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) {
        OafEntity base = entities.next()._2();
        while (entities.hasNext()) {
            OafEntity duplicate = entities.next()._2();
            if (duplicate == null) {
                continue;
            }
            // A null head must not be handed to reduceEntity: adopt the first non-null entity instead.
            base = (base == null) ? duplicate : DedupRecordFactory.reduceEntity(base, duplicate);
        }
        if (base == null) {
            throw new IllegalArgumentException("No non-null entity to merge for id " + id);
        }
        base.setId(id);
        base.setDataInfo(dataInfo);
        base.setLastupdatetimestamp(ts);

        // Safe: base is always one of the T instances obtained from the iterator.
        @SuppressWarnings("unchecked")
        T merged = (T) base;
        return merged;
    }

    /**
     * Mutable accumulator for one dedup group, built per joined row and then folded pairwise.
     * It is encoded with {@code Encoders.kryo} by {@code createDedupRecord}.
     */
    public static final class DedupRecordReduceState {
        /** Identifier of the dedup group this state belongs to. */
        public final String dedupId;
        /** Ids for which no entity was available when the state was created. */
        public final ArrayList<String> aliases = new ArrayList<>();
        /** Distinct, non-blank acceptance dates seen on {@link Result} entities of the group. */
        public final HashSet<String> acceptanceDate = new HashSet<>();
        /** Entity accumulated so far; {@code null} while only aliases have been seen. */
        public OafEntity entity;

        /**
         * Creates the state for a single (dedupId, id, entity) row.
         *
         * @param dedupId identifier of the dedup group
         * @param id      identifier of the group member; recorded as an alias when {@code entity} is {@code null}
         * @param entity  the member entity, or {@code null} when none is available
         */
        public DedupRecordReduceState(String dedupId, String id, OafEntity entity) {
            this.dedupId = dedupId;
            this.entity = entity;
            if (entity == null) {
                this.aliases.add(id);
            } else if (entity instanceof Result) {
                Result result = (Result) entity;
                if (result.getDateofacceptance() != null) {
                    String date = result.getDateofacceptance().getValue();
                    if (StringUtils.isNotBlank(date)) {
                        this.acceptanceDate.add(date);
                    }
                }
            }
        }

        /** @return the identifier of the dedup group; used as the grouping key */
        public String getDedupId() {
            return this.dedupId;
        }
    }
}

