/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.createunresolvedentities;

import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.createunresolvedentities.model.SDGDataModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Subject;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.PidCleaner;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that reads {@link SDGDataModel} records as JSON from a source path, groups them by a
 * result identifier derived from each record's DOI or OpenAIRE id, and writes one {@link Result}
 * per identifier (carrying one SDG {@link Subject} per grouped record) as gzip-compressed JSON
 * under {@code <outputPath>/sdg}.
 */
public class PrepareSDGSparkJob
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(PrepareSDGSparkJob.class);

    /** Classpath location of the JSON file describing the command-line parameters of this job. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/actionmanager/createunresolvedentities/prepare_parameters.json";

    /**
     * Prefix of every generated identifier: the value registered in
     * {@code ModelSupport.entityIdPrefix} for the lower-cased simple name of {@link Result},
     * followed by {@code "|"}.
     */
    private static final String RESULT_ID_PREFIX = ModelSupport.entityIdPrefix.get(Result.class.getSimpleName().toLowerCase()) + "|";

    /** Namespace segment placed between the result prefix and the MD5 of a normalized DOI. */
    private static final String DOI_PREFIX = "doi_________::";

    /**
     * Entry point: parses the arguments described by the bundled parameter file, logs them and
     * runs {@link #processSDG} inside a Spark session.
     *
     * @param args command-line arguments; {@code sourcePath} and {@code outputPath} are read
     *             from them here
     * @throws Exception if the parameter file is missing or unreadable, if argument parsing
     *                   fails, or if the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // Close the resource stream deterministically and decode it with an explicit charset
        // instead of relying on the platform default.
        try (InputStream config = PrepareSDGSparkJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (config == null) {
                throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            jsonConfiguration = IOUtils.toString(config, StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Constants.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String sourcePath = parser.get("sourcePath");
        log.info("sourcePath: {}", sourcePath);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> PrepareSDGSparkJob.processSDG(spark, sourcePath, outputPath));
    }

    /**
     * Groups the SDG records found at {@code sourcePath} by the identifier computed in
     * {@link #createIdentifier}, turns every group into a {@link Result} and overwrites
     * {@code outputPath + "/sdg"} with the gzip-compressed JSON output.
     *
     * @param spark      session used to read the input
     * @param sourcePath location of the JSON-serialized {@link SDGDataModel} records
     * @param outputPath base output location; the data is written to its {@code /sdg} child
     */
    private static void processSDG(SparkSession spark, String sourcePath, String outputPath) {
        Constants.readJsonFromPath(spark, sourcePath, SDGDataModel.class)
            .groupByKey(PrepareSDGSparkJob::createIdentifier, Encoders.STRING())
            .mapGroups(PrepareSDGSparkJob::getResult, Encoders.bean(Result.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(outputPath + "/sdg");
    }

    /**
     * Computes the result identifier used as grouping key for a record.
     *
     * <p>A non-blank DOI takes precedence: it is normalized, hashed with MD5 and prefixed with
     * the result prefix and the DOI namespace. Otherwise a non-blank OpenAIRE id is used, with
     * the result prefix prepended unless the id already starts with it.
     *
     * @param v the record to identify
     * @return the identifier of the result the record refers to
     * @throws RuntimeException if the record has neither a DOI nor an OpenAIRE id
     */
    private static String createIdentifier(SDGDataModel v) {
        if (StringUtils.isNotBlank(v.getDoi())) {
            String doi = PidCleaner.normalizePidValue(PidType.doi.toString(), v.getDoi());
            return RESULT_ID_PREFIX + DOI_PREFIX + IdentifierFactory.md5(doi);
        }
        if (StringUtils.isNotBlank(v.getOaid())) {
            String oaid = v.getOaid();
            return StringUtils.startsWith(oaid, RESULT_ID_PREFIX) ? oaid : RESULT_ID_PREFIX + oaid;
        }
        throw new RuntimeException("No identifier found for SDGDataModel: " + v);
    }

    /**
     * Builds the {@link Result} for one group of records sharing the same identifier.
     *
     * @param id the identifier assigned to the result
     * @param it the records of the group; each contributes one SDG subject, in iteration order
     *           (an exhausted iterator produces a result with an empty subject list)
     * @return a result holding only the identifier and the collected subjects
     */
    @NotNull
    private static Result getResult(String id, Iterator<SDGDataModel> it) {
        List<Subject> sbjs = new ArrayList<>();
        while (it.hasNext()) {
            sbjs.add(toSdgSubject(it.next()));
        }

        Result r = new Result();
        r.setId(id);
        r.setSubject(sbjs);
        return r;
    }

    /** Wraps the SDG value of a record into a {@link Subject} via {@code Constants.getSubject}. */
    private static Subject toSdgSubject(SDGDataModel model) {
        return Constants.getSubject(model.getSdg(), "SDG", "Sustainable Development Goals", "subject:sdg");
    }
}

