/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.resulttoorganizationfromsemrel;

import eu.dnetlib.dhp.KeyValueSet;
import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.resulttoorganizationfrominstrepo.SparkResultToOrganizationFromIstRepoJob;
import eu.dnetlib.dhp.resulttoorganizationfromsemrel.Leaves;
import eu.dnetlib.dhp.resulttoorganizationfromsemrel.PropagationCounter;
import eu.dnetlib.dhp.resulttoorganizationfromsemrel.StepActions;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkResultToOrganizationFromSemRel
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(SparkResultToOrganizationFromSemRel.class);
    private static final int MAX_ITERATION = 5;
    public static final String NEW_RELATION_PATH = "/newRelation";

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)SparkResultToOrganizationFromIstRepoJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttoorganizationfromsemrel/input_propagation_parameter.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String relationPath = parser.get("relationPath");
        log.info("relationPath: {}", (Object)relationPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        String leavesPath = parser.get("leavesPath");
        log.info("leavesPath: {}", (Object)leavesPath);
        String childParentPath = parser.get("childParentPath");
        log.info("childParentPath: {}", (Object)childParentPath);
        String resultOrganizationPath = parser.get("resultOrgPath");
        log.info("resultOrganizationPath: {}", (Object)resultOrganizationPath);
        String workingPath = parser.get("workingDir");
        log.info("workingPath: {}", (Object)workingPath);
        int iterations = Optional.ofNullable(parser.get("iterations")).map(v -> {
            if (Integer.valueOf(v) < 5) {
                return Integer.valueOf(v);
            }
            return 5;
        }).orElse(5);
        log.info("iterations: {}", (Object)iterations);
        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
        SparkSessionSupport.runWithSparkHiveSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> SparkResultToOrganizationFromSemRel.execPropagation(spark, leavesPath, childParentPath, resultOrganizationPath, relationPath, workingPath, outputPath, iterations));
    }

    public static void execPropagation(SparkSession spark, String leavesPath, String childParentPath, String resultOrganizationPath, String graphPath, String workingPath, String outputPath, int iterations) {
        if (iterations == 1) {
            SparkResultToOrganizationFromSemRel.doPropagateOnce(spark, leavesPath, childParentPath, resultOrganizationPath, graphPath, workingPath, outputPath);
        } else {
            LongAccumulator iterationOne = spark.sparkContext().longAccumulator("ExitAtFirstIteration");
            LongAccumulator iterationTwo = spark.sparkContext().longAccumulator("ExitAtSecondIteration");
            LongAccumulator iterationThree = spark.sparkContext().longAccumulator("ExitAtThirdIteration");
            LongAccumulator iterationFour = spark.sparkContext().longAccumulator("ExitAtFourthIteration");
            LongAccumulator iterationFive = spark.sparkContext().longAccumulator("ExitAtFifthIteration");
            LongAccumulator notReachedFirstParent = spark.sparkContext().longAccumulator("ExitAtNoFirstParentReached");
            PropagationCounter propagationCounter = new PropagationCounter(iterationOne, iterationTwo, iterationThree, iterationFour, iterationFive, notReachedFirstParent);
            SparkResultToOrganizationFromSemRel.doPropagate(spark, leavesPath, childParentPath, resultOrganizationPath, graphPath, workingPath, outputPath, propagationCounter);
        }
    }

    private static void doPropagateOnce(SparkSession spark, String leavesPath, String childParentPath, String resultOrganizationPath, String graphPath, String workingPath, String outputPath) {
        StepActions.execStep(spark, graphPath, workingPath + NEW_RELATION_PATH, leavesPath, childParentPath, resultOrganizationPath);
        SparkResultToOrganizationFromSemRel.addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
    }

    private static void doPropagate(SparkSession spark, String leavesPath, String childParentPath, String resultOrganizationPath, String graphPath, String workingPath, String outputPath, PropagationCounter propagationCounter) {
        long leavesCount;
        int iteration = 0;
        do {
            StepActions.execStep(spark, graphPath, workingPath + NEW_RELATION_PATH, leavesPath, childParentPath, resultOrganizationPath);
            StepActions.prepareForNextStep(spark, workingPath + NEW_RELATION_PATH, resultOrganizationPath, leavesPath, childParentPath, workingPath + "/leaves", workingPath + "/resOrg");
            SparkResultToOrganizationFromSemRel.moveOutput(spark, workingPath, leavesPath, resultOrganizationPath);
        } while ((leavesCount = PropagationConstant.readPath(spark, leavesPath, Leaves.class).count()) > 0L && ++iteration < 5);
        if (leavesCount == 0L) {
            switch (String.valueOf(iteration)) {
                case "1": {
                    propagationCounter.getIterationOne().add(1L);
                    break;
                }
                case "2": {
                    propagationCounter.getIterationTwo().add(1L);
                    break;
                }
                case "3": {
                    propagationCounter.getIterationThree().add(1L);
                    break;
                }
                case "4": {
                    propagationCounter.getIterationFour().add(1L);
                    break;
                }
                case "5": {
                    propagationCounter.getIterationFive().add(1L);
                    break;
                }
            }
        } else {
            propagationCounter.getNotReachedFirstParent().add(1L);
        }
        SparkResultToOrganizationFromSemRel.addNewRelations(spark, workingPath + NEW_RELATION_PATH, outputPath);
    }

    private static void moveOutput(SparkSession spark, String workingPath, String leavesPath, String resultOrganizationPath) {
        PropagationConstant.readPath(spark, workingPath + "/leaves", Leaves.class).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(leavesPath);
        PropagationConstant.readPath(spark, workingPath + "/resOrg", KeyValueSet.class).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(resultOrganizationPath);
    }

    private static void addNewRelations(SparkSession spark, String newRelationPath, String outputPath) {
        Dataset<Relation> relation = PropagationConstant.readPath(spark, newRelationPath, Relation.class);
        relation.groupByKey((MapFunction & Serializable)r -> r.getSource() + r.getTarget(), Encoders.STRING()).mapGroups((MapGroupsFunction & Serializable)(k, it) -> (Relation)it.next(), Encoders.bean(Relation.class)).flatMap((FlatMapFunction & Serializable)r -> Arrays.asList(r, PropagationConstant.getRelation(r.getTarget(), r.getSource(), "isAuthorInstitutionOf", "resultOrganization", "affiliation", "propagation", "result:organization:semrel", "Propagation of affiliation to result through sematic relations")).iterator(), Encoders.bean(Relation.class)).write().mode(SaveMode.Append).option("compression", "gzip").json(outputPath);
    }
}

