/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.promote;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import eu.dnetlib.dhp.actionmanager.promote.MergeAndGet;
import eu.dnetlib.dhp.actionmanager.promote.PromoteAction;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadFunctions;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.FunctionalInterfaceSupport;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PromoteActionPayloadForGraphTableJob {
    private static final Logger logger = LoggerFactory.getLogger(PromoteActionPayloadForGraphTableJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    public static void main(String[] args) throws Exception {
        String jsonConfiguration = IOUtils.toString((InputStream)PromoteActionPayloadForGraphTableJob.class.getResourceAsStream("/eu/dnetlib/dhp/actionmanager/promote/promote_action_payload_for_graph_table_input_parameters.json"));
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        logger.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String inputGraphTablePath = parser.get("inputGraphTablePath");
        logger.info("inputGraphTablePath: {}", (Object)inputGraphTablePath);
        String graphTableClassName = parser.get("graphTableClassName");
        logger.info("graphTableClassName: {}", (Object)graphTableClassName);
        String inputActionPayloadPath = parser.get("inputActionPayloadPath");
        logger.info("inputActionPayloadPath: {}", (Object)inputActionPayloadPath);
        String actionPayloadClassName = parser.get("actionPayloadClassName");
        logger.info("actionPayloadClassName: {}", (Object)actionPayloadClassName);
        String outputGraphTablePath = parser.get("outputGraphTablePath");
        logger.info("outputGraphTablePath: {}", (Object)outputGraphTablePath);
        MergeAndGet.Strategy mergeAndGetStrategy = MergeAndGet.Strategy.valueOf(parser.get("mergeAndGetStrategy").toUpperCase());
        logger.info("mergeAndGetStrategy: {}", (Object)mergeAndGetStrategy);
        Boolean shouldGroupById = Optional.ofNullable(parser.get("shouldGroupById")).map(Boolean::valueOf).orElse(true);
        logger.info("shouldGroupById: {}", (Object)shouldGroupById);
        PromoteAction.Strategy promoteActionStrategy = Optional.ofNullable(parser.get("promoteActionStrategy")).map(PromoteAction.Strategy::valueOf).orElse(PromoteAction.Strategy.UPSERT);
        logger.info("promoteActionStrategy: {}", (Object)promoteActionStrategy);
        Class<?> rowClazz = Class.forName(graphTableClassName);
        Class<?> actionPayloadClazz = Class.forName(actionPayloadClassName);
        PromoteActionPayloadForGraphTableJob.throwIfGraphTableClassIsNotSubClassOfActionPayloadClass(rowClazz, actionPayloadClazz);
        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            PromoteActionPayloadForGraphTableJob.removeOutputDir(spark, outputGraphTablePath);
            PromoteActionPayloadForGraphTableJob.promoteActionPayloadForGraphTable(spark, inputGraphTablePath, inputActionPayloadPath, outputGraphTablePath, mergeAndGetStrategy, promoteActionStrategy, rowClazz, actionPayloadClazz, shouldGroupById);
        });
    }

    private static void throwIfGraphTableClassIsNotSubClassOfActionPayloadClass(Class<? extends Oaf> rowClazz, Class<? extends Oaf> actionPayloadClazz) {
        if (!ModelSupport.isSubClass(rowClazz, actionPayloadClazz).booleanValue()) {
            String msg = String.format("graph table class is not a subclass of action payload class: graph=%s, action=%s", rowClazz.getCanonicalName(), actionPayloadClazz.getCanonicalName());
            throw new RuntimeException(msg);
        }
    }

    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove((String)path, (Configuration)spark.sparkContext().hadoopConfiguration());
    }

    private static <G extends Oaf, A extends Oaf> void promoteActionPayloadForGraphTable(SparkSession spark, String inputGraphTablePath, String inputActionPayloadPath, String outputGraphTablePath, MergeAndGet.Strategy mergeAndGetStrategy, PromoteAction.Strategy promoteActionStrategy, Class<G> rowClazz, Class<A> actionPayloadClazz, Boolean shouldGroupById) {
        org.apache.spark.sql.Dataset<G> rowDS = PromoteActionPayloadForGraphTableJob.readGraphTable(spark, inputGraphTablePath, rowClazz);
        org.apache.spark.sql.Dataset<A> actionPayloadDS = PromoteActionPayloadForGraphTableJob.readActionPayload(spark, inputActionPayloadPath, actionPayloadClazz);
        org.apache.spark.sql.Dataset result = PromoteActionPayloadForGraphTableJob.promoteActionPayloadForGraphTable(rowDS, actionPayloadDS, mergeAndGetStrategy, promoteActionStrategy, rowClazz, actionPayloadClazz, shouldGroupById).map((MapFunction & Serializable)value -> value, Encoders.bean(rowClazz));
        PromoteActionPayloadForGraphTableJob.saveGraphTable(result, outputGraphTablePath);
    }

    private static <G extends Oaf> org.apache.spark.sql.Dataset<G> readGraphTable(SparkSession spark, String path, Class<G> rowClazz) {
        logger.info("Reading graph table from path: {}", (Object)path);
        if (HdfsSupport.exists((String)path, (Configuration)spark.sparkContext().hadoopConfiguration())) {
            return spark.read().textFile(path).map((MapFunction & Serializable)value -> (Oaf)OBJECT_MAPPER.readValue(value, rowClazz), Encoders.bean(rowClazz));
        }
        logger.info("Found empty graph table from path: {}", (Object)path);
        return spark.emptyDataset(Encoders.bean(rowClazz));
    }

    private static <A extends Oaf> org.apache.spark.sql.Dataset<A> readActionPayload(SparkSession spark, String path, Class<A> actionPayloadClazz) {
        logger.info("Reading action payload from path: {}", (Object)path);
        return spark.read().parquet(path).map(PromoteActionPayloadForGraphTableJob::extractPayload, Encoders.STRING()).map((MapFunction & Serializable)value -> PromoteActionPayloadForGraphTableJob.decodePayload(actionPayloadClazz, value), Encoders.bean(actionPayloadClazz));
    }

    private static String extractPayload(Row value) {
        try {
            return (String)value.getAs("payload");
        }
        catch (ClassCastException | IllegalArgumentException e) {
            logger.error("cannot extract payload from action: {}", (Object)value);
            throw e;
        }
    }

    private static <A extends Oaf> A decodePayload(Class<A> actionPayloadClazz, String payload) throws IOException {
        try {
            return (A)((Oaf)OBJECT_MAPPER.readValue(payload, actionPayloadClazz));
        }
        catch (UnrecognizedPropertyException e) {
            logger.error("error decoding payload: {}", (Object)payload);
            throw e;
        }
    }

    private static <G extends Oaf, A extends Oaf> org.apache.spark.sql.Dataset<G> promoteActionPayloadForGraphTable(org.apache.spark.sql.Dataset<G> rowDS, org.apache.spark.sql.Dataset<A> actionPayloadDS, MergeAndGet.Strategy mergeAndGetStrategy, PromoteAction.Strategy promoteActionStrategy, Class<G> rowClazz, Class<A> actionPayloadClazz, Boolean shouldGroupById) {
        logger.info("Promoting action payload for graph table: payload={}, table={}", (Object)actionPayloadClazz.getSimpleName(), (Object)rowClazz.getSimpleName());
        FunctionalInterfaceSupport.SerializableSupplier & Serializable rowIdFn = ModelSupport::idFn;
        FunctionalInterfaceSupport.SerializableSupplier & Serializable actionPayloadIdFn = ModelSupport::idFn;
        FunctionalInterfaceSupport.SerializableSupplier mergeRowWithActionPayloadAndGetFn = MergeAndGet.functionFor(mergeAndGetStrategy);
        FunctionalInterfaceSupport.SerializableSupplier mergeRowsAndGetFn = MergeAndGet.functionFor(mergeAndGetStrategy);
        FunctionalInterfaceSupport.SerializableSupplier<G> zeroFn = PromoteActionPayloadForGraphTableJob.zeroFn(rowClazz);
        FunctionalInterfaceSupport.SerializableSupplier & Serializable isNotZeroFn = PromoteActionPayloadForGraphTableJob::isNotZeroFnUsingIdOrSourceAndTarget;
        org.apache.spark.sql.Dataset<G> joinedAndMerged = PromoteActionPayloadFunctions.joinGraphTableWithActionPayloadAndMerge(rowDS, actionPayloadDS, rowIdFn, actionPayloadIdFn, mergeRowWithActionPayloadAndGetFn, promoteActionStrategy, rowClazz, actionPayloadClazz);
        if (Boolean.TRUE.equals(shouldGroupById)) {
            return PromoteActionPayloadFunctions.groupGraphTableByIdAndMerge(joinedAndMerged, rowIdFn, mergeRowsAndGetFn, zeroFn, isNotZeroFn, rowClazz);
        }
        return joinedAndMerged;
    }

    private static <T extends Oaf> FunctionalInterfaceSupport.SerializableSupplier<T> zeroFn(Class<T> clazz) {
        switch (clazz.getCanonicalName()) {
            case "eu.dnetlib.dhp.schema.oaf.Dataset": {
                return (FunctionalInterfaceSupport.SerializableSupplier & Serializable)() -> (Oaf)clazz.cast(new Dataset());
            }
            case "eu.dnetlib.dhp.schema.oaf.Datasource": {
                return (FunctionalInterfaceSupport.SerializableSupplier & Serializable)() -> (Oaf)clazz.cast(new Datasource());
            }
            case "eu.dnetlib.dhp.schema.oaf.Organization": {
                return (FunctionalInterfaceSupport.SerializableSupplier & Serializable)() -> (Oaf)clazz.cast(new Organization());
            }
            case "eu.dnetlib.dhp.schema.oaf.OtherResearchProduct": {
                return (FunctionalInterfaceSupport.SerializableSupplier & Serializable)() -> (Oaf)clazz.cast(new OtherResearchProduct());
            }
            case "eu.dnetlib.dhp.schema.oaf.Project": {
                return (FunctionalInterfaceSupport.SerializableSupplier & Serializable)() -> (Oaf)clazz.cast(new Project());
            }
            case "eu.dnetlib.dhp.schema.oaf.Publication": {
                return (FunctionalInterfaceSupport.SerializableSupplier & Serializable)() -> (Oaf)clazz.cast(new Publication());
            }
            case "eu.dnetlib.dhp.schema.oaf.Relation": {
                return (FunctionalInterfaceSupport.SerializableSupplier & Serializable)() -> (Oaf)clazz.cast(new Relation());
            }
            case "eu.dnetlib.dhp.schema.oaf.Software": {
                return (FunctionalInterfaceSupport.SerializableSupplier & Serializable)() -> (Oaf)clazz.cast(new Software());
            }
            case "eu.dnetlib.dhp.schema.oaf.Person": {
                return (FunctionalInterfaceSupport.SerializableSupplier & Serializable)() -> (Oaf)clazz.cast(new Person());
            }
        }
        throw new RuntimeException("unknown class: " + clazz.getCanonicalName());
    }

    private static <T extends Oaf> Function<T, Boolean> isNotZeroFnUsingIdOrSourceAndTarget() {
        return t -> {
            if (ModelSupport.isSubClass((Oaf)t, Relation.class).booleanValue()) {
                Relation rel = (Relation)t;
                return StringUtils.isNotBlank((CharSequence)rel.getSource()) && StringUtils.isNotBlank((CharSequence)rel.getTarget());
            }
            return StringUtils.isNotBlank((CharSequence)((OafEntity)t).getId());
        };
    }

    private static <G extends Oaf> void saveGraphTable(org.apache.spark.sql.Dataset<G> result, String path) {
        logger.info("Saving graph table to path: {}", (Object)path);
        result.toJSON().write().option("compression", "gzip").text(path);
    }
}

