/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.resulttoorganizationfromsemrel;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.KeyValueSet;
import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.resulttoorganizationfromsemrel.Leaves;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.NotNull;
import scala.Tuple2;

/**
 * Spark steps used to propagate result-to-organization relations along the
 * child/parent organization hierarchy.
 *
 * <p>{@link #execStep} writes the relations newly produced by one propagation step, and
 * {@link #prepareForNextStep} rewrites the "leaves" set and the result/organization
 * association so that a following step can start from them.
 */
public class StepActions
implements Serializable {
    /** Provenance class id that marks a relation as produced by this propagation. */
    private static final String PROPAGATION_CLASS_ID = "result:organization:semrel";

    /**
     * Provenance class name handed to {@code PropagationConstant.getRelation}. The spelling
     * ("sematic") is kept byte-for-byte because the value is emitted at runtime.
     */
    private static final String PROPAGATION_CLASS_NAME = "Propagation of affiliation to result through sematic relations";

    /** Shared JSON mapper; reused instead of allocating a new instance for every record. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Runs one propagation step: computes the candidate relations and appends to
     * {@code newRelationPath} those whose source/target pair is not already in the graph.
     *
     * @param spark             the active Spark session
     * @param graphPath         path of the relations already in the graph
     * @param newRelationPath   path the new relations are appended to
     * @param leavesPath        path of the current {@link Leaves} set
     * @param chldParentOrgPath path of the child organization -> parent organizations sets
     * @param resultOrgPath     path of the result -> organizations sets
     */
    public static void execStep(SparkSession spark, String graphPath, String newRelationPath, String leavesPath, String chldParentOrgPath, String resultOrgPath) {
        Dataset<Relation> relationGraph = PropagationConstant.readPath(spark, graphPath, Relation.class);
        Dataset<Relation> candidates = getPropagationRelation(spark, leavesPath, chldParentOrgPath, resultOrgPath);
        getNewRels(newRelationPath, relationGraph, candidates);
    }

    /**
     * Prepares the input of the next step: first rewrites the leaves set, then rewrites the
     * result/organization association enriched with the selected relations.
     *
     * @param spark             the active Spark session
     * @param selectedRelsPath  path of the relations selected in the current step
     * @param resultOrgPath     path of the current result -> organizations sets
     * @param leavesPath        path of the current {@link Leaves} set
     * @param chldParentOrgPath path of the child organization -> parent organizations sets
     * @param leavesOutputPath  path the new leaves set is written to (overwritten)
     * @param orgOutputPath     path the new result -> organizations sets are written to (overwritten)
     */
    public static void prepareForNextStep(SparkSession spark, String selectedRelsPath, String resultOrgPath, String leavesPath, String chldParentOrgPath, String leavesOutputPath, String orgOutputPath) {
        changeLeavesSet(spark, leavesPath, chldParentOrgPath, leavesOutputPath);
        Dataset<Relation> selectedRels = PropagationConstant.readPath(spark, selectedRelsPath, Relation.class);
        updateResultOrganization(spark, resultOrgPath, selectedRels, orgOutputPath);
    }

    /**
     * Left-joins each result/organization set with the selected relations on
     * {@code key = source} and adds the targets of the matching relations to the set.
     * Entries without any matching relation are written unchanged.
     */
    private static void updateResultOrganization(SparkSession spark, String resultOrgPath, Dataset<Relation> selectedRels, String outputPath) {
        Dataset<KeyValueSet> resultOrg = PropagationConstant.readPath(spark, resultOrgPath, KeyValueSet.class);
        resultOrg
            .joinWith(selectedRels, resultOrg.col("key").equalTo(selectedRels.col("source")), "left")
            .groupByKey((MapFunction<Tuple2<KeyValueSet, Relation>, String>) pair -> pair._1().getKey(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Tuple2<KeyValueSet, Relation>, KeyValueSet>) (key, it) -> {
                Tuple2<KeyValueSet, Relation> first = it.next();
                // Left join: a null right side means no selected relation for this key.
                if (first._2() == null) {
                    return first._1();
                }
                // A set removes duplicates between the existing organizations and the new targets.
                HashSet<String> merged = new HashSet<>(first._1().getValueSet());
                merged.add(first._2().getTarget());
                it.forEachRemaining(pair -> merged.add(pair._2().getTarget()));

                ArrayList<String> orgs = new ArrayList<>(merged);
                KeyValueSet ret = new KeyValueSet();
                ret.setKey(first._1().getKey());
                ret.setValueSet(orgs);
                return ret;
            }, Encoders.bean(KeyValueSet.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }

    /**
     * Writes the next leaves set: the distinct parents of the organizations that are
     * currently leaves, as found in the child/parent association.
     */
    private static void changeLeavesSet(SparkSession spark, String leavesPath, String chldParentOrgPath, String leavesOutputPath) {
        Dataset<KeyValueSet> childParent = PropagationConstant.readPath(spark, chldParentOrgPath, KeyValueSet.class);
        Dataset<Leaves> leaves = PropagationConstant.readPath(spark, leavesPath, Leaves.class);
        childParent.createOrReplaceTempView("childParent");
        leaves.createOrReplaceTempView("leaves");
        String query = "SELECT distinct parent as value FROM leaves "
            + "JOIN (SELECT key, parent       FROM childParent       LATERAL VIEW explode(valueSet) kv as parent) tmp "
            + "ON value = key ";
        spark.sql(query)
            .as(Encoders.bean(Leaves.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(leavesOutputPath);
    }

    /**
     * Appends to {@code newRelationPath} the propagated relations that are really new.
     *
     * <p>Existing and candidate relations are unioned and grouped by source + target. A group
     * yields one relation only when every relation in it carries the propagation provenance
     * class id; if at least one relation in the group has a different (or missing) provenance,
     * the pair is considered already present and nothing is emitted for it.
     *
     * <p>Each surviving relation is passed through a JSON string (with {@code null} marking a
     * discarded group, filtered out afterwards) before being decoded back into a {@link Relation}.
     */
    private static void getNewRels(String newRelationPath, Dataset<Relation> relationDataset, Dataset<Relation> newRels) {
        relationDataset
            .union(newRels)
            .groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Relation, String>) (k, it) -> {
                Relation first = it.next();
                boolean allFromPropagation = isProducedByPropagation(first);
                // Scan the whole group without buffering it; only the first element is ever emitted.
                while (it.hasNext()) {
                    allFromPropagation &= isProducedByPropagation(it.next());
                }
                if (!allFromPropagation) {
                    return null;
                }
                return OBJECT_MAPPER.writeValueAsString(first);
            }, Encoders.STRING())
            .filter(Objects::nonNull)
            .map((MapFunction<String, Relation>) json -> OBJECT_MAPPER.readValue(json, Relation.class), Encoders.bean(Relation.class))
            .write()
            .mode(SaveMode.Append)
            .option("compression", "gzip")
            .json(newRelationPath);
    }

    /**
     * Tells whether the relation carries the propagation provenance class id. A relation with
     * no data info, no provenance action or no class id is reported as not propagated instead
     * of raising a {@link NullPointerException}.
     */
    private static boolean isProducedByPropagation(Relation rel) {
        return Optional.ofNullable(rel.getDataInfo())
            .map(dataInfo -> dataInfo.getProvenanceaction())
            .map(provenance -> provenance.getClassid())
            .map(PROPAGATION_CLASS_ID::equals)
            .orElse(false);
    }

    /**
     * Builds the candidate relations of the current step: for every result associated with an
     * organization that is currently a leaf, one relation from the result to each parent of
     * that organization.
     *
     * @return the candidate relations, created through {@code PropagationConstant.getRelation}
     */
    private static Dataset<Relation> getPropagationRelation(SparkSession spark, String leavesPath, String chldParentOrgPath, String resultOrgPath) {
        Dataset<KeyValueSet> childParent = PropagationConstant.readPath(spark, chldParentOrgPath, KeyValueSet.class);
        Dataset<KeyValueSet> resultOrg = PropagationConstant.readPath(spark, resultOrgPath, KeyValueSet.class);
        Dataset<Leaves> leaves = PropagationConstant.readPath(spark, leavesPath, Leaves.class);
        childParent.createOrReplaceTempView("childParent");
        resultOrg.createOrReplaceTempView("resultOrg");
        leaves.createOrReplaceTempView("leaves");
        String query = "SELECT  resId as key, collect_set(parent) valueSet "
            + "FROM (SELECT key as child, parent       FROM childParent        LATERAL VIEW explode(valueSet) ks as parent) as cp "
            + "JOIN leaves ON leaves.value = cp.child "
            + "JOIN (SELECT key as resId, org FROM resultOrg LATERAL VIEW explode (valueSet) ks as org ) as ro "
            + "ON  leaves.value = ro.org "
            + "GROUP BY resId";
        // One row per result, holding the set of parent organizations reached through its leaves.
        Dataset<KeyValueSet> resultParent = spark.sql(query).as(Encoders.bean(KeyValueSet.class));
        return resultParent.flatMap(
            (FlatMapFunction<KeyValueSet, Relation>) v -> v.getValueSet()
                .stream()
                .map(orgId -> PropagationConstant.getRelation(v.getKey(), orgId, "hasAuthorInstitution", "resultOrganization", "affiliation", "propagation", PROPAGATION_CLASS_ID, PROPAGATION_CLASS_NAME))
                .collect(Collectors.toList())
                .iterator(),
            Encoders.bean(Relation.class));
    }
}

