/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.resulttocommunityfromorganization;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.resulttocommunityfromorganization.ResultCommunityList;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkResultToCommunityFromOrganizationJob {
    private static final Logger log = LoggerFactory.getLogger(SparkResultToCommunityFromOrganizationJob.class);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)SparkResultToCommunityFromOrganizationJob.class.getResourceAsStream("/eu/dnetlib/dhp/resulttocommunityfromorganization/input_communitytoresult_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", (Object)inputPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        String possibleupdatespath = parser.get("preparedInfoPath");
        log.info("preparedInfoPath: {}", (Object)possibleupdatespath);
        String resultClassName = parser.get("resultTableName");
        log.info("resultTableName: {}", (Object)resultClassName);
        Boolean saveGraph = Optional.ofNullable(parser.get("saveGraph")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("saveGraph: {}", (Object)saveGraph);
        Class<?> resultClazz = Class.forName(resultClassName);
        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));
        SparkSessionSupport.runWithSparkHiveSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            PropagationConstant.removeOutputDir(spark, outputPath);
            if (saveGraph.booleanValue()) {
                SparkResultToCommunityFromOrganizationJob.execPropagation(spark, inputPath, outputPath, resultClazz, possibleupdatespath);
            }
        });
    }

    private static <R extends Result> void execPropagation(SparkSession spark, String inputPath, String outputPath, Class<R> resultClazz, String possibleUpdatesPath) {
        Dataset<ResultCommunityList> possibleUpdates = PropagationConstant.readPath(spark, possibleUpdatesPath, ResultCommunityList.class);
        Dataset<R> result = PropagationConstant.readPath(spark, inputPath, resultClazz);
        result.joinWith(possibleUpdates, result.col("id").equalTo((Object)possibleUpdates.col("resultId")), "left_outer").map(SparkResultToCommunityFromOrganizationJob.resultCommunityFn(), Encoders.bean(resultClazz)).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath);
    }

    private static <R extends Result> MapFunction<Tuple2<R, ResultCommunityList>, R> resultCommunityFn() {
        return (MapFunction & Serializable)value -> {
            Result ret = (Result)value._1();
            Optional<Object> rcl = Optional.ofNullable(value._2());
            if (rcl.isPresent()) {
                ArrayList<String> communitySet = ((ResultCommunityList)rcl.get()).getCommunityList();
                List contextList = ret.getContext().stream().map(Context::getId).collect(Collectors.toList());
                Result res = (Result)ret.getClass().newInstance();
                res.setId(ret.getId());
                ArrayList<Context> propagatedContexts = new ArrayList<Context>();
                for (String cId : communitySet) {
                    if (contextList.contains(cId)) continue;
                    Context newContext = new Context();
                    newContext.setId(cId);
                    newContext.setDataInfo(Arrays.asList(PropagationConstant.getDataInfo("propagation", "result:community:organization", " Propagation of result belonging to community through organization", "dnet:provenanceActions")));
                    propagatedContexts.add(newContext);
                }
                res.setContext(propagatedContexts);
                ret.mergeFrom((OafEntity)res);
            }
            return ret;
        };
    }
}

