/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.entitytoorganizationfromsemrel;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.KeyValueSet;
import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.entitytoorganizationfromsemrel.Leaves;
import eu.dnetlib.dhp.schema.oaf.Relation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.NotNull;
import scala.Tuple2;

public class StepActions
implements Serializable {
    /**
     * Runs one propagation step: loads the relations stored under {@code graphPath}, derives the
     * relations to propagate for the current leaves, and hands both to {@code getNewRels}, which
     * writes the outcome under {@code newRelationPath}.
     *
     * @param spark             the active Spark session
     * @param graphPath         location of the relations already in the graph
     * @param newRelationPath   location the selected relations are written to
     * @param leavesPath        location of the current set of leaves
     * @param chldParentOrgPath location of the child-to-parents organization associations
     * @param entityOrgPath     location of the entity-to-organizations associations
     * @param rel_class         value passed through as the semantics of the relations being built
     */
    public static void execStep(SparkSession spark, String graphPath, String newRelationPath, String leavesPath, String chldParentOrgPath, String entityOrgPath, String rel_class) {
        final Dataset<Relation> existingRelations = PropagationConstant.readPath(spark, graphPath, Relation.class);
        final Dataset<Relation> propagatedRelations =
            StepActions.getPropagationRelation(spark, leavesPath, chldParentOrgPath, entityOrgPath, rel_class);

        StepActions.getNewRels(newRelationPath, existingRelations, propagatedRelations);
    }

    /**
     * Prepares the input of the next iteration: first replaces the set of leaves, then merges the
     * relations read from {@code selectedRelsPath} into the entity-to-organizations associations.
     *
     * @param spark             the active Spark session
     * @param selectedRelsPath  location of the relations selected in the current step
     * @param resultOrgPath     location of the current entity-to-organizations associations
     * @param leavesPath        location of the current set of leaves
     * @param chldParentOrgPath location of the child-to-parents organization associations
     * @param leavesOutputPath  location the new set of leaves is written to
     * @param orgOutputPath     location the updated associations are written to
     */
    public static void prepareForNextStep(SparkSession spark, String selectedRelsPath, String resultOrgPath, String leavesPath, String chldParentOrgPath, String leavesOutputPath, String orgOutputPath) {
        // The leaves are advanced before the associations are updated, as in the original flow.
        StepActions.changeLeavesSet(spark, leavesPath, chldParentOrgPath, leavesOutputPath);

        final Dataset<Relation> selectedRelations =
            PropagationConstant.readPath(spark, selectedRelsPath, Relation.class);
        StepActions.updateEntityOrganization(spark, resultOrgPath, selectedRelations, orgOutputPath);
    }

    /**
     * Prepares the input of the next iteration when both result and project associations are
     * maintained: replaces the set of leaves, then updates the two association sets using the
     * relations found under the {@code /newResultRelation} and {@code /newProjectRelation}
     * sub-paths of {@code selectedRelsPath}.
     *
     * @param spark             the active Spark session
     * @param selectedRelsPath  base location of the relations selected in the current step
     * @param resultOrgPath     location of the current result-to-organizations associations
     * @param projectOrgPath    location of the current project-to-organizations associations
     * @param leavesPath        location of the current set of leaves
     * @param chldParentOrgPath location of the child-to-parents organization associations
     * @param leavesOutputPath  location the new set of leaves is written to
     * @param orgOutputPath     location the updated result associations are written to
     * @param outputProjectPath location the updated project associations are written to
     */
    public static void prepareForNextStep(SparkSession spark, String selectedRelsPath, String resultOrgPath, String projectOrgPath, String leavesPath, String chldParentOrgPath, String leavesOutputPath, String orgOutputPath, String outputProjectPath) {
        StepActions.changeLeavesSet(spark, leavesPath, chldParentOrgPath, leavesOutputPath);

        // Result side.
        final Dataset<Relation> newResultRelations =
            PropagationConstant.readPath(spark, selectedRelsPath + "/newResultRelation", Relation.class);
        StepActions.updateEntityOrganization(spark, resultOrgPath, newResultRelations, orgOutputPath);

        // Project side.
        final Dataset<Relation> newProjectRelations =
            PropagationConstant.readPath(spark, selectedRelsPath + "/newProjectRelation", Relation.class);
        StepActions.updateEntityOrganization(spark, projectOrgPath, newProjectRelations, outputProjectPath);
    }

    /**
     * Adds the targets of the selected relations to the organization set of the entity they start
     * from, and writes the resulting associations as gzip-compressed JSON, overwriting
     * {@code outputPath}.
     *
     * <p>The associations are left-joined with the relations on {@code key = source}, so entities
     * without any selected relation are written back unchanged.
     *
     * @param spark         the active Spark session
     * @param entityOrgPath location of the current entity-to-organizations associations
     * @param selectedRels  relations whose targets have to be added to the associations
     * @param outputPath    location the updated associations are written to
     */
    private static void updateEntityOrganization(SparkSession spark, String entityOrgPath, Dataset<Relation> selectedRels, String outputPath) {
        Dataset<KeyValueSet> entityOrg = PropagationConstant.readPath(spark, entityOrgPath, KeyValueSet.class);
        entityOrg
            .joinWith(selectedRels, entityOrg.col("key").equalTo(selectedRels.col("source")), "left")
            .groupByKey(
                (MapFunction<Tuple2<KeyValueSet, Relation>, String>) pair -> pair._1().getKey(),
                Encoders.STRING())
            .mapGroups(
                (MapGroupsFunction<String, Tuple2<KeyValueSet, Relation>, KeyValueSet>) (key, it) -> {
                    Tuple2<KeyValueSet, Relation> first = it.next();
                    if (first._2() == null) {
                        // Left join produced no matching relation: nothing to add for this entity.
                        return first._1();
                    }
                    // A set is used so that organizations already associated are not duplicated.
                    HashSet<String> organizations = new HashSet<>(first._1().getValueSet());
                    organizations.add(first._2().getTarget());
                    it.forEachRemaining(pair -> organizations.add(pair._2().getTarget()));

                    KeyValueSet updated = new KeyValueSet();
                    updated.setKey(first._1().getKey());
                    updated.setValueSet(new ArrayList<>(organizations));
                    return updated;
                },
                Encoders.bean(KeyValueSet.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }

    /**
     * Computes the next set of leaves as the distinct parents of the current leaves and writes it
     * as gzip-compressed JSON, overwriting {@code leavesOutputPath}.
     *
     * <p>Both inputs are registered as temporary views ({@code childParent} and {@code leaves}) so
     * that the selection can be expressed in SQL.
     *
     * @param spark             the active Spark session
     * @param leavesPath        location of the current set of leaves
     * @param chldParentOrgPath location of the child-to-parents organization associations
     * @param leavesOutputPath  location the new set of leaves is written to
     */
    private static void changeLeavesSet(SparkSession spark, String leavesPath, String chldParentOrgPath, String leavesOutputPath) {
        final Dataset<KeyValueSet> parentsByChild = PropagationConstant.readPath(spark, chldParentOrgPath, KeyValueSet.class);
        final Dataset<Leaves> currentLeaves = PropagationConstant.readPath(spark, leavesPath, Leaves.class);
        parentsByChild.createOrReplaceTempView("childParent");
        currentLeaves.createOrReplaceTempView("leaves");

        // Each child is exploded into (key, parent) rows; joining on the leaf value keeps only
        // the parents of the current leaves.
        final String nextLeavesQuery = "SELECT distinct parent as value FROM leaves JOIN (SELECT key, parent       FROM childParent       LATERAL VIEW explode(valueSet) kv as parent) tmp ON value = key ";

        final Dataset<Leaves> nextLeaves = spark.sql(nextLeavesQuery).as(Encoders.bean(Leaves.class));
        nextLeaves
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(leavesOutputPath);
    }

    /** Provenance class id carried by result-organization relations checked for in {@code getNewRels}. */
    private static final String RESULT_ORGANIZATION_SEMREL_CLASS_ID = "result:organization:semrel";

    /** Provenance class id carried by project-organization relations checked for in {@code getNewRels}. */
    private static final String PROJECT_ORGANIZATION_SEMREL_CLASS_ID = "project:organization:semrel";

    /**
     * Mapper shared by all the records handled by {@code getNewRels}; building a new instance per
     * record is costly and unnecessary because a configured {@code ObjectMapper} can be reused.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Selects, among the existing and the candidate relations, those to be appended under
     * {@code newRelationPath} (gzip-compressed JSON, append mode).
     *
     * <p>Relations are grouped by the concatenation of source and target. A group is discarded as
     * soon as it contains a relation whose provenance is known and is neither of the two
     * semantic-relation propagation class ids; otherwise the first relation of the group is kept.
     * The kept relation travels through the intermediate dataset as a JSON string because a
     * discarded group is represented by {@code null}, which is filtered out afterwards.
     *
     * @param newRelationPath location the selected relations are appended to
     * @param relationDataset relations already present in the graph
     * @param newRels         candidate relations produced by the propagation
     */
    private static void getNewRels(String newRelationPath, Dataset<Relation> relationDataset, Dataset<Relation> newRels) {
        relationDataset
            .union(newRels)
            .groupByKey((MapFunction<Relation, String>) r -> r.getSource() + r.getTarget(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, Relation, String>) (k, it) -> {
                Relation first = it.next();
                boolean hasForeignProvenance = StepActions.isNotFromSemRelPropagation(first);
                // Non-short-circuit on purpose: the whole group is scanned, as it was before.
                while (it.hasNext()) {
                    hasForeignProvenance |= StepActions.isNotFromSemRelPropagation(it.next());
                }
                return hasForeignProvenance ? null : OBJECT_MAPPER.writeValueAsString(first);
            }, Encoders.STRING())
            .filter(Objects::nonNull)
            .map((MapFunction<String, Relation>) json -> OBJECT_MAPPER.readValue(json, Relation.class), Encoders.bean(Relation.class))
            .write()
            .mode(SaveMode.Append)
            .option("compression", "gzip")
            .json(newRelationPath);
    }

    /**
     * Tells whether the relation carries a known provenance different from the two
     * semantic-relation propagation class ids.
     *
     * <p>A relation with no data info, no provenance action or no class id is treated like the
     * already supported "no provenance action" case, i.e. it does not count as foreign provenance.
     * NOTE(review): missing data info and missing class id used to raise a
     * {@code NullPointerException}; confirm that skipping them is the desired policy.
     */
    private static boolean isNotFromSemRelPropagation(Relation rel) {
        if (rel.getDataInfo() == null || rel.getDataInfo().getProvenanceaction() == null) {
            return false;
        }
        String classId = rel.getDataInfo().getProvenanceaction().getClassid();
        return classId != null
            && !RESULT_ORGANIZATION_SEMREL_CLASS_ID.equals(classId)
            && !PROJECT_ORGANIZATION_SEMREL_CLASS_ID.equals(classId);
    }

    /**
     * Builds the relations to propagate in the current step.
     *
     * <p>The three inputs are registered as temporary views ({@code childParent},
     * {@code entityOrg} and {@code leaves}). For every entity associated with an organization that
     * is currently a leaf, the query collects the set of parents of that leaf; one relation is
     * then created through {@code PropagationConstant.getRelation} for each (entity, parent) pair.
     *
     * @param spark             the active Spark session
     * @param leavesPath        location of the current set of leaves
     * @param chldParentOrgPath location of the child-to-parents organization associations
     * @param entityOrgPath     location of the entity-to-organizations associations
     * @param semantics         value passed as third argument to {@code PropagationConstant.getRelation}
     * @return the dataset of relations linking each entity to the parents of its leaf organizations
     */
    private static Dataset<Relation> getPropagationRelation(SparkSession spark, String leavesPath, String chldParentOrgPath, String entityOrgPath, String semantics) {
        Dataset<KeyValueSet> childParent = PropagationConstant.readPath(spark, chldParentOrgPath, KeyValueSet.class);
        Dataset<KeyValueSet> entityOrg = PropagationConstant.readPath(spark, entityOrgPath, KeyValueSet.class);
        Dataset<Leaves> leaves = PropagationConstant.readPath(spark, leavesPath, Leaves.class);
        childParent.createOrReplaceTempView("childParent");
        entityOrg.createOrReplaceTempView("entityOrg");
        leaves.createOrReplaceTempView("leaves");

        // key = entity id, valueSet = parents of the leaf organizations the entity is linked to.
        Dataset<KeyValueSet> resultParent = spark
            .sql("SELECT  entityId as key, collect_set(parent) valueSet FROM (SELECT key as child, parent       FROM childParent        LATERAL VIEW explode(valueSet) ks as parent) as cp JOIN leaves ON leaves.value = cp.child JOIN (SELECT key as entityId, org FROM entityOrg LATERAL VIEW explode (valueSet) ks as org ) as ro ON  leaves.value = ro.org GROUP BY entityId")
            .as(Encoders.bean(KeyValueSet.class));

        return resultParent.flatMap(
            (FlatMapFunction<KeyValueSet, Relation>) v -> v
                .getValueSet()
                .stream()
                .map(orgId -> PropagationConstant.getRelation(v.getKey(), orgId, semantics))
                .collect(Collectors.toList())
                .iterator(),
            Encoders.bean(Relation.class));
    }
}

