/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.oa.dedup.DatePicker;
import eu.dnetlib.dhp.oa.merge.AuthorMerger;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

/**
 * Static factory that builds one merged ("dedup") record per group of duplicate entities.
 *
 * <p>Groups are defined by relations whose {@code relClass} is {@code merges}: the relation
 * source is used as the id of the merged record, the relation target is the id of an entity
 * that takes part in the merge.
 */
public class DedupRecordFactory {

    /** Shared JSON mapper; configured so that unknown JSON properties do not fail deserialization. */
    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    /** Not instantiable: the class only exposes static methods. */
    private DedupRecordFactory() {
    }

    /**
     * Builds the dataset of merged records for the given entity type.
     *
     * <p>Entities are read as text from {@code entitiesInputPath}, one JSON document per line,
     * and deserialized into {@code clazz}. Relations are loaded from {@code mergeRelsInputPath}
     * and filtered to {@code relClass == 'merges'}. Each relation target is joined to the entity
     * with the same id, the joined entities are grouped by relation source, and every group is
     * collapsed into a single record by
     * {@link #entityMerger(String, Iterator, long, DataInfo, Class)}.
     *
     * @param spark              session used to read both inputs
     * @param dataInfo           data info assigned to every merged record
     * @param mergeRelsInputPath location of the relations, read with {@code spark.read().load(...)}
     * @param entitiesInputPath  location of the JSON-lines entities
     * @param clazz              concrete entity class to deserialize and to instantiate
     * @param <T>                entity type
     * @return one merged record per distinct relation source that matched at least one entity
     */
    public static <T extends OafEntity> Dataset<T> createDedupRecord(SparkSession spark, DataInfo dataInfo, String mergeRelsInputPath, String entitiesInputPath, Class<T> clazz) {
        // Taken once so that every record produced by this call carries the same timestamp.
        final long ts = System.currentTimeMillis();

        // <entity id, entity>
        final Dataset<Tuple2<String, T>> entities = spark
            .read()
            .textFile(entitiesInputPath)
            .map(
                (MapFunction<String, Tuple2<String, T>>) json -> {
                    T entity = OBJECT_MAPPER.readValue(json, clazz);
                    return new Tuple2<>(entity.getId(), entity);
                },
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));

        // <relation source, relation target>, restricted to 'merges' relations
        final Dataset<Tuple2<String, String>> mergeRels = spark
            .read()
            .load(mergeRelsInputPath)
            .as(Encoders.bean(Relation.class))
            .where("relClass == 'merges'")
            .map(
                (MapFunction<Relation, Tuple2<String, String>>) rel -> new Tuple2<>(rel.getSource(), rel.getTarget()),
                Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        // The explicit functional-interface casts below select the Java-friendly overloads of
        // map/groupByKey/mapGroups; without them the calls are ambiguous with the Scala ones.
        return mergeRels
            .joinWith(entities, mergeRels.col("_2").equalTo(entities.col("_1")), "inner")
            .map(
                (MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, T>>, Tuple2<String, T>>) joined ->
                    new Tuple2<>(joined._1()._1(), joined._2()._2()),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)))
            .groupByKey((MapFunction<Tuple2<String, T>, String>) pair -> pair._1(), Encoders.STRING())
            .mapGroups(
                (MapGroupsFunction<String, Tuple2<String, T>, T>) (key, values) ->
                    DedupRecordFactory.entityMerger(key, values, ts, dataInfo, clazz),
                Encoders.bean(clazz));
    }

    /**
     * Collapses a group of duplicate entities into a single new record.
     *
     * <p>A fresh instance of {@code clazz} is created and every entity of the group is merged
     * into it with {@code mergeFrom}. For entities that are {@link Result}s, the non-empty author
     * lists and the date-of-acceptance values of the duplicates are collected; the merged record
     * then gets the date chosen by {@link DatePicker#pick} and the authors produced by
     * {@link AuthorMerger#merge}. Finally id, last-update timestamp and data info are overwritten
     * with the supplied values.
     *
     * @param id       identifier assigned to the merged record
     * @param entities the duplicates to merge, as {@code <key, entity>} pairs; only the entity is used
     * @param ts       value stored as last-update timestamp
     * @param dataInfo data info stored on the merged record
     * @param clazz    class instantiated (through its no-argument constructor) for the merged record
     * @param <T>      entity type
     * @return the merged record
     * @throws IllegalAccessException if the no-argument constructor of {@code clazz} is not accessible
     * @throws InstantiationException if {@code clazz} cannot be instantiated
     */
    public static <T extends OafEntity> T entityMerger(String id, Iterator<Tuple2<String, T>> entities, long ts, DataInfo dataInfo, Class<T> clazz) throws IllegalAccessException, InstantiationException {
        // Class#newInstance is kept on purpose: switching to getDeclaredConstructor().newInstance()
        // would add checked exceptions to this public signature.
        final T entity = clazz.newInstance();

        final List<String> dates = new ArrayList<>();
        final List<List<Author>> authors = new ArrayList<>();

        while (entities.hasNext()) {
            final T duplicate = entities.next()._2();
            entity.mergeFrom(duplicate);
            if (ModelSupport.isSubClass(duplicate, Result.class)) {
                final Result result = (Result) duplicate;
                if (result.getAuthor() != null && !result.getAuthor().isEmpty()) {
                    authors.add(result.getAuthor());
                }
                if (result.getDateofacceptance() != null) {
                    dates.add(result.getDateofacceptance().getValue());
                }
            }
        }

        // Date and authors are set after the merge loop, replacing whatever mergeFrom produced.
        if (ModelSupport.isSubClass(entity, Result.class)) {
            final Result merged = (Result) entity;
            merged.setDateofacceptance(DatePicker.pick(dates));
            merged.setAuthor(AuthorMerger.merge(authors));
        }

        entity.setId(id);
        entity.setLastupdatetimestamp(ts);
        entity.setDataInfo(dataInfo);
        return entity;
    }
}

