/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.provision;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Spark job that joins the entities of one graph table with the related entities prepared for them.
 *
 * <p>Entities are read as JSON text into the class named by {@code graphTableClassName}. Only those
 * matching {@code dataInfo.invisible == false} are kept, and oversized {@link Result} fields are pruned
 * (see {@link #pruneOutliers}). Each entity is then left-joined, on its id, with the
 * {@link RelatedEntityWrapper}s whose relation source equals that id. The links are collected per
 * entity id into a single {@link JoinedEntity}, and the result is written as parquet to the output path.
 */
public class CreateRelatedEntitiesJob_phase2 {
    private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase2.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Maximum number of external references kept on a result. */
    private static final int MAX_EXTERNAL_REFERENCES = 50;
    /** Authors kept unconditionally; beyond this count only authors with an ORCID pid are kept. */
    private static final int MAX_AUTHORS = 200;
    /** Maximum length (in chars) of an author full name. */
    private static final int MAX_AUTHOR_FULLNAME_LENGTH = 1000;
    /** Maximum length (in chars) of each description value. */
    private static final int MAX_DESCRIPTION_LENGTH = 150000;
    /** Maximum length (in chars) of each title value. */
    private static final int MAX_TITLE_LENGTH = 5000;
    /** Maximum number of titles kept on a result. */
    private static final int MAX_TITLES = 10;

    /**
     * Entry point: parses the job arguments, clears the output path and runs the join.
     *
     * @param args command line arguments, described by {@code input_params_related_entities_pahase2.json}
     * @throws Exception if argument parsing, class loading or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // try-with-resources: the classpath resource stream was previously never closed.
        try (InputStream in = Objects.requireNonNull(
                CreateRelatedEntitiesJob_phase2.class.getResourceAsStream(
                        "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json"))) {
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        final String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath");
        log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath);
        final String inputEntityPath = parser.get("inputEntityPath");
        log.info("inputEntityPath: {}", inputEntityPath);
        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);
        final int numPartitions = Integer.parseInt(parser.get("numPartitions"));
        log.info("numPartitions: {}", numPartitions);
        final String graphTableClassName = parser.get("graphTableClassName");
        log.info("graphTableClassName: {}", graphTableClassName);

        // asSubclass gives a properly typed class and fails fast when the name is not an OafEntity.
        final Class<? extends OafEntity> entityClazz =
                Class.forName(graphTableClassName).asSubclass(OafEntity.class);

        final SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            joinEntityWithRelatedEntities(
                    spark, inputRelatedEntitiesPath, inputEntityPath, outputPath, numPartitions, entityClazz);
        });
    }

    /**
     * Left-joins the entities with their related entities and writes one {@link JoinedEntity} per entity id.
     *
     * @param spark the Spark session
     * @param relatedEntitiesPath location of the related entity wrappers
     * @param entityPath location of the JSON-text entities
     * @param outputPath parquet output location (overwritten)
     * @param numPartitions currently not used by this method
     * @param entityClazz the entity class of the graph table being processed
     */
    private static <E extends OafEntity> void joinEntityWithRelatedEntities(
            SparkSession spark,
            String relatedEntitiesPath,
            String entityPath,
            String outputPath,
            int numPartitions,
            Class<E> entityClazz) {
        final Dataset<Tuple2<String, E>> entities = readPathEntity(spark, entityPath, entityClazz);
        final Dataset<Tuple2<String, RelatedEntityWrapper>> relatedEntities =
                readRelatedEntities(spark, relatedEntitiesPath, entityClazz);
        final TypedColumn<JoinedEntity, JoinedEntity> aggregator = new AdjacencyListAggregator().toColumn();

        entities
                // "left": every entity is kept, even when it has no related entity.
                .joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left")
                .map(
                        (MapFunction<Tuple2<Tuple2<String, E>, Tuple2<String, RelatedEntityWrapper>>, JoinedEntity>) value -> {
                            final JoinedEntity<OafEntity> je = new JoinedEntity<>(value._1()._2());
                            // value._2() is null when the left join found no match.
                            Optional.ofNullable(value._2())
                                    .map(Tuple2::_2)
                                    .ifPresent(r -> je.getLinks().add(r));
                            return je;
                        },
                        Encoders.kryo(JoinedEntity.class))
                .groupByKey(
                        (MapFunction<JoinedEntity, String>) value -> value.getEntity().getId(),
                        Encoders.STRING())
                .agg(aggregator)
                .map(
                        (MapFunction<Tuple2<String, JoinedEntity>, JoinedEntity>) value -> value._2(),
                        Encoders.kryo(JoinedEntity.class))
                .write()
                .mode(SaveMode.Overwrite)
                .parquet(outputPath);
    }

    /**
     * Reads the related entity wrappers whose relation source starts with the id prefix of {@code entityClazz}.
     *
     * @return tuples keyed by the relation source id
     */
    private static <E extends OafEntity> Dataset<Tuple2<String, RelatedEntityWrapper>> readRelatedEntities(
            SparkSession spark, String inputRelatedEntitiesPath, Class<E> entityClazz) {
        log.info("Reading related entities from: {}", inputRelatedEntitiesPath);
        final List<String> paths =
                HdfsSupport.listFiles(inputRelatedEntitiesPath, spark.sparkContext().hadoopConfiguration());
        log.info("Found paths: {}", String.join(",", paths));

        final String idPrefix = ModelSupport.getIdPrefix(entityClazz);
        return spark
                .read()
                .load(toSeq(paths))
                .as(Encoders.kryo(RelatedEntityWrapper.class))
                .filter((FilterFunction<RelatedEntityWrapper>) e -> e.getRelation().getSource().startsWith(idPrefix))
                .map(
                        (MapFunction<RelatedEntityWrapper, Tuple2<String, RelatedEntityWrapper>>) value ->
                                new Tuple2<>(value.getRelation().getSource(), value),
                        Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntityWrapper.class)));
    }

    /**
     * Reads JSON-text entities and keeps those with {@code dataInfo.invisible == false}, pruned.
     *
     * @return tuples keyed by entity id
     */
    private static <E extends OafEntity> Dataset<Tuple2<String, E>> readPathEntity(
            SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
        log.info("Reading Graph table from: {}", inputEntityPath);
        return spark
                .read()
                .textFile(inputEntityPath)
                .map((MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz))
                .filter("dataInfo.invisible == false")
                .map((MapFunction<E, E>) e -> pruneOutliers(entityClazz, e), Encoders.bean(entityClazz))
                .map(
                        (MapFunction<E, Tuple2<String, E>>) e -> new Tuple2<>(e.getId(), e),
                        Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz)));
    }

    /**
     * Caps the size of oversized fields on {@link Result} entities, in place. Other entity types are returned
     * untouched.
     *
     * <p>External references and titles are truncated to a maximum count. Author names, descriptions and
     * titles are truncated to a maximum length. After {@link #MAX_AUTHORS} authors, only authors with an
     * ORCID pid are kept. Null titles, descriptions and authors are dropped.
     *
     * @return the same (possibly modified) instance
     */
    private static <E extends OafEntity> E pruneOutliers(Class<E> entityClazz, E e) {
        if (!ModelSupport.isSubClass(entityClazz, Result.class)) {
            return e;
        }
        final Result r = (Result) e;
        if (r.getExternalReference() != null) {
            r.setExternalReference(
                    r.getExternalReference().stream().limit(MAX_EXTERNAL_REFERENCES).collect(Collectors.toList()));
        }
        if (r.getAuthor() != null) {
            final List<Author> authors = new ArrayList<>();
            for (Author a : r.getAuthor()) {
                if (a == null) {
                    // Consistent with titles/descriptions: drop null entries instead of failing with an NPE.
                    continue;
                }
                a.setFullname(StringUtils.left(a.getFullname(), MAX_AUTHOR_FULLNAME_LENGTH));
                if (authors.size() < MAX_AUTHORS || hasORCID(a)) {
                    authors.add(a);
                }
            }
            r.setAuthor(authors);
        }
        if (r.getDescription() != null) {
            r.setDescription(r.getDescription().stream()
                    .filter(Objects::nonNull)
                    .map(d -> {
                        d.setValue(StringUtils.left(d.getValue(), MAX_DESCRIPTION_LENGTH));
                        return d;
                    })
                    .collect(Collectors.toList()));
        }
        if (r.getTitle() != null) {
            r.setTitle(r.getTitle().stream()
                    .filter(Objects::nonNull)
                    .map(t -> {
                        t.setValue(StringUtils.left(t.getValue(), MAX_TITLE_LENGTH));
                        return t;
                    })
                    .limit(MAX_TITLES)
                    .collect(Collectors.toList()));
        }
        return e;
    }

    /**
     * Tells whether the author has a pid whose qualifier classid contains "orcid" (case-insensitive).
     * Uses a locale-independent comparison: {@code String.toLowerCase()} under e.g. a Turkish default locale
     * turns "ORCID" into "orc\u0131d", which would not match.
     */
    private static boolean hasORCID(Author a) {
        return a.getPid() != null
                && a.getPid().stream()
                        .filter(Objects::nonNull)
                        .map(StructuredProperty::getQualifier)
                        .filter(Objects::nonNull)
                        .map(Qualifier::getClassid)
                        .filter(StringUtils::isNotBlank)
                        .anyMatch(c -> StringUtils.containsIgnoreCase(c, "orcid"));
    }

    /** Removes {@code path} using the session's Hadoop configuration. */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    /** Converts a Java list into a Scala {@link Seq}, as expected by {@code DataFrameReader.load(Seq)}. */
    private static Seq<String> toSeq(List<String> list) {
        return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
    }

    /**
     * Aggregator that merges the partial {@link JoinedEntity} values of one entity id. The entity is taken
     * from whichever side has it, and the links of both sides are concatenated.
     */
    public static class AdjacencyListAggregator
    extends Aggregator<JoinedEntity, JoinedEntity, JoinedEntity> {
        @Override
        public JoinedEntity zero() {
            return new JoinedEntity();
        }

        @Override
        public JoinedEntity reduce(JoinedEntity b, JoinedEntity a) {
            return this.mergeAndGet(b, a);
        }

        /** Mutates and returns {@code b}: prefers {@code a}'s entity when non-null, then appends {@code a}'s links. */
        private JoinedEntity mergeAndGet(JoinedEntity b, JoinedEntity a) {
            if (a.getEntity() != null) {
                b.setEntity(a.getEntity());
            }
            b.getLinks().addAll(a.getLinks());
            return b;
        }

        @Override
        public JoinedEntity merge(JoinedEntity b, JoinedEntity a) {
            return this.mergeAndGet(b, a);
        }

        @Override
        public JoinedEntity finish(JoinedEntity j) {
            return j;
        }

        @Override
        public Encoder<JoinedEntity> bufferEncoder() {
            return Encoders.kryo(JoinedEntity.class);
        }

        @Override
        public Encoder<JoinedEntity> outputEncoder() {
            return Encoders.kryo(JoinedEntity.class);
        }
    }
}

