/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.provision;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.provision.PrepareRelationsJob;
import eu.dnetlib.dhp.oa.provision.model.EntityRelEntity;
import eu.dnetlib.dhp.oa.provision.model.TypedRow;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Software;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Spark job that pairs every graph entity with the related entities produced by an earlier step.
 *
 * <p>The job reads the seven entity tables found under {@code inputGraphRootPath}
 * ({@code publication}, {@code dataset}, {@code otherresearchproduct}, {@code software},
 * {@code datasource}, {@code organization}, {@code project}), wraps each record in a
 * {@link TypedRow}, left-outer-joins them by entity id with the {@link EntityRelEntity} records
 * read from {@code inputRelatedEntitiesPath} (keyed by the relation source), and writes the
 * resulting {@link EntityRelEntity} rows as parquet to {@code outputPath}.
 */
public class CreateRelatedEntitiesJob_phase2 {
    private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase2.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    // NOTE(review): the "pahase2" spelling is the name looked up at runtime; it must stay in sync
    // with the actual resource file, so it is deliberately not "corrected" here.
    private static final String PARAMS_RESOURCE = "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json";

    /**
     * Job entry point: parses the command line arguments, configures Kryo serialization and runs
     * the join inside a Spark session.
     *
     * @param args command line arguments, parsed according to the JSON definition in
     *     {@link #PARAMS_RESOURCE}; {@code inputRelatedEntitiesPath}, {@code inputGraphRootPath},
     *     {@code outputPath} and {@code numPartitions} are read, {@code isSparkSessionManaged}
     *     defaults to {@code true} when absent
     * @throws NullPointerException if the parameter definition resource is not on the classpath
     * @throws Exception on any argument parsing or Spark failure
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // try-with-resources closes the classpath stream; UTF-8 avoids depending on the platform charset.
        try (InputStream in = PrepareRelationsJob.class.getResourceAsStream(PARAMS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(
                Objects.requireNonNull(in, "classpath resource not found: " + PARAMS_RESOURCE),
                StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath");
        log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath);

        String inputGraphRootPath = parser.get("inputGraphRootPath");
        log.info("inputGraphRootPath: {}", inputGraphRootPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        int numPartitions = Integer.parseInt(parser.get("numPartitions"));
        log.info("numPartitions: {}", numPartitions);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            joinAllEntities(spark, inputRelatedEntitiesPath, inputGraphRootPath, outputPath, numPartitions);
        });
    }

    /**
     * Left-outer-joins all graph entities with their related entities and stores the result.
     *
     * <p>Every entity yields at least one output row; when no related entity matches, the row
     * carries only the entity (relation and target are left unset). Rows whose entity is missing
     * or has a blank id are dropped.
     *
     * @param spark the active Spark session
     * @param inputRelatedEntitiesPath location of the related entities to join with
     * @param inputGraphRootPath root path holding one sub-directory per entity type
     * @param outputPath parquet destination, overwritten if present
     * @param numPartitions number of partitions used for the entities and for the output
     */
    private static void joinAllEntities(SparkSession spark, String inputRelatedEntitiesPath, String inputGraphRootPath, String outputPath, int numPartitions) {
        org.apache.spark.sql.Dataset<Tuple2<String, TypedRow>> entities = readAllEntities(spark, inputGraphRootPath, numPartitions);
        org.apache.spark.sql.Dataset<Tuple2<String, EntityRelEntity>> relsBySource = readRelatedEntities(spark, inputRelatedEntitiesPath);

        entities
            .joinWith(relsBySource, entities.col("_1").equalTo(relsBySource.col("_1")), "left_outer")
            .map((MapFunction<Tuple2<Tuple2<String, TypedRow>, Tuple2<String, EntityRelEntity>>, EntityRelEntity>) value -> {
                EntityRelEntity re = new EntityRelEntity();
                re.setEntity(value._1()._2());
                // The right side of a left outer join is null when the entity has no relations.
                Optional.ofNullable(value._2()).map(Tuple2::_2).ifPresent(related -> {
                    re.setRelation(related.getRelation());
                    re.setTarget(related.getTarget());
                });
                return re;
            }, Encoders.bean(EntityRelEntity.class))
            // Filter before repartitioning so discarded rows are not shuffled; the rows written are the same.
            .filter((FilterFunction<EntityRelEntity>) value -> value.getEntity() != null && StringUtils.isNotBlank(value.getEntity().getId()))
            .repartition(numPartitions)
            .write()
            .mode(SaveMode.Overwrite)
            .parquet(outputPath);
    }

    /**
     * Reads every entity table of the graph and returns the union keyed by entity id.
     *
     * @param spark the active Spark session
     * @param inputGraphPath root path holding one sub-directory per entity type
     * @param numPartitions number of partitions of the returned dataset
     * @return pairs of (entity id, typed row), with the row serialized through Kryo
     */
    private static org.apache.spark.sql.Dataset<Tuple2<String, TypedRow>> readAllEntities(SparkSession spark, String inputGraphPath, int numPartitions) {
        org.apache.spark.sql.Dataset<TypedRow> publication = readPathEntity(spark, inputGraphPath + "/publication", Publication.class);
        org.apache.spark.sql.Dataset<TypedRow> dataset = readPathEntity(spark, inputGraphPath + "/dataset", Dataset.class);
        org.apache.spark.sql.Dataset<TypedRow> other = readPathEntity(spark, inputGraphPath + "/otherresearchproduct", OtherResearchProduct.class);
        org.apache.spark.sql.Dataset<TypedRow> software = readPathEntity(spark, inputGraphPath + "/software", Software.class);
        org.apache.spark.sql.Dataset<TypedRow> datasource = readPathEntity(spark, inputGraphPath + "/datasource", Datasource.class);
        org.apache.spark.sql.Dataset<TypedRow> organization = readPathEntity(spark, inputGraphPath + "/organization", Organization.class);
        org.apache.spark.sql.Dataset<TypedRow> project = readPathEntity(spark, inputGraphPath + "/project", Project.class);

        return publication
            .union(dataset)
            .union(other)
            .union(software)
            .union(datasource)
            .union(organization)
            .union(project)
            .map((MapFunction<TypedRow, Tuple2<String, TypedRow>>) value -> new Tuple2<>(value.getId(), value),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(TypedRow.class)))
            .repartition(numPartitions);
    }

    /**
     * Loads the related entities and keys each of them by the source id of its relation.
     *
     * @param spark the active Spark session
     * @param inputRelatedEntitiesPath location whose files, as listed by {@link HdfsSupport}, are loaded
     * @return pairs of (relation source id, related entity), with the entity serialized through Kryo
     */
    private static org.apache.spark.sql.Dataset<Tuple2<String, EntityRelEntity>> readRelatedEntities(SparkSession spark, String inputRelatedEntitiesPath) {
        log.info("Reading related entities from: {}", inputRelatedEntitiesPath);
        List<String> paths = HdfsSupport.listFiles(inputRelatedEntitiesPath, spark.sparkContext().hadoopConfiguration());
        log.info("Found paths: {}", String.join(",", paths));

        return spark
            .read()
            .load(toSeq(paths))
            .as(Encoders.bean(EntityRelEntity.class))
            .map((MapFunction<EntityRelEntity, Tuple2<String, EntityRelEntity>>) value -> new Tuple2<>(value.getRelation().getSource(), value),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(EntityRelEntity.class)));
    }

    /**
     * Reads one entity table stored as JSON text lines and converts it into typed rows.
     *
     * <p>Only records satisfying {@code dataInfo.invisible == false} are kept. The type assigned
     * to the rows is the last segment of {@code inputEntityPath}.
     *
     * @param spark the active Spark session
     * @param inputEntityPath path of the text files holding one JSON entity per line
     * @param entityClazz model class each line is deserialized into
     * @param <E> the entity type
     * @return the visible entities wrapped as {@link TypedRow}
     */
    private static <E extends OafEntity> org.apache.spark.sql.Dataset<TypedRow> readPathEntity(SparkSession spark, String inputEntityPath, Class<E> entityClazz) {
        log.info("Reading Graph table from: {}", inputEntityPath);
        // Constant for the whole table: compute it once instead of once per record.
        final String type = StringUtils.substringAfterLast(inputEntityPath, "/");

        return spark
            .read()
            .textFile(inputEntityPath)
            .map((MapFunction<String, E>) value -> OBJECT_MAPPER.readValue(value, entityClazz), Encoders.bean(entityClazz))
            .filter("dataInfo.invisible == false")
            .map((MapFunction<E, TypedRow>) value -> getTypedRow(type, value), Encoders.bean(TypedRow.class));
    }

    /**
     * Builds the {@link TypedRow} describing the given entity.
     *
     * @param type entity type label stored in the row
     * @param entity the entity to wrap; the only caller passes records that survived the
     *     {@code dataInfo.invisible == false} filter, so {@code dataInfo} is expected to be set
     * @return a row carrying type, deleted-by-inference flag, id and the entity serialized as JSON
     * @throws JsonProcessingException if the entity cannot be serialized to JSON
     */
    private static TypedRow getTypedRow(String type, OafEntity entity) throws JsonProcessingException {
        TypedRow t = new TypedRow();
        t.setType(type);
        t.setDeleted(entity.getDataInfo().getDeletedbyinference());
        t.setId(entity.getId());
        t.setOaf(OBJECT_MAPPER.writeValueAsString(entity));
        return t;
    }

    /**
     * Removes the output location through {@link HdfsSupport} before the job writes to it.
     *
     * @param spark the active Spark session, used for its Hadoop configuration
     * @param path the path to remove
     */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    /**
     * Converts a Java list into a Scala {@link Seq}, as required by the Spark reader API.
     *
     * @param list the paths to convert
     * @return a Scala sequence with the same elements in iteration order
     */
    private static Seq<String> toSeq(List<String> list) {
        Iterator<String> scalaIterator = JavaConverters.asScalaIteratorConverter(list.iterator()).asScala();
        return scalaIterator.toSeq();
    }
}

