/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.Predicate;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that collapses the graph entities sharing the same key into a single record.
 *
 * <p>Every entry returned by {@code HdfsSupport.listFiles} for the input path whose name does not
 * contain "relation" is read as text, one JSON document per line. Each line is deserialized into
 * the {@link OafEntity} subtype selected by its id prefix, the entities are grouped by the key
 * computed by {@code ModelSupport.idFn()}, the members of each group are merged with
 * {@code OafMapperUtils.mergeEntities}, and one line per group is written to the output path as
 * gzip-compressed text in the form {@code <entity class name>|<entity json>}.
 */
public class GroupEntitiesSparkJob {
    private static final Logger log = LoggerFactory.getLogger(GroupEntitiesSparkJob.class);

    /** Classpath resource handed to {@link ArgumentApplicationParser} to configure argument parsing. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/dedup/group_graph_entities_parameters.json";

    /** JsonPath of the entity identifier inside a serialized record. */
    private static final String ID_JPATH = "$.id";

    /** JsonPath of the result type discriminator, read only for records whose id prefix is "50". */
    private static final String RESULT_TYPE_JPATH = "$.resulttype.classid";

    /** Shared mapper; unknown JSON properties are ignored instead of failing deserialization. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    /**
     * Entry point: parses the arguments, removes any previous output and runs the grouping.
     *
     * <p>Arguments read from the parser: {@code isSparkSessionManaged} (defaults to {@code true}
     * when absent), {@code graphInputPath} and {@code outputPath}.
     *
     * @param args command line arguments, interpreted by {@link ArgumentApplicationParser}
     * @throws NullPointerException if the parameters resource is not on the classpath
     * @throws Exception            if argument parsing or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // Close the resource stream once read and decode it with an explicit charset instead of
        // the platform default.
        try (InputStream in = GroupEntitiesSparkJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(
                Objects.requireNonNull(in, "missing classpath resource: " + PARAMETERS_RESOURCE),
                StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String graphInputPath = parser.get("graphInputPath");
        log.info("graphInputPath: {}", graphInputPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            // Clear the target location before the job writes to it.
            HdfsSupport.remove(outputPath, spark.sparkContext().hadoopConfiguration());
            groupEntities(spark, graphInputPath, outputPath);
        });
    }

    /**
     * Reads the entity files, merges the entities sharing the same key and writes the result.
     *
     * @param spark      active Spark session
     * @param inputPath  location whose entries are listed by {@link #listEntityPaths}
     * @param outputPath destination of the gzip-compressed text output (written in overwrite mode)
     */
    private static void groupEntities(SparkSession spark, String inputPath, String outputPath) {
        TypedColumn<OafEntity, OafEntity> aggregator = new GroupingAggregator().toColumn();
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        // NOTE(review): the cast on the last map() is kept exactly as emitted by the decompiler.
        // Restoring its generic signature needs the tuple type returned by agg(), which this file
        // does not import.
        spark.read()
            .textFile(DHPUtils.toSeq(listEntityPaths(inputPath, sc)))
            .map((MapFunction<String, OafEntity>) GroupEntitiesSparkJob::parseOaf, Encoders.kryo(OafEntity.class))
            // Entities without a usable key cannot be grouped and are dropped.
            .filter((FilterFunction<OafEntity>) e -> StringUtils.isNotBlank((CharSequence) ModelSupport.idFn().apply(e)))
            .groupByKey((MapFunction<OafEntity, String>) oaf -> (String) ModelSupport.idFn().apply(oaf), Encoders.STRING())
            .agg(aggregator)
            // Prefix each record with its concrete class name so the type can be recovered later.
            .map((MapFunction & Serializable)t -> ((OafEntity)t._2()).getClass().getName() + "|" + OBJECT_MAPPER.writeValueAsString(t._2()), Encoders.STRING())
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .text(outputPath);
    }

    /**
     * Deserializes one JSON record into the entity type selected by its id prefix, i.e. the part
     * of {@code $.id} preceding the first {@code '|'}: "10" Datasource, "20" Organization,
     * "40" Project, "50" a result whose concrete type is given by {@code $.resulttype.classid}.
     *
     * @param s JSON serialization of a single entity
     * @return the deserialized entity
     * @throws IllegalArgumentException if the id is missing or blank, the prefix is not one of
     *                                  the supported values, or the result type is missing or
     *                                  not supported
     */
    private static OafEntity parseOaf(String s) {
        // SUPPRESS_EXCEPTIONS: a path that cannot be resolved reads as null instead of throwing.
        DocumentContext dc = JsonPath.parse(s,
            com.jayway.jsonpath.Configuration.defaultConfiguration().addOptions(Option.SUPPRESS_EXCEPTIONS));

        String id = dc.read(ID_JPATH);
        if (StringUtils.isBlank(id)) {
            throw new IllegalArgumentException(String.format("invalid oaf: '%s'", s));
        }

        String prefix = StringUtils.substringBefore(id, "|");
        switch (prefix) {
            case "10":
                return parse(s, Datasource.class);
            case "20":
                return parse(s, Organization.class);
            case "40":
                return parse(s, Project.class);
            case "50":
                String resultType = dc.read(RESULT_TYPE_JPATH);
                return parseResult(s, resultType);
            default:
                throw new IllegalArgumentException(String.format("invalid id prefix: '%s'", prefix));
        }
    }

    /**
     * Deserializes a result record into the subtype named by {@code resultType}.
     *
     * @param s          JSON serialization of the result
     * @param resultType value read from {@code $.resulttype.classid}, possibly {@code null}
     * @return the deserialized result
     * @throws IllegalArgumentException if {@code resultType} is {@code null} or not one of
     *                                  "publication", "dataset", "software", "other"
     */
    private static OafEntity parseResult(String s, String resultType) {
        // A switch on a null String throws NullPointerException; guard so that a record without
        // a result type is reported with the same descriptive error as an unsupported one.
        if (resultType != null) {
            switch (resultType) {
                case "publication":
                    return parse(s, Publication.class);
                case "dataset":
                    return parse(s, Dataset.class);
                case "software":
                    return parse(s, Software.class);
                case "other":
                    return parse(s, OtherResearchProduct.class);
                default:
                    break;
            }
        }
        throw new IllegalArgumentException(String.format("invalid resultType: '%s'", resultType));
    }

    /**
     * Deserializes {@code s} into an instance of {@code clazz} with the shared mapper.
     *
     * @param s     JSON serialization of the entity
     * @param clazz concrete entity type to instantiate
     * @return the deserialized entity
     * @throws RuntimeException wrapping the {@link IOException} raised by the mapper
     */
    private static <T extends OafEntity> OafEntity parse(String s, Class<T> clazz) {
        try {
            return OBJECT_MAPPER.readValue(s, clazz);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Lists the entries under {@code inputPath}, leaving out those whose name contains
     * "relation" (case-insensitive).
     *
     * @param inputPath location to list
     * @param sc        context providing the Hadoop configuration
     * @return the entries to be read as entity files
     */
    private static List<String> listEntityPaths(String inputPath, JavaSparkContext sc) {
        return HdfsSupport.listFiles(inputPath, sc.hadoopConfiguration())
            .stream()
            // Locale.ROOT keeps the match independent of the JVM default locale.
            .filter(f -> !f.toLowerCase(Locale.ROOT).contains("relation"))
            .collect(Collectors.toList());
    }

    /**
     * Aggregator merging all the entities of a group into one. The buffer starts as {@code null}
     * and {@code null} operands are treated as "nothing to merge".
     */
    public static class GroupingAggregator
    extends Aggregator<OafEntity, OafEntity, OafEntity> {
        /** @return {@code null}, the empty buffer */
        @Override
        public OafEntity zero() {
            return null;
        }

        /** Folds the entity {@code a} into the buffer {@code b}. */
        @Override
        public OafEntity reduce(OafEntity b, OafEntity a) {
            return this.mergeAndGet(b, a);
        }

        /**
         * Merges the two operands when both are present, otherwise returns whichever one is
         * not {@code null} ({@code null} when both are).
         */
        private OafEntity mergeAndGet(OafEntity b, OafEntity a) {
            if (a == null) {
                return b;
            }
            if (b == null) {
                return a;
            }
            return OafMapperUtils.mergeEntities(b, a);
        }

        /** Combines two partial buffers. */
        @Override
        public OafEntity merge(OafEntity b, OafEntity a) {
            return this.mergeAndGet(b, a);
        }

        /** @return the buffer unchanged */
        @Override
        public OafEntity finish(OafEntity j) {
            return j;
        }

        @Override
        public Encoder<OafEntity> bufferEncoder() {
            return Encoders.kryo(OafEntity.class);
        }

        @Override
        public Encoder<OafEntity> outputEncoder() {
            return Encoders.kryo(OafEntity.class);
        }
    }
}

