/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.incremental;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication;
import eu.dnetlib.dhp.oozie.RunSQLSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CollectNewOafResults {
    private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);
    private final ArgumentApplicationParser parser;

    public CollectNewOafResults(ArgumentApplicationParser parser) {
        this.parser = parser;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)CollectNewOafResults.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String wrkdirPath = parser.get("workingDir");
        log.info("workingDir is {}", (Object)wrkdirPath);
        String outputPath = parser.get("outputPath");
        log.info("outputPath is {}", (Object)outputPath);
        String mdStoreManagerURI = parser.get("mdStoreManagerURI");
        log.info("mdStoreManagerURI is {}", (Object)mdStoreManagerURI);
        String mdStoreID = parser.get("mdStoreID");
        if (StringUtils.isBlank((CharSequence)mdStoreID)) {
            throw new IllegalArgumentException("missing or empty argument mdStoreID");
        }
        String hiveDbName = parser.get("hiveDbName");
        log.info("hiveDbName is {}", (Object)hiveDbName);
        MDStoreVersion currentVersion = (MDStoreVersion)DNetRestClient.doGET((String)String.format("%s/mdstore/%s/startReading", mdStoreManagerURI, mdStoreID), MDStoreVersion.class);
        log.info("mdstore data is {}", (Object)currentVersion.toString());
        try {
            SparkConf conf = new SparkConf();
            conf.set("hive.metastore.uris", parser.get("hiveMetastoreUris"));
            SparkSessionSupport.runWithSparkHiveSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
                Dataset currentIds = spark.table(hiveDbName + ".result").select("id", new String[0]).union(spark.table(hiveDbName + ".relation").where("relClass = 'merges'").selectExpr(new String[]{"target as id"})).distinct();
                UserDefinedFunction getOafType = functions.udf((UDF1 & Serializable)json -> CopyHdfsOafSparkApplication.getOafType((String)json), (DataType)DataTypes.StringType);
                spark.read().text(currentVersion.getHdfsPath() + "/store").selectExpr(new String[]{"value", "get_json_object(value, '$.id') AS id"}).where("id IS NOT NULL").join(currentIds, DHPUtils.toSeq(Collections.singletonList("id")).toSeq(), "left_anti").withColumn("oaftype", getOafType.apply(new Column[]{new Column("value")})).write().partitionBy(new String[]{"oaftype"}).mode(SaveMode.Overwrite).option("compression", "gzip").parquet(wrkdirPath + "/entities");
                ModelSupport.oafTypes.keySet().forEach(entity -> spark.read().parquet(wrkdirPath + "/entities").filter("oaftype = '" + entity + "'").select("value", new String[0]).write().option("compression", "gzip").mode(SaveMode.Append).text(outputPath + "/" + entity));
                Dataset newIds = spark.read().parquet(wrkdirPath + "/entities").select("id", new String[0]);
                Dataset rels = spark.read().text(currentVersion.getHdfsPath() + "/store").selectExpr(new String[]{"value", "get_json_object(value, '$.source') AS source", "get_json_object(value, '$.target') AS target"}).where("source IS NOT NULL AND target IS NOT NULL");
                rels.join(newIds.selectExpr(new String[]{"id as source"}), DHPUtils.toSeq(Collections.singletonList("source")).toSeq(), "left_semi").union(rels.join(newIds.selectExpr(new String[]{"id as target"}), DHPUtils.toSeq(Collections.singletonList("target")).toSeq(), "left_semi")).distinct().select("value", new String[0]).write().option("compression", "gzip").mode(SaveMode.Append).text(outputPath + "/relation");
            });
        }
        catch (Throwable throwable) {
            DNetRestClient.doGET((String)String.format("%s/version/%s/endReading", mdStoreManagerURI, currentVersion.getId()));
            throw throwable;
        }
        DNetRestClient.doGET((String)String.format("%s/version/%s/endReading", mdStoreManagerURI, currentVersion.getId()));
    }
}

