/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.countrypropagation;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.countrypropagation.CountrySbs;
import eu.dnetlib.dhp.countrypropagation.DatasourceCountry;
import eu.dnetlib.dhp.countrypropagation.EntityEntityRel;
import eu.dnetlib.dhp.countrypropagation.ResultCountrySet;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that associates every non-deleted, non-invisible result with the countries of the
 * datasources it was collected from or hosted by.
 *
 * <p>The job runs in two steps:
 * <ol>
 *   <li>for each result, the distinct {@code collectedfrom} keys and instance {@code hostedby}
 *       keys are emitted as {@link EntityEntityRel} pairs (result id, datasource key) and written
 *       as gzipped JSON under {@code workingPath/resultCfHb};</li>
 *   <li>those pairs are joined with the prepared {@link DatasourceCountry} records
 *       ({@code entity2Id == datasourceId}), grouped by result id, and written as
 *       {@link ResultCountrySet} gzipped JSON to {@code outputPath}, keeping one country per
 *       distinct {@code classid}.</li>
 * </ol>
 */
public class PrepareResultCountrySet {
    private static final Logger log = LoggerFactory.getLogger(PrepareResultCountrySet.class);

    /** Classpath location of the argument descriptor consumed by {@link ArgumentApplicationParser}. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/countrypropagation/input_prepareresultcountry_parameters.json";

    /**
     * Entry point.
     *
     * @param args command-line arguments, parsed against {@link #PARAMETERS_RESOURCE}; the job reads
     *     {@code workingPath}, {@code sourcePath}, {@code outputPath}, {@code preparedInfoPath} and
     *     {@code resultTableName} (fully qualified name of a {@link Result} subclass)
     * @throws Exception if the descriptor is missing, argument parsing fails, the result class cannot
     *     be loaded or is not a {@link Result}, or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // Close the classpath stream once read; fail with a clear message if the resource is missing.
        try (InputStream in = PrepareResultCountrySet.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(
                Objects.requireNonNull(in, "missing resource " + PARAMETERS_RESOURCE), StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String workingPath = parser.get("workingPath");
        log.info("workingPath: {}", workingPath);

        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        String datasourcecountrypath = parser.get("preparedInfoPath");
        log.info("preparedInfoPath: {}", datasourcecountrypath);

        String resultClassName = parser.get("resultTableName");
        log.info("resultTableName: {}", resultClassName);

        // asSubclass gives a properly bounded type and fails fast if the name is not a Result subtype.
        Class<? extends Result> resultClazz = Class.forName(resultClassName).asSubclass(Result.class);

        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            PropagationConstant.removeOutputDir(spark, outputPath);
            getPotentialResultToUpdate(spark, inputPath, outputPath, datasourcecountrypath, workingPath, resultClazz);
        });
    }

    /**
     * Computes, for every kept result, the set of countries of its collectedfrom/hostedby datasources.
     *
     * @param spark the active session
     * @param inputPath location of the results of type {@code resultClazz}
     * @param outputPath destination of the {@link ResultCountrySet} records (overwritten)
     * @param datasourcecountrypath location of the prepared {@link DatasourceCountry} records
     * @param workingPath scratch directory; {@code /resultCfHb} is (over)written under it
     * @param resultClazz concrete result type to read
     */
    private static <R extends Result> void getPotentialResultToUpdate(
        SparkSession spark,
        String inputPath,
        String outputPath,
        String datasourcecountrypath,
        String workingPath,
        Class<R> resultClazz) {

        Dataset<R> result = PropagationConstant
            .readPath(spark, inputPath, resultClazz)
            .filter((FilterFunction<R>) r -> isVisible(r));

        // One (resultId, datasourceKey) pair per distinct collectedfrom / hostedby key of each result.
        result
            .flatMap(
                (FlatMapFunction<R, EntityEntityRel>) r -> datasourceKeys(r)
                    .stream()
                    .map(key -> EntityEntityRel.newInstance(r.getId(), key))
                    .collect(Collectors.toList())
                    .iterator(),
                Encoders.bean(EntityEntityRel.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(workingPath + "/resultCfHb");

        Dataset<DatasourceCountry> datasourceCountry = PropagationConstant
            .readPath(spark, datasourcecountrypath, DatasourceCountry.class);
        Dataset<EntityEntityRel> cfhb = PropagationConstant
            .readPath(spark, workingPath + "/resultCfHb", EntityEntityRel.class);

        datasourceCountry
            .joinWith(cfhb, cfhb.col("entity2Id").equalTo(datasourceCountry.col("datasourceId")))
            .groupByKey(
                (MapFunction<Tuple2<DatasourceCountry, EntityEntityRel>, String>) t2 -> t2._2().getEntity1Id(),
                Encoders.STRING())
            .mapGroups(
                (MapGroupsFunction<String, Tuple2<DatasourceCountry, EntityEntityRel>, ResultCountrySet>) PrepareResultCountrySet::toResultCountrySet,
                Encoders.bean(ResultCountrySet.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath);
    }

    /**
     * Returns true unless the result is flagged as deleted by inference or invisible.
     * Missing (null) flags are treated as false instead of failing the job on unboxing.
     * NOTE(review): assumes {@code getDataInfo()} is always populated, as the original code did.
     */
    private static boolean isVisible(Result r) {
        return !Boolean.TRUE.equals(r.getDataInfo().getDeletedbyinference())
            && !Boolean.TRUE.equals(r.getDataInfo().getInvisible());
    }

    /**
     * Collects the distinct, non-null keys of the result's collectedfrom entries and of its
     * instances' hostedby entries. Null lists, null entries and instances without hostedby are
     * skipped; a null key could never satisfy the later join equality anyway.
     */
    private static Set<String> datasourceKeys(Result r) {
        Set<String> keys = new HashSet<>();
        if (r.getCollectedfrom() != null) {
            r.getCollectedfrom()
                .stream()
                .filter(Objects::nonNull)
                .map(cf -> cf.getKey())
                .filter(Objects::nonNull)
                .forEach(keys::add);
        }
        if (r.getInstance() != null) {
            r.getInstance()
                .stream()
                .filter(i -> i != null && i.getHostedby() != null)
                .map(i -> i.getHostedby().getKey())
                .filter(Objects::nonNull)
                .forEach(keys::add);
        }
        return keys;
    }

    /**
     * Builds the country set of one result from its joined (country, relation) rows, keeping only
     * the first country encountered for each {@code classid}.
     */
    private static ResultCountrySet toResultCountrySet(
        String resultId, Iterator<Tuple2<DatasourceCountry, EntityEntityRel>> rows) {
        Set<String> seenCodes = new HashSet<>();
        ArrayList<CountrySbs> countries = new ArrayList<>();
        rows.forEachRemaining(t2 -> {
            CountrySbs country = t2._1().getCountry();
            // Set.add returns false for an already-seen code, so each classid is kept once.
            if (seenCodes.add(country.getClassid())) {
                countries.add(country);
            }
        });
        ResultCountrySet rcs = new ResultCountrySet();
        rcs.setResultId(resultId);
        rcs.setCountrySet(countries);
        return rcs;
    }
}

