/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.dedup.AbstractSparkAction;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.InputStream;
import java.io.Serializable;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.Tuple3;

/**
 * Spark action that propagates the merge relations produced by deduplication onto the
 * relations of the graph.
 *
 * <p>The relations read from {@code <graphBasePath>/relation} are rewritten so that every
 * endpoint that appears as the target of a {@code merges} relation is replaced by the source
 * of that relation (the dedup id). Relations whose endpoints are not valid entity ids
 * (see {@link #validIds(SparkSession, String)}) are dropped, duplicates are merged, and the
 * result, together with the merge relations themselves, is written to
 * {@code <graphOutputPath>/relation}.
 */
public class SparkPropagateRelation extends AbstractSparkAction {
    private static final Logger log = LoggerFactory.getLogger(SparkPropagateRelation.class);

    /** Classpath location of the JSON document describing the accepted command line parameters. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/dedup/propagateRelation_parameters.json";

    /** Bean encoder for {@link Relation}; also provides the schema used to read the relations. */
    private static final Encoder<Relation> REL_BEAN_ENC = Encoders.bean(Relation.class);

    /** Kryo encoder for {@link Relation}, used for the intermediate dataset that is grouped and reduced. */
    private static final Encoder<Relation> REL_KRYO_ENC = Encoders.kryo(Relation.class);

    /**
     * Creates the action.
     *
     * @param parser the parsed command line arguments
     * @param spark  the Spark session the action runs on
     */
    public SparkPropagateRelation(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    /**
     * Command line entry point: parses the arguments, configures Kryo serialization for the
     * model classes and runs the action.
     *
     * @param args the command line arguments
     * @throws Exception if the parameter definitions cannot be read or the arguments cannot be parsed
     */
    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(loadParameterDefinitions());
        parser.parseArgument(args);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        new SparkPropagateRelation(parser, getSparkSession(conf))
            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
    }

    /**
     * Reads the parameter definitions from the classpath, closing the stream afterwards and
     * decoding it explicitly as UTF-8 instead of relying on the platform default charset.
     */
    private static String loadParameterDefinitions() throws Exception {
        try (InputStream in = SparkPropagateRelation.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            return IOUtils.toString(in, "UTF-8");
        }
    }

    /**
     * Rewrites the graph relations according to the merge relations found under the working
     * path and stores the result under {@code <graphOutputPath>/relation}.
     *
     * @param isLookUpService lookup service handle; not used by this action
     */
    @Override
    public void run(ISLookUpService isLookUpService) {
        String graphBasePath = this.parser.get("graphBasePath");
        String workingPath = this.parser.get("workingPath");
        String graphOutputPath = this.parser.get("graphOutputPath");
        log.info("graphBasePath: '{}'", graphBasePath);
        log.info("workingPath: '{}'", workingPath);
        log.info("graphOutputPath: '{}'", graphOutputPath);

        Dataset<Relation> mergeRels = this.spark
            .read()
            .load(DedupUtility.createMergeRelPath(workingPath, "*", "*"))
            .as(REL_BEAN_ENC);

        // <dedupID, mergedObjectID> pairs taken from the 'merges' relations
        Dataset<Row> idsToMerge = mergeRels
            .where(functions.col("relClass").equalTo("merges"))
            .select(functions.col("source").as("dedupID"), functions.col("target").as("mergedObjectID"))
            .distinct();

        Dataset<Row> allRels = this.spark
            .read()
            .schema(REL_BEAN_ENC.schema())
            .json(graphBasePath + "/relation");

        // Two left outer joins look up the dedup id (if any) of the source and of the target;
        // a null dedup id means the corresponding endpoint was not merged.
        Dataset<Relation> dedupedRels = allRels
            .joinWith(idsToMerge, allRels.col("source").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
            .joinWith(idsToMerge, functions.col("_1.target").equalTo(idsToMerge.col("mergedObjectID")), "left_outer")
            .select("_1._1", "_1._2.dedupID", "_2.dedupID")
            .as(Encoders.tuple(REL_BEAN_ENC, Encoders.STRING(), Encoders.STRING()))
            .map(
                (MapFunction<Tuple3<Relation, String, String>, Relation>) SparkPropagateRelation::redirectToDedupIds,
                REL_BEAN_ENC);

        Dataset<Row> ids = validIds(this.spark, graphBasePath);

        // Left semi joins keep only the relations whose source and target are both valid ids.
        Dataset<Relation> cleanedRels = dedupedRels
            .join(ids, functions.col("source").equalTo(ids.col("id")), "leftsemi")
            .join(ids, functions.col("target").equalTo(ids.col("id")), "leftsemi")
            .as(REL_BEAN_ENC)
            .map((MapFunction<Relation, Relation>) r -> {
                // dataInfo is never null here: redirectToDedupIds guarantees it
                r.getDataInfo().setInvisible(false);
                return r;
            }, REL_KRYO_ENC);

        // Relations sharing source, target, relType, subRelType and relClass are merged into one.
        Dataset<Relation> distinctRels = cleanedRels
            .groupByKey(
                (MapFunction<Relation, String>) r -> String
                    .join(" ", r.getSource(), r.getTarget(), r.getRelType(), r.getSubRelType(), r.getRelClass()),
                Encoders.STRING())
            .reduceGroups((ReduceFunction<Relation>) (b, a) -> {
                b.mergeFrom(a);
                return b;
            })
            // the explicit cast selects the Java MapFunction overload of Dataset.map
            .map((MapFunction<Tuple2<String, Relation>, Relation>) Tuple2::_2, REL_BEAN_ENC);

        String outputRelationPath = graphOutputPath + "/relation";
        removeOutputDir(this.spark, outputRelationPath);
        // NOTE(review): under SQL three-valued logic a NULL flag makes the predicate NULL,
        // so such rows are dropped as well - confirm this is intended.
        save(
            distinctRels
                .union(mergeRels)
                .filter("source != target AND dataInfo.deletedbyinference != true AND dataInfo.invisible != true"),
            outputRelationPath,
            SaveMode.Overwrite);
    }

    /**
     * Replaces the endpoints of a relation with the dedup ids found by the joins.
     *
     * <p>A missing {@code dataInfo} is initialised; when at least one endpoint is replaced the
     * relation is flagged as not deleted by inference.
     *
     * @param t the relation, the dedup id of its source (or null) and the dedup id of its target (or null)
     * @return the same relation instance, updated in place
     */
    private static Relation redirectToDedupIds(Tuple3<Relation, String, String> t) {
        Relation rel = t._1();
        String newSource = t._2();
        String newTarget = t._3();

        if (rel.getDataInfo() == null) {
            rel.setDataInfo(new DataInfo());
        }
        if (newSource != null || newTarget != null) {
            rel.getDataInfo().setDeletedbyinference(false);
            if (newSource != null) {
                rel.setSource(newSource);
            }
            if (newTarget != null) {
                rel.setTarget(newTarget);
            }
        }
        return rel;
    }

    /**
     * Collects the distinct ids of the entities that are neither deleted by inference nor
     * invisible, reading one JSON dataset per entity type from
     * {@code <graphBasePath>/<entityType>}. Entity paths that do not exist are skipped.
     *
     * @param spark         the Spark session
     * @param graphBasePath base path of the input graph
     * @return a dataset with a single {@code id} column
     */
    static Dataset<Row> validIds(SparkSession spark, String graphBasePath) {
        StructType idsSchema = StructType
            .fromDDL("`id` STRING, `dataInfo` STRUCT<`deletedbyinference`:BOOLEAN,`invisible`:BOOLEAN>");

        Dataset<Row> allIds = spark.emptyDataset(RowEncoder.apply(idsSchema));
        for (EntityType entityType : ModelSupport.entityTypes.keySet()) {
            String entityPath = graphBasePath + '/' + entityType.name();
            if (HdfsSupport.exists(entityPath, spark.sparkContext().hadoopConfiguration())) {
                allIds = allIds.union(spark.read().schema(idsSchema).json(entityPath));
            }
        }

        return allIds
            .filter("dataInfo.deletedbyinference != true AND dataInfo.invisible != true")
            .select("id")
            .distinct();
    }
}

