/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.bipfinder;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.bipmodel.BipScore;
import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipProjectModel;
import eu.dnetlib.dhp.actionmanager.bipmodel.score.deserializers.BipResultModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Measure;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that turns BIP score inputs for results and projects into a single
 * compressed sequence file of serialized {@link AtomicAction}s.
 *
 * <p>Each output record is a {@code (Text, Text)} pair: the key is the canonical
 * class name of the action payload and the value is the JSON serialization of the
 * {@link AtomicAction} produced by the shared {@link ObjectMapper}.
 */
public class SparkAtomicActionScoreJob
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionScoreJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Classpath location of the JSON document handed to {@link ArgumentApplicationParser}. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/actionmanager/bipfinder/input_actionset_parameter.json";

    /**
     * Job entry point.
     *
     * <p>Reads the {@code resultsInputPath}, {@code projectsInputPath} and {@code outputPath}
     * arguments, removes the output directory, and writes the union of the result and project
     * actions to {@code outputPath} using {@link SequenceFileOutputFormat} with {@link BZip2Codec}.
     *
     * @param <I> unused; retained only so the method signature stays unchanged
     * @param args command line arguments, parsed by {@link ArgumentApplicationParser}
     * @throws IllegalStateException if the parameter definition resource is not on the classpath
     * @throws Exception if argument parsing or the Spark job fails
     */
    public static <I extends Result> void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // try-with-resources: the classpath stream was previously never closed.
        try (InputStream in = SparkAtomicActionScoreJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                // Fail with the resource name instead of an anonymous NPE inside IOUtils.
                throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            // Decode explicitly as UTF-8 rather than relying on the platform default charset.
            jsonConfiguration = IOUtils.toString(in, "UTF-8");
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String resultsInputPath = parser.get("resultsInputPath");
        log.info("resultsInputPath: {}", resultsInputPath);

        String projectsInputPath = parser.get("projectsInputPath");
        log.info("projectsInputPath: {}", projectsInputPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            // The output directory is removed first, before anything is written to it.
            removeOutputDir(spark, outputPath);

            JavaPairRDD<Text, Text> resultsRDD = prepareResults(spark, resultsInputPath);
            JavaPairRDD<Text, Text> projectsRDD = prepareProjects(spark, projectsInputPath);

            resultsRDD
                .union(projectsRDD)
                .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class, BZip2Codec.class);
        });
    }

    /**
     * Builds the project actions: one {@link Project} per {@link BipProjectModel} record, carrying
     * the record's project id and the measures returned by {@code toMeasures()}.
     *
     * @param spark active Spark session
     * @param inputPath location of the project scores, read through {@code Constants.readPath}
     * @return serialized project actions keyed by payload class name
     */
    private static JavaPairRDD<Text, Text> prepareProjects(SparkSession spark, String inputPath) {
        Dataset<BipProjectModel> projectScores = Constants.readPath(spark, inputPath, BipProjectModel.class);

        // The explicit MapFunction cast selects the Java overload of Dataset.map.
        Dataset<Project> projects = projectScores.map((MapFunction<BipProjectModel, Project>) bipProjectScores -> {
            Project project = new Project();
            project.setId(bipProjectScores.getProjectId());
            project.setMeasures(bipProjectScores.toMeasures());
            return project;
        }, Encoders.bean(Project.class));

        return toAtomicActions(projects, Project.class);
    }

    /**
     * Builds the result actions: every line of the input text file is parsed as a
     * {@link BipResultModel}, each of its keys becomes a {@link BipScore}, and each score becomes
     * a {@link Result} carrying that id and the converted measures.
     *
     * @param spark active Spark session
     * @param bipScorePath location of the text file with one JSON document per line
     * @return serialized result actions keyed by payload class name
     */
    private static JavaPairRDD<Text, Text> prepareResults(SparkSession spark, String bipScorePath) {
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        JavaRDD<BipResultModel> bipDeserializeJavaRDD = sc
            .textFile(bipScorePath)
            .map(item -> OBJECT_MAPPER.readValue(item, BipResultModel.class));

        Dataset<BipScore> bipScores = spark.createDataset(
            bipDeserializeJavaRDD.flatMap(entry -> toBipScores(entry).iterator()).rdd(),
            Encoders.bean(BipScore.class));

        Dataset<Result> results = bipScores.map((MapFunction<BipScore, Result>) bs -> {
            Result ret = new Result();
            ret.setId(bs.getId());
            ret.setMeasures(getMeasure(bs));
            return ret;
        }, Encoders.bean(Result.class));

        return toAtomicActions(results, Result.class);
    }

    /** Expands one parsed input document into one {@link BipScore} per key it contains. */
    private static List<BipScore> toBipScores(BipResultModel entry) {
        return entry.keySet().stream().map(key -> {
            BipScore bs = new BipScore();
            bs.setId(key);
            bs.setScoreList(entry.get(key));
            return bs;
        }).collect(Collectors.toList());
    }

    /**
     * Wraps every entity in an {@link AtomicAction} and serializes it as a {@code (Text, Text)}
     * pair of payload class canonical name and action JSON.
     */
    private static <T extends Oaf> JavaPairRDD<Text, Text> toAtomicActions(Dataset<T> entities, Class<T> clazz) {
        return entities
            .toJavaRDD()
            .map(entity -> new AtomicAction<>(clazz, entity))
            .mapToPair(aa -> new Tuple2<>(
                new Text(aa.getClazz().getCanonicalName()),
                new Text(OBJECT_MAPPER.writeValueAsString(aa))));
    }

    /**
     * Converts the score list of a {@link BipScore} into {@link Measure}s, copying each unit's
     * key and value into a {@link KeyValue} and attaching a freshly built data info to it.
     *
     * @param value score holder whose score list is converted
     * @return one measure per score, in the order of the score list
     */
    private static List<Measure> getMeasure(BipScore value) {
        return value.getScoreList().stream().map(score -> {
            Measure m = new Measure();
            m.setId(score.getId());
            m.setUnit(score.getUnit().stream().map(unit -> {
                KeyValue kv = new KeyValue();
                kv.setValue(unit.getValue());
                kv.setKey(unit.getKey());
                kv.setDataInfo(OafMapperUtils.dataInfo(
                    false,
                    "update",
                    true,
                    false,
                    OafMapperUtils.qualifier(
                        "measure:bip",
                        "Inferred by OpenAIRE",
                        "dnet:provenanceActions",
                        "dnet:provenanceActions"),
                    ""));
                return kv;
            }).collect(Collectors.toList()));
            return m;
        }).collect(Collectors.toList());
    }

    /** Removes {@code path} through {@link HdfsSupport}, using the session's Hadoop configuration. */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }
}

