/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.resulttoorganizationfrominstrepo;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.DatasourceOrganization;
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.ResultOrganizationSet;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkResultToOrganizationFromIstRepoJob {
    private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromIstRepoJob.class);
    private static final String RESULT_ORGANIZATIONSET_QUERY = "SELECT id resultId, collect_set(organizationId) organizationSet FROM ( SELECT id, organizationId FROM rels JOIN cfhb  ON cf = datasourceId     UNION ALL SELECT id , organizationId     FROM rels JOIN cfhb  ON hb = datasourceId ) tmp GROUP BY id";

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfrominstrepo/input_propagationresulaffiliationfrominstrepo_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", (Object)inputPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        String datasourceorganization = parser.get("datasourceOrganizationPath");
        log.info("datasourceOrganizationPath: {}", (Object)datasourceorganization);
        String alreadylinked = parser.get("alreadyLinkedPath");
        log.info("alreadyLinkedPath: {}", (Object)alreadylinked);
        String resultClassName = parser.get("resultTableName");
        log.info("resultTableName: {}", (Object)resultClassName);
        Boolean saveGraph = Optional.ofNullable(parser.get("saveGraph")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("saveGraph: {}", (Object)saveGraph);
        Class<?> resultClazz = Class.forName(resultClassName);
        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
        SparkSessionSupport.runWithSparkHiveSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            if (saveGraph.booleanValue()) {
                SparkResultToOrganizationFromIstRepoJob.execPropagation(spark, datasourceorganization, alreadylinked, inputPath, outputPath, resultClazz);
            }
        });
    }

    private static void execPropagation(SparkSession spark, String datasourceorganization, String alreadyLinkedPath, String inputPath, String outputPath, Class<? extends Result> clazz) {
        Dataset<DatasourceOrganization> dsOrg = PropagationConstant.readPath(spark, datasourceorganization, DatasourceOrganization.class);
        Dataset<ResultOrganizationSet> potentialUpdates = SparkResultToOrganizationFromIstRepoJob.getPotentialRelations(spark, inputPath, clazz, dsOrg);
        Dataset<ResultOrganizationSet> alreadyLinked = PropagationConstant.readPath(spark, alreadyLinkedPath, ResultOrganizationSet.class);
        potentialUpdates.joinWith(alreadyLinked, potentialUpdates.col("resultId").equalTo((Object)alreadyLinked.col("resultId")), "left_outer").flatMap(SparkResultToOrganizationFromIstRepoJob.createRelationFn(), Encoders.bean(Relation.class)).write().mode(SaveMode.Append).option("compression", "gzip").json(outputPath);
    }

    private static FlatMapFunction<Tuple2<ResultOrganizationSet, ResultOrganizationSet>, Relation> createRelationFn() {
        return (FlatMapFunction & Serializable)value -> {
            ArrayList newRelations = new ArrayList();
            ResultOrganizationSet potentialUpdate = (ResultOrganizationSet)value._1();
            Optional<Object> alreadyLinked = Optional.ofNullable(value._2());
            ArrayList<String> organizations = potentialUpdate.getOrganizationSet();
            alreadyLinked.ifPresent(resOrg -> resOrg.getOrganizationSet().forEach(organizations::remove));
            String resultId = potentialUpdate.getResultId();
            organizations.forEach(orgId -> {
                newRelations.add(PropagationConstant.getRelation(orgId, resultId, "isAuthorInstitutionOf", "resultOrganization", "affiliation", "propagation", "result:organization:instrepo", "Propagation of affiliation to result collected from datasources of type institutional repository"));
                newRelations.add(PropagationConstant.getRelation(resultId, orgId, "hasAuthorInstitution", "resultOrganization", "affiliation", "propagation", "result:organization:instrepo", "Propagation of affiliation to result collected from datasources of type institutional repository"));
            });
            return newRelations.iterator();
        };
    }

    private static <R extends Result> Dataset<ResultOrganizationSet> getPotentialRelations(SparkSession spark, String inputPath, Class<R> resultClazz, Dataset<DatasourceOrganization> dsOrg) {
        Dataset<R> result = PropagationConstant.readPath(spark, inputPath, resultClazz);
        result.createOrReplaceTempView("result");
        PropagationConstant.createCfHbforResult(spark);
        dsOrg.createOrReplaceTempView("rels");
        return spark.sql(RESULT_ORGANIZATIONSET_QUERY).as(Encoders.bean(ResultOrganizationSet.class));
    }
}

