/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.bulktag;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import eu.dnetlib.dhp.bulktag.community.CommunityConfigurationFactory;
import eu.dnetlib.dhp.bulktag.community.Pair;
import eu.dnetlib.dhp.bulktag.community.ProtoMap;
import eu.dnetlib.dhp.bulktag.community.ResultTagger;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SparkBulkTagJob {
    private static String OPENAIRE_3 = "openaire3.0";
    private static String OPENAIRE_4 = "openaire-pub_4.0";
    private static String OPENAIRE_CRIS = "openaire-cris_1.1";
    private static String OPENAIRE_DATA = "openaire2.0_data";
    private static String EOSC = "10|openaire____::2e06c1122c7df43765fdcf91080824fa";
    private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob.class);
    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        CommunityConfiguration cc;
        String jsonConfiguration = IOUtils.toString((InputStream)SparkBulkTagJob.class.getResourceAsStream("/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", (Object)inputPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", (Object)outputPath);
        String baseURL = parser.get("baseURL");
        log.info("baseURL: {}", (Object)baseURL);
        ProtoMap protoMappingParams = (ProtoMap)new Gson().fromJson(parser.get("pathMap"), ProtoMap.class);
        log.info("pathMap: {}", (Object)new Gson().toJson((Object)protoMappingParams));
        SparkConf conf = new SparkConf();
        String taggingConf = Optional.ofNullable(parser.get("taggingConf")).map(String::valueOf).orElse(null);
        if (taggingConf != null) {
            cc = CommunityConfigurationFactory.newInstance(taggingConf);
        } else {
            cc = Utils.getCommunityConfiguration(baseURL);
            log.info(OBJECT_MAPPER.writeValueAsString((Object)cc));
        }
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            SparkBulkTagJob.extendCommunityConfigurationForEOSC(spark, inputPath, cc);
            SparkBulkTagJob.execBulkTag(spark, inputPath, outputPath, protoMappingParams, cc);
        });
    }

    private static void extendCommunityConfigurationForEOSC(SparkSession spark, String inputPath, CommunityConfiguration cc) {
        Dataset datasources = SparkBulkTagJob.readPath(spark, inputPath + "datasource", Datasource.class).filter((FilterFunction & Serializable)ds -> SparkBulkTagJob.isOKDatasource(ds)).map((MapFunction & Serializable)ds -> ds.getId(), Encoders.STRING());
        Map<String, List<Pair<String, SelectionConstraints>>> dsm = cc.getEoscDatasourceMap();
        for (String ds2 : datasources.collectAsList()) {
            if (dsm.containsKey(ds2)) continue;
            ArrayList eoscList = new ArrayList();
            dsm.put(ds2, eoscList);
        }
    }

    private static boolean isOKDatasource(Datasource ds) {
        String compatibility = ds.getOpenairecompatibility().getClassid();
        return (compatibility.equalsIgnoreCase(OPENAIRE_3) || compatibility.equalsIgnoreCase(OPENAIRE_4) || compatibility.equalsIgnoreCase(OPENAIRE_CRIS) || compatibility.equalsIgnoreCase(OPENAIRE_DATA)) && ds.getCollectedfrom().stream().anyMatch(cf -> cf.getKey().equals(EOSC));
    }

    private static <R extends Result> void execBulkTag(SparkSession spark, String inputPath, String outputPath, ProtoMap protoMappingParams, CommunityConfiguration communityConfiguration) {
        ModelSupport.entityTypes.keySet().parallelStream().filter(ModelSupport::isResult).forEach(e -> {
            PropagationConstant.removeOutputDir(spark, outputPath + e.name());
            ResultTagger resultTagger = new ResultTagger();
            Class resultClazz = (Class)ModelSupport.entityTypes.get(e);
            SparkBulkTagJob.readPath(spark, inputPath + e.name(), resultClazz).map(SparkBulkTagJob.patchResult(), Encoders.bean((Class)resultClazz)).filter(Objects::nonNull).map((MapFunction & Serializable)value -> resultTagger.enrichContextCriteria(value, communityConfiguration, protoMappingParams), Encoders.bean((Class)resultClazz)).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(outputPath + e.name());
            SparkBulkTagJob.readPath(spark, outputPath + e.name(), resultClazz).write().mode(SaveMode.Overwrite).option("compression", "gzip").json(inputPath + e.name());
        });
    }

    public static <R> Dataset<R> readPath(SparkSession spark, String inputPath, Class<R> clazz) {
        return spark.read().textFile(inputPath).map((MapFunction & Serializable)value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }

    private static <R extends Result> MapFunction<R, R> patchResult() {
        return (MapFunction & Serializable)r -> {
            if (Objects.isNull(r.getDataInfo())) {
                r.setDataInfo(OafMapperUtils.dataInfo((Boolean)false, (String)"", (Boolean)false, (Boolean)false, (Qualifier)OafMapperUtils.unknown((String)"", (String)""), (String)""));
            } else if (r.getDataInfo().getDeletedbyinference() == null) {
                r.getDataInfo().setDeletedbyinference(Boolean.valueOf(false));
            }
            if (Objects.isNull(r.getContext())) {
                r.setContext(new ArrayList());
            }
            return r;
        };
    }
}

