/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.sx.graph;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIDataset;
import eu.dnetlib.dhp.schema.scholexplorer.DLIPublication;
import eu.dnetlib.dhp.schema.scholexplorer.DLIRelation;
import eu.dnetlib.dhp.schema.scholexplorer.DLIUnknown;
import eu.dnetlib.dhp.sx.graph.SparkSXGeneratePidSimlarity;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import net.minidev.json.JSONArray;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class SparkScholexplorerCreateRawGraphJob {
    // JSONPath of a record's identifier; main() uses the extracted value as the key under which
    // dataset, publication and unknown records are grouped and merged.
    static final String IDJSONPATH = "$.id";
    // JSONPath of a relation's source identifier (first component of the relation merge key).
    static final String SOURCEJSONPATH = "$.source";
    // JSONPath of a relation's target identifier (third component of the relation merge key).
    static final String TARGETJSONPATH = "$.target";
    // JSONPath of a relation's type (second component of the relation merge key).
    static final String RELJSONPATH = "$.relType";

    /**
     * Jackson mapper shared by all record-level lambdas of this job.
     *
     * <p>Unknown JSON properties are ignored while reading. The mapper is built once per JVM instead
     * of once per record, so Jackson can reuse its cached (de)serializers.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    /**
     * Entry point of the job.
     *
     * <p>Reads the JSON lines found in every sub-directory of {@code sourcePath}, then, depending on
     * the {@code entity} argument, either merges the records that share the same identifier and
     * stores them as gzip-compressed JSON text ({@code dataset}, {@code publication},
     * {@code unknown}) or builds the relation dataset ({@code relation}).
     *
     * @param args command line arguments, parsed according to
     *             {@code merge_entities_scholix_parameters.json}; the job reads {@code master},
     *             {@code sourcePath}, {@code targetPath} and {@code entity}
     * @throws IllegalArgumentException if {@code entity} is not one of the supported values
     * @throws Exception on any parsing, Spark or file system failure
     */
    public static void main(String[] args) throws Exception {
        final String argumentsJson;
        // Close the classpath resource once it has been read.
        try (InputStream in = SparkScholexplorerCreateRawGraphJob.class
            .getResourceAsStream("/eu/dnetlib/dhp/sx/graph/argumentparser/merge_entities_scholix_parameters.json")) {
            argumentsJson = IOUtils.toString(in);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(argumentsJson);
        parser.parseArgument(args);

        final SparkSession spark = SparkSession
            .builder()
            .config(new SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
            .appName(SparkScholexplorerCreateRawGraphJob.class.getSimpleName())
            .master(parser.get("master"))
            .getOrCreate();
        final JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());

        final String inputPath = parser.get("sourcePath");
        final String targetPath = parser.get("targetPath");
        final String entity = parser.get("entity");

        final FileSystem fs = FileSystem.get(sc.sc().hadoopConfiguration());
        final JavaRDD<String> union = unionOfSubFolders(sc, fs, inputPath);

        switch (entity) {
            case "dataset":
                mergeAndSave(union, DLIDataset.class, (a, b) -> {
                    a.mergeFrom(b);
                    return a;
                }, targetPath);
                break;
            case "publication":
                mergeAndSave(union, DLIPublication.class, (a, b) -> {
                    a.mergeFrom(b);
                    return a;
                }, targetPath);
                break;
            case "unknown":
                mergeAndSave(union, DLIUnknown.class, (a, b) -> {
                    a.mergeFrom(b);
                    return a;
                }, targetPath);
                break;
            case "relation":
                createRelations(spark, sc, fs, union, inputPath, targetPath);
                break;
            default:
                // Fail loudly: silently doing nothing would let the job succeed without output.
                throw new IllegalArgumentException("Unsupported entity type: " + entity);
        }
    }

    /**
     * Builds a single RDD with the text lines of every direct sub-directory of {@code inputPath}.
     * Plain files located directly under {@code inputPath} are skipped.
     */
    private static JavaRDD<String> unionOfSubFolders(JavaSparkContext sc, FileSystem fs, String inputPath)
        throws java.io.IOException {
        JavaRDD<String> union = sc.emptyRDD();
        for (FileStatus status : fs.listStatus(new Path(inputPath))) {
            if (status.isDirectory()) {
                union = union.union(sc.textFile(status.getPath().toUri().getRawPath()));
            }
        }
        return union;
    }

    /**
     * Parses every JSON line as {@code type}, merges the records sharing the same {@code $.id} with
     * {@code merge} and writes the result as gzip-compressed JSON text to {@code targetPath}.
     *
     * <p>NOTE(review): {@link #getJPathString} returns an empty string when the identifier cannot
     * be extracted, so all such records end up under the same key and are merged together.
     */
    private static <T> void mergeAndSave(JavaRDD<String> records, Class<T> type, Function2<T, T, T> merge,
        String targetPath) {
        records
            .mapToPair(
                (PairFunction<String, String, T>) json -> new Tuple2<>(
                    getJPathString(IDJSONPATH, json),
                    OBJECT_MAPPER.readValue(json, type)))
            .reduceByKey(merge)
            .map(item -> OBJECT_MAPPER.writeValueAsString(item._2()))
            .saveAsTextFile(targetPath, GzipCodec.class);
    }

    /**
     * Merges duplicated relations, stores them under {@code targetPath} and then rewrites their
     * source/target identifiers using the {@code pid_simRel} relations.
     */
    private static void createRelations(SparkSession spark, JavaSparkContext sc, FileSystem fs,
        JavaRDD<String> union, String inputPath, String targetPath) throws Exception {
        SparkSXGeneratePidSimlarity
            .generateDataFrame(spark, sc, inputPath.replace("/relation", ""), targetPath.replace("/relation", ""));

        // Relations are de-duplicated on the md5 of "source::relType::target" (lower-cased).
        final RDD<DLIRelation> rdd = union
            .mapToPair((PairFunction<String, String, DLIRelation>) json -> {
                final String source = getJPathString(SOURCEJSONPATH, json);
                final String target = getJPathString(TARGETJSONPATH, json);
                final String reltype = getJPathString(RELJSONPATH, json);
                final String key = DHPUtils
                    .md5(String.format("%s::%s::%s", source.toLowerCase(), reltype.toLowerCase(), target.toLowerCase()));
                return new Tuple2<>(key, OBJECT_MAPPER.readValue(json, DLIRelation.class));
            })
            .reduceByKey((a, b) -> {
                a.mergeFrom(b);
                return a;
            })
            .map(Tuple2::_2)
            .rdd();

        spark.createDataset(rdd, Encoders.bean(DLIRelation.class)).write().mode(SaveMode.Overwrite).save(targetPath);
        final Dataset<Relation> relations = spark.read().load(targetPath).as(Encoders.bean(Relation.class));

        final String simRelPath = targetPath.replace("/relation", "") + "/pid_simRel";
        System.out.println("LOADING PATH :" + simRelPath);
        final Dataset<Relation> simRels = spark.read().load(simRelPath).as(Encoders.bean(Relation.class));

        // Rewrite each similarity target as "<prefix of source before '|'>|<part of target after '::'>"
        // so it can be compared with the identifiers used by the relations.
        final Dataset<Relation> ids = simRels.map((MapFunction<Relation, Relation>) relation -> {
            final String type = StringUtils.substringBefore(relation.getSource(), "|");
            relation.setTarget(String.format("%s|%s", type, StringUtils.substringAfter(relation.getTarget(), "::")));
            return relation;
        }, Encoders.bean(Relation.class));

        // Where a relation source matches a similarity target, replace it with the similarity source.
        final Dataset<Relation> firstJoin = relations
            .joinWith(ids, ids.col("target").equalTo(relations.col("source")), "left_outer")
            .map((MapFunction<Tuple2<Relation, Relation>, Relation>) pair -> {
                if (pair._2() != null) {
                    pair._1().setSource(pair._2().getSource());
                }
                return pair._1();
            }, Encoders.bean(Relation.class));

        // Same substitution, this time on the relation target.
        final Dataset<Relation> secondJoin = firstJoin
            .joinWith(ids, ids.col("target").equalTo(firstJoin.col("target")), "left_outer")
            .map((MapFunction<Tuple2<Relation, Relation>, Relation>) pair -> {
                if (pair._2() != null) {
                    pair._1().setTarget(pair._2().getSource());
                }
                return pair._1();
            }, Encoders.bean(Relation.class));

        final Path relationPath = new Path(targetPath);
        final Path fixedPath = new Path(targetPath + "_fixed");
        secondJoin.write().mode(SaveMode.Overwrite).save(targetPath + "_fixed");

        // Swap the fixed output into place; both calls report failure through their return value.
        if (fs.exists(relationPath) && !fs.delete(relationPath, true)) {
            throw new java.io.IOException("Unable to delete " + relationPath + " before replacing it with " + fixedPath);
        }
        if (!fs.rename(fixedPath, relationPath)) {
            throw new java.io.IOException("Unable to rename " + fixedPath + " to " + relationPath);
        }
    }

    /**
     * Evaluates {@code jsonPath} against {@code json} and returns the result as a string.
     *
     * <p>A string result is returned as is; for a non-empty {@link JSONArray} the first element is
     * returned (cast to {@code String}). In every other situation, including any exception raised
     * while evaluating the path or casting the element, an empty string is returned.
     *
     * @param jsonPath the JSONPath expression to evaluate
     * @param json the JSON document to read from
     * @return the extracted value, or {@code ""} when nothing usable could be extracted
     */
    public static String getJPathString(String jsonPath, String json) {
        String extracted = "";
        try {
            final Object value = JsonPath.read(json, jsonPath);
            if (value instanceof String) {
                extracted = (String) value;
            } else if (value instanceof JSONArray) {
                final JSONArray values = (JSONArray) value;
                if (values.size() > 0) {
                    extracted = (String) values.get(0);
                }
            }
        } catch (Exception e) {
            // Best effort: any failure is reported to the caller as an empty string.
            extracted = "";
        }
        return extracted;
    }
}

