/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.resulttocommunityfromorganization;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkResultToCommunityFromOrganizationJob {
    private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromOrganizationJob.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)SparkResultToCommunityFromOrganizationJob.class.getResourceAsStream("/eu/dnetlib/dhp/wf/subworkflows/resulttocommunityfromorganization/input_communitytoresult_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", (Object)inputPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        String possibleupdatespath = parser.get("preparedInfoPath");
        log.info("preparedInfoPath: {}", (Object)possibleupdatespath);
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> SparkResultToCommunityFromOrganizationJob.execPropagation(spark, inputPath, outputPath, possibleupdatespath));
    }

    private static <R extends Result> void execPropagation(SparkSession spark, String inputPath, String outputPath, String possibleUpdatesPath) {
        Dataset<ResultCommunityList> possibleUpdates = PropagationConstant.readPath(spark, possibleUpdatesPath, ResultCommunityList.class);
        ModelSupport.entityTypes.keySet().parallelStream().filter(e -> ModelSupport.isResult((EntityType)e)).forEach(e -> {
            Class resultClazz = (Class)ModelSupport.entityTypes.get(e);
            PropagationConstant.removeOutputDir(spark, outputPath + e.name());
            Dataset result = PropagationConstant.readPath(spark, inputPath + e.name(), resultClazz);
            log.info("executing left join");
            result.joinWith(possibleUpdates, result.col("id").equalTo((Object)possibleUpdates.col("resultId")), "left_outer").map(SparkResultToCommunityFromOrganizationJob.resultCommunityFn(), Encoders.bean((Class)resultClazz)).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath + e.name());
        });
    }

    private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> resultCommunityFn() {
        return (MapFunction & Serializable)value -> {
            Result ret = (Result)value._1();
            Optional<Object> rcl = Optional.ofNullable(value._2());
            if (rcl.isPresent()) {
                ArrayList<String> communitySet = ((ResultCommunityList)rcl.get()).getCommunityList();
                List contextList = ret.getContext().stream().map(Context::getId).collect(Collectors.toList());
                for (String cId : communitySet) {
                    if (contextList.contains(cId)) continue;
                    Context newContext = new Context();
                    newContext.setId(cId);
                    newContext.setDataInfo(Arrays.asList(PropagationConstant.getDataInfo("propagation", "result:community:organization", " Propagation of result belonging to community through organization", "dnet:provenanceActions")));
                    ret.getContext().add(newContext);
                }
            }
            return ret;
        };
    }
}

