/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.graph.clean;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.oa.graph.clean.CleaningRuleMap;
import eu.dnetlib.dhp.oa.graph.clean.IdCfHbMapping;
import eu.dnetlib.dhp.oa.graph.clean.OafCleaner;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.GraphCleaningFunctions;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;

public class CleanGraphSparkJob {
    private static final Logger log = LoggerFactory.getLogger(CleanGraphSparkJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private ArgumentApplicationParser parser;

    public CleanGraphSparkJob(ArgumentApplicationParser parser) {
        this.parser = parser;
    }

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)CleanGraphSparkJob.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/input_clean_graph_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String isLookupUrl = parser.get("isLookupUrl");
        log.info("isLookupUrl: {}", (Object)isLookupUrl);
        ISLookUpService isLookup = ISLookupClientFactory.getLookUpService((String)isLookupUrl);
        new CleanGraphSparkJob(parser).run(isSparkSessionManaged, isLookup);
    }

    public void run(Boolean isSparkSessionManaged, ISLookUpService isLookUpService) throws ISLookUpException, ClassNotFoundException {
        String inputPath = this.parser.get("inputPath");
        log.info("inputPath: {}", (Object)inputPath);
        String outputPath = this.parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        String graphTableClassName = this.parser.get("graphTableClassName");
        log.info("graphTableClassName: {}", (Object)graphTableClassName);
        String contextId = this.parser.get("contextId");
        log.info("contextId: {}", (Object)contextId);
        String verifyParam = this.parser.get("verifyParam");
        log.info("verifyParam: {}", (Object)verifyParam);
        String datasourcePath = this.parser.get("hostedBy");
        log.info("datasourcePath: {}", (Object)datasourcePath);
        String country = this.parser.get("country");
        log.info("country: {}", (Object)country);
        Object[] verifyCountryParam = Optional.ofNullable(this.parser.get("verifyCountryParam")).map(s -> s.split(";")).orElse(new String[0]);
        log.info("verifyCountryParam: {}", verifyCountryParam);
        String collectedfrom = this.parser.get("collectedfrom");
        log.info("collectedfrom: {}", (Object)collectedfrom);
        String dsMasterDuplicatePath = this.parser.get("masterDuplicatePath");
        log.info("masterDuplicatePath: {}", (Object)dsMasterDuplicatePath);
        String blacklistPath = Optional.ofNullable(this.parser.get("blacklist")).orElse("");
        log.info("blacklist: {}", (Object)blacklistPath);
        Boolean deepClean = Optional.ofNullable(this.parser.get("deepClean")).map(Boolean::valueOf).orElse(Boolean.FALSE);
        log.info("deepClean: {}", (Object)deepClean);
        Class<?> entityClazz = Class.forName(graphTableClassName);
        VocabularyGroup vocs = VocabularyGroup.loadVocsFromIS((ISLookUpService)isLookUpService);
        SparkConf conf = new SparkConf();
        conf.setAppName(CleanGraphSparkJob.class.getSimpleName() + "#" + entityClazz.getSimpleName());
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, arg_0 -> CleanGraphSparkJob.lambda$run$1(outputPath, vocs, inputPath, entityClazz, contextId, verifyParam, datasourcePath, country, (String[])verifyCountryParam, collectedfrom, dsMasterDuplicatePath, deepClean, blacklistPath, arg_0));
    }

    private static <T extends Oaf> void cleanGraphTable(SparkSession spark, VocabularyGroup vocs, String inputPath, Class<T> clazz, String outputPath, String contextId, String verifyParam, String datasourcePath, String country, String[] verifyCountryParam, String collectedfrom, String dsMasterDuplicatePath, Boolean deepClean, String blacklistPath) {
        CleaningRuleMap mapping = CleaningRuleMap.create(vocs);
        Dataset cleaned_basic = CleanGraphSparkJob.readFilteredTableFromPath(spark, inputPath, clazz, blacklistPath).map(GraphCleaningFunctions::fixVocabularyNames, Encoders.bean(clazz)).map((MapFunction & Serializable)value -> OafCleaner.apply(value, mapping), Encoders.bean(clazz)).map((MapFunction & Serializable)value -> GraphCleaningFunctions.cleanup((Oaf)value, (VocabularyGroup)vocs), Encoders.bean(clazz)).filter(GraphCleaningFunctions::filter);
        Dataset md = spark.read().textFile(dsMasterDuplicatePath).map(CleanGraphSparkJob.as(MasterDuplicate.class), Encoders.bean(MasterDuplicate.class));
        Dataset resolved = spark.read().textFile(inputPath).map(CleanGraphSparkJob.as(clazz), Encoders.bean(clazz)).flatMap(CleanGraphSparkJob.flattenCfHbFn(), Encoders.bean(IdCfHbMapping.class));
        if (Boolean.FALSE.equals(deepClean)) {
            if (Boolean.TRUE.equals(ModelSupport.isSubClass(clazz, Result.class))) {
                CleanGraphSparkJob.save(CleanGraphSparkJob.fixCFHB(clazz, cleaned_basic, (Dataset<MasterDuplicate>)md, (Dataset<IdCfHbMapping>)resolved), outputPath);
            } else {
                CleanGraphSparkJob.save(cleaned_basic, outputPath);
            }
        } else if (Boolean.TRUE.equals(ModelSupport.isSubClass(clazz, Result.class))) {
            HashSet hostedBy = Sets.newHashSet((Iterable)spark.read().textFile(datasourcePath).collectAsList());
            Dataset cleaned_deep = CleanGraphSparkJob.fixCFHB(clazz, cleaned_basic, (Dataset<MasterDuplicate>)md, (Dataset<IdCfHbMapping>)resolved).map((MapFunction & Serializable)value -> GraphCleaningFunctions.cleanContext((Oaf)value, (String)contextId, (String)verifyParam), Encoders.bean(clazz)).map((MapFunction & Serializable)value -> GraphCleaningFunctions.cleanCountry((Oaf)value, (String[])verifyCountryParam, (Set)hostedBy, (String)collectedfrom, (String)country), Encoders.bean(clazz));
            CleanGraphSparkJob.save(cleaned_deep, outputPath);
        } else {
            CleanGraphSparkJob.save(cleaned_basic, outputPath);
        }
    }

    private static <T extends Oaf> void save(Dataset<T> dataset, String outputPath) {
        dataset.write().mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath);
    }

    private static <T extends Oaf> Dataset<T> fixCFHB(Class<T> clazz, Dataset<T> results, Dataset<MasterDuplicate> md, Dataset<IdCfHbMapping> resolved) {
        Dataset resolvedDs = resolved.joinWith(md, resolved.col("cfhb").equalTo((Object)md.col("duplicateId"))).map(CleanGraphSparkJob.asIdCfHbMapping(), Encoders.bean(IdCfHbMapping.class)).filter((FilterFunction & Serializable)m -> Objects.nonNull(m.getMasterId()));
        return results.joinWith(resolvedDs, results.col("id").equalTo((Object)resolvedDs.col("resultId")), "left").groupByKey((MapFunction & Serializable)t -> ((Result)t._1()).getId(), Encoders.STRING()).mapGroups(CleanGraphSparkJob.getMapGroupsFunction(), Encoders.bean(clazz));
    }

    private static <T extends Oaf> Dataset<T> readFilteredTableFromPath(SparkSession spark, String inputEntityPath, Class<T> clazz, String blacklistPath) {
        log.info("Reading Graph table from: {}", (Object)inputEntityPath);
        Dataset res = spark.read().textFile(inputEntityPath).map(CleanGraphSparkJob.as(clazz), Encoders.bean(clazz));
        if (Relation.class.isAssignableFrom(clazz) || blacklistPath.isEmpty()) {
            return res;
        }
        Dataset blacklist = spark.read().load(blacklistPath);
        return res.join(blacklist, (Seq)JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left_anti").as(Encoders.bean(clazz));
    }

    private static <R> MapFunction<String, R> as(Class<R> clazz) {
        return (MapFunction & Serializable)s -> OBJECT_MAPPER.readValue(s, clazz);
    }

    private static <T extends Oaf> FlatMapFunction<T, IdCfHbMapping> flattenCfHbFn() {
        return (FlatMapFunction & Serializable)r -> Stream.concat(Optional.ofNullable(r.getCollectedfrom()).map(cf -> cf.stream().map(KeyValue::getKey)).orElse(Stream.empty()), Stream.concat(Optional.ofNullable(((Result)r).getInstance()).map(instances -> instances.stream().map(i -> Optional.ofNullable(i.getHostedby()).map(KeyValue::getKey).orElse(""))).orElse(Stream.empty()).filter(StringUtils::isNotBlank), Optional.ofNullable(((Result)r).getInstance()).map(instances -> instances.stream().map(i -> Optional.ofNullable(i.getCollectedfrom()).map(KeyValue::getKey).orElse(""))).orElse(Stream.empty()).filter(StringUtils::isNotBlank))).distinct().filter(StringUtils::isNotBlank).map(cfHb -> CleanGraphSparkJob.asIdCfHbMapping(((Result)r).getId(), cfHb)).iterator();
    }

    private static MapFunction<Tuple2<IdCfHbMapping, MasterDuplicate>, IdCfHbMapping> asIdCfHbMapping() {
        return (MapFunction & Serializable)t -> {
            IdCfHbMapping mapping = (IdCfHbMapping)t._1();
            Optional.ofNullable((MasterDuplicate)t._2()).ifPresent(t2 -> {
                mapping.setMasterId(t2.getMasterId());
                mapping.setMasterName(t2.getMasterName());
            });
            return mapping;
        };
    }

    private static IdCfHbMapping asIdCfHbMapping(String resultId, String cfHb) {
        IdCfHbMapping m = new IdCfHbMapping(resultId);
        m.setCfhb(cfHb);
        return m;
    }

    private static <T extends Oaf> MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T> getMapGroupsFunction() {
        return new MapGroupsFunction<String, Tuple2<T, IdCfHbMapping>, T>(){

            public T call(String key, Iterator<Tuple2<T, IdCfHbMapping>> values) {
                Tuple2 first = values.next();
                Oaf res = (Oaf)first._1();
                this.updateResult(res, (IdCfHbMapping)first._2());
                values.forEachRemaining(t -> this.updateResult(res, (IdCfHbMapping)t._2()));
                return res;
            }

            private void updateResult(T res, IdCfHbMapping m) {
                if (Objects.nonNull(m)) {
                    this.filter(res.getCollectedfrom()).forEach(kv -> this.updateKeyValue((KeyValue)kv, m));
                    ((Result)res).getInstance().forEach(i -> {
                        this.updateKeyValue(i.getHostedby(), m);
                        this.updateKeyValue(i.getCollectedfrom(), m);
                    });
                }
            }

            private Stream<KeyValue> filter(List<KeyValue> kvs) {
                return kvs.stream().filter((? super T kv) -> StringUtils.isNotBlank((CharSequence)kv.getKey()) && StringUtils.isNotBlank((CharSequence)kv.getValue()));
            }

            private void updateKeyValue(KeyValue kv, IdCfHbMapping a) {
                if (Objects.nonNull(kv) && Objects.nonNull(kv.getKey()) && kv.getKey().equals(a.getCfhb())) {
                    kv.setKey(a.getMasterId());
                    kv.setValue(a.getMasterName());
                }
            }
        };
    }

    private static /* synthetic */ void lambda$run$1(String outputPath, VocabularyGroup vocs, String inputPath, Class entityClazz, String contextId, String verifyParam, String datasourcePath, String country, String[] verifyCountryParam, String collectedfrom, String dsMasterDuplicatePath, Boolean deepClean, String blacklistPath, SparkSession spark) throws Exception {
        HdfsSupport.remove((String)outputPath, (Configuration)spark.sparkContext().hadoopConfiguration());
        CleanGraphSparkJob.cleanGraphTable(spark, vocs, inputPath, entityClazz, outputPath, contextId, verifyParam, datasourcePath, country, verifyCountryParam, collectedfrom, dsMasterDuplicatePath, deepClean, blacklistPath);
    }
}

