/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.AbstractSparkAction;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.OpenorgsUtility;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.oa.dedup.model.ParentChildRel;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.Tuple3;

public class SparkPrepareOrgRels
extends AbstractSparkAction {
    private static final Logger log = LoggerFactory.getLogger(SparkPrepareOrgRels.class);

    public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)SparkPrepareOrgRels.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json")));
        parser.parseArgument(args);
        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
        new SparkPrepareOrgRels(parser, SparkPrepareOrgRels.getSparkSession(conf)).run(ISLookupClientFactory.getLookUpService((String)parser.get("isLookUpUrl")));
    }

    @Override
    public void run(ISLookUpService isLookUpService) throws IOException {
        String graphBasePath = this.parser.get("graphBasePath");
        String isLookUpUrl = this.parser.get("isLookUpUrl");
        String actionSetId = this.parser.get("actionSetId");
        String workingPath = this.parser.get("workingPath");
        int numConnections = java.util.Optional.ofNullable(this.parser.get("numConnections")).map(Integer::valueOf).orElse(20);
        String dbUrl = this.parser.get("dbUrl");
        String dedupEventsTable = this.parser.get("dedupEventsTable");
        String parentChildTable = this.parser.get("parentChildTable");
        String dbUser = this.parser.get("dbUser");
        String dbPwd = this.parser.get("dbPwd");
        log.info("graphBasePath:      '{}'", (Object)graphBasePath);
        log.info("isLookUpUrl:        '{}'", (Object)isLookUpUrl);
        log.info("actionSetId:        '{}'", (Object)actionSetId);
        log.info("workingPath:        '{}'", (Object)workingPath);
        log.info("numPartitions:      '{}'", (Object)numConnections);
        log.info("dbUrl:              '{}'", (Object)dbUrl);
        log.info("dbUser:             '{}'", (Object)dbUser);
        log.info("dedupEventsTable:   '{}'", (Object)dedupEventsTable);
        log.info("parentChildTable:   '{}'", (Object)parentChildTable);
        log.info("dbPwd:              '{}'", (Object)"xxx");
        String organization = ModelSupport.getMainType((EntityType)EntityType.organization);
        String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, organization);
        String entityPath = DedupUtility.createEntityPath(graphBasePath, organization);
        String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");
        JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRels = OpenorgsUtility.collectRels(this.spark, relationPath, "isDifferentFrom", "organizationOrganization", "dedup", true);
        log.info("Number of DiffRels collected: {}", (Object)diffRels.count());
        JavaRDD<Tuple2<Tuple2<String, String>, String>> parentChildRels = OpenorgsUtility.collectRels(this.spark, relationPath, "IsParentOf", "organizationOrganization", "relationship", false);
        log.info("Number of Parent/Child Rels collected: {}", (Object)parentChildRels.count());
        Dataset entities = this.spark.read().textFile(entityPath).map((MapFunction & Serializable)it -> {
            Organization entity = (Organization)OBJECT_MAPPER.readValue(it, Organization.class);
            return new Tuple2((Object)entity.getId(), (Object)entity);
        }, Encoders.tuple((Encoder)Encoders.STRING(), (Encoder)Encoders.kryo(Organization.class)));
        JavaRDD processedMergeRels = OpenorgsUtility.processMergeRels(this.spark, mergeRelPath, diffRels, parentChildRels).cache();
        Dataset parentChildSuggestions = SparkPrepareOrgRels.createParentChildSuggestions(this.spark, (JavaRDD<Tuple3<String, String, String>>)processedMergeRels, parentChildRels, (Dataset<Tuple2<String, Organization>>)entities).cache();
        Dataset duplicatesSuggestions = SparkPrepareOrgRels.createDuplicatesSuggestions(this.spark, (JavaRDD<Tuple3<String, String, String>>)processedMergeRels, (Dataset<Tuple2<String, Organization>>)entities).cache();
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", dbUser);
        connectionProperties.put("password", dbPwd);
        processedMergeRels.unpersist();
        duplicatesSuggestions.repartition(numConnections).write().mode(SaveMode.Overwrite).jdbc(dbUrl, dedupEventsTable, connectionProperties);
        duplicatesSuggestions.unpersist();
        parentChildSuggestions.repartition(numConnections).write().mode(SaveMode.Overwrite).jdbc(dbUrl, parentChildTable, connectionProperties);
        parentChildSuggestions.unpersist();
    }

    private static Dataset<ParentChildRel> createParentChildSuggestions(SparkSession spark, JavaRDD<Tuple3<String, String, String>> openorgsRels, JavaRDD<Tuple2<Tuple2<String, String>, String>> parentChildRels, Dataset<Tuple2<String, Organization>> entities) {
        JavaPairRDD pcRels = parentChildRels.mapToPair((PairFunction & Serializable)r -> (Tuple2)r._1());
        JavaPairRDD rawReprRels = openorgsRels.mapToPair((PairFunction & Serializable)r -> new Tuple2(r._2(), r._1()));
        pcRels = pcRels.leftOuterJoin(rawReprRels).mapToPair((PairFunction & Serializable)j -> new Tuple2(((Tuple2)j._2())._1(), ((Optional)((Tuple2)j._2())._2()).orElse(j._1())));
        pcRels = pcRels.leftOuterJoin(rawReprRels).mapToPair((PairFunction & Serializable)j -> new Tuple2(((Tuple2)j._2())._1(), ((Optional)((Tuple2)j._2())._2()).orElse(j._1())));
        JavaRDD parentChildRelRDD = pcRels.join(entities.toJavaRDD().mapToPair((PairFunction & Serializable)r -> new Tuple2(r._1(), r._2()))).mapToPair((PairFunction & Serializable)r -> new Tuple2(((Tuple2)r._2())._1(), java.util.Optional.ofNullable(((Organization)((Tuple2)r._2())._2()).getOriginalId()).map(oid -> (String)oid.get(0)).orElse(null))).join(entities.toJavaRDD().mapToPair((PairFunction & Serializable)r -> new Tuple2(r._1(), r._2()))).mapToPair((PairFunction & Serializable)r -> new Tuple2(((Tuple2)r._2())._1(), java.util.Optional.ofNullable(((Organization)((Tuple2)r._2())._2()).getOriginalId()).map(oid -> (String)oid.get(0)).orElse(null))).filter((Function & Serializable)j -> !((String)j._1()).equals(j._2())).flatMap((FlatMapFunction & Serializable)j -> Arrays.asList(new ParentChildRel((String)j._1(), (String)j._2(), "IsParentOf"), new ParentChildRel((String)j._2(), (String)j._1(), "IsChildOf")).iterator()).distinct();
        return spark.createDataset(parentChildRelRDD.rdd(), Encoders.bean(ParentChildRel.class));
    }

    private static Dataset<OrgSimRel> createDuplicatesSuggestions(SparkSession spark, JavaRDD<Tuple3<String, String, String>> openorgsRels, Dataset<Tuple2<String, Organization>> entities) {
        Dataset relations = spark.createDataset(openorgsRels.rdd(), Encoders.tuple((Encoder)Encoders.STRING(), (Encoder)Encoders.STRING(), (Encoder)Encoders.STRING()));
        Dataset relations2 = relations.joinWith(entities, relations.col("_2").equalTo((Object)entities.col("_1")), "inner").map((MapFunction & Serializable)r -> {
            Organization o = (Organization)((Tuple2)r._2())._2();
            return new OrgSimRel((String)((Tuple3)r._1())._1(), java.util.Optional.ofNullable(o.getOriginalId()).map(oid -> (String)oid.get(0)).orElse(null), java.util.Optional.ofNullable(o.getLegalname()).map(Field::getValue).orElse(""), java.util.Optional.ofNullable(o.getLegalshortname()).map(Field::getValue).orElse(""), java.util.Optional.ofNullable(o.getCountry()).map(Qualifier::getClassid).orElse(""), java.util.Optional.ofNullable(o.getWebsiteurl()).map(Field::getValue).orElse(""), java.util.Optional.ofNullable(o.getCollectedfrom()).map(c -> java.util.Optional.ofNullable(c.get(0)).map(KeyValue::getValue).orElse("")).orElse(""), (String)((Tuple3)r._1())._3(), SparkPrepareOrgRels.structuredPropertyListToString(o.getPid()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEclegalbody()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEclegalperson()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEcnonprofit()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEcresearchorganization()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEchighereducation()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEcinternationalorganizationeurinterests()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEcinternationalorganization()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEcenterprise()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEcsmevalidated()), SparkPrepareOrgRels.parseECField((Field<String>)o.getEcnutscode()));
        }, Encoders.bean(OrgSimRel.class)).map((MapFunction & Serializable)o -> new Tuple2((Object)o.getLocal_id(), o), Encoders.tuple((Encoder)Encoders.STRING(), (Encoder)Encoders.bean(OrgSimRel.class)));
        return relations2.joinWith(entities, relations2.col("_1").equalTo((Object)entities.col("_1")), "inner").map((MapFunction & Serializable)r -> {
            OrgSimRel orgSimRel = (OrgSimRel)((Tuple2)r._1())._2();
            orgSimRel.setLocal_id(java.util.Optional.ofNullable(((Organization)((Tuple2)r._2())._2()).getOriginalId()).map(oid -> (String)oid.get(0)).orElse(null));
            return orgSimRel;
        }, Encoders.bean(OrgSimRel.class));
    }
}

