/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.partition;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.ISClient;
import eu.dnetlib.dhp.actionmanager.partition.PartitionActionSetsByPayloadTypeJob;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest;
import eu.dnetlib.dhp.common.ThrowingSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.mutable.Seq;

@ExtendWith(value={MockitoExtension.class})
public class PartitionActionSetsByPayloadTypeJobTest {
    private static final ClassLoader cl = PartitionActionSetsByPayloadTypeJobTest.class.getClassLoader();
    private static Configuration configuration;
    private static SparkSession spark;
    private static final ObjectMapper OBJECT_MAPPER;
    private static final StructType ATOMIC_ACTION_SCHEMA;

    @BeforeAll
    public static void beforeAll() throws IOException {
        configuration = Job.getInstance().getConfiguration();
        SparkConf conf = new SparkConf();
        conf.setAppName(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName());
        conf.setMaster("local");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        spark = SparkSession.builder().config(conf).getOrCreate();
    }

    @AfterAll
    public static void afterAll() {
        spark.stop();
    }

    private List<String> resolveInputActionSetPaths(Path inputActionSetsBaseDir) throws IOException {
        Path inputActionSetJsonDumpsDir = PartitionActionSetsByPayloadTypeJobTest.getInputActionSetJsonDumpsDir();
        return Files.list(inputActionSetJsonDumpsDir).map(path -> {
            String inputActionSetId = path.getFileName().toString();
            return inputActionSetsBaseDir.resolve(inputActionSetId).toString();
        }).collect(Collectors.toCollection(ArrayList::new));
    }

    private static Map<String, List<String>> createActionSets(Path inputActionSetsDir) throws IOException {
        Path inputActionSetJsonDumpsDir = PartitionActionSetsByPayloadTypeJobTest.getInputActionSetJsonDumpsDir();
        HashMap<String, List<String>> oafsByType = new HashMap<String, List<String>>();
        Files.list(inputActionSetJsonDumpsDir).forEach(inputActionSetJsonDumpFile -> {
            String inputActionSetId = inputActionSetJsonDumpFile.getFileName().toString();
            Path inputActionSetDir = inputActionSetsDir.resolve(inputActionSetId);
            org.apache.spark.sql.Dataset actionDS = PartitionActionSetsByPayloadTypeJobTest.readActionsFromJsonDump(inputActionSetJsonDumpFile.toString()).cache();
            PartitionActionSetsByPayloadTypeJobTest.writeActionsAsJobInput((org.apache.spark.sql.Dataset<String>)actionDS, inputActionSetId, inputActionSetDir.toString());
            Map<String, List> actionSetOafsByType = actionDS.withColumn("atomic_action", functions.from_json((Column)functions.col((String)"value"), (StructType)ATOMIC_ACTION_SCHEMA)).select(new Column[]{functions.expr((String)"atomic_action.*")}).groupBy(new Column[]{functions.col((String)"clazz")}).agg(functions.collect_list((Column)functions.col((String)"payload")).as("payload_list"), new Column[0]).collectAsList().stream().map(row -> new AbstractMap.SimpleEntry<Object, List>(row.getAs("clazz"), JavaConversions.mutableSeqAsJavaList((Seq)((Seq)row.getAs("payload_list"))))).collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
            actionSetOafsByType.keySet().forEach(x -> {
                if (oafsByType.containsKey(x)) {
                    ArrayList collected = new ArrayList();
                    collected.addAll((Collection)oafsByType.get(x));
                    collected.addAll((Collection)actionSetOafsByType.get(x));
                    oafsByType.put((String)x, collected);
                } else {
                    oafsByType.put((String)x, (List<String>)actionSetOafsByType.get(x));
                }
            });
        });
        return oafsByType;
    }

    private static Path getInputActionSetJsonDumpsDir() {
        return Paths.get(Objects.requireNonNull(cl.getResource("eu/dnetlib/dhp/actionmanager/partition/input/")).getFile(), new String[0]);
    }

    private static org.apache.spark.sql.Dataset<String> readActionsFromJsonDump(String path) {
        return spark.read().textFile(path);
    }

    private static void writeActionsAsJobInput(org.apache.spark.sql.Dataset<String> actionDS, String inputActionSetId, String path) {
        actionDS.javaRDD().mapToPair((PairFunction & Serializable)json -> new Tuple2((Object)new Text(inputActionSetId), (Object)new Text(json))).saveAsNewAPIHadoopFile(path, Text.class, Text.class, SequenceFileOutputFormat.class, configuration);
    }

    private static <T extends Oaf> void assertForOafType(Path outputDir, Map<String, List<String>> oafsByClassName, Class<T> clazz) {
        Path outputDatasetDir = outputDir.resolve(String.format("clazz=%s", clazz.getCanonicalName()));
        Files.exists(outputDatasetDir, new LinkOption[0]);
        List actuals = PartitionActionSetsByPayloadTypeJobTest.readActionPayloadFromJobOutput(outputDatasetDir.toString(), clazz).collectAsList();
        actuals.sort(Comparator.comparingInt(Object::hashCode));
        List expecteds = oafsByClassName.get(clazz.getCanonicalName()).stream().map(json -> PartitionActionSetsByPayloadTypeJobTest.mapToOaf(json, clazz)).sorted(Comparator.comparingInt(Object::hashCode)).collect(Collectors.toList());
        Assertions.assertIterableEquals(expecteds, (Iterable)actuals);
    }

    private static <T extends Oaf> org.apache.spark.sql.Dataset<T> readActionPayloadFromJobOutput(String path, Class<T> clazz) {
        return spark.read().parquet(path).map((MapFunction & Serializable)value -> (Oaf)OBJECT_MAPPER.readValue((String)value.getAs("payload"), clazz), Encoders.bean(clazz));
    }

    private static <T extends Oaf> T mapToOaf(String json, Class<T> clazz) {
        return (T)((Oaf)ThrowingSupport.rethrowAsRuntimeException(() -> (Oaf)OBJECT_MAPPER.readValue(json, clazz), (String)String.format("failed to map json to class: json=%s, class=%s", json, clazz.getCanonicalName())));
    }

    static {
        OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        ATOMIC_ACTION_SCHEMA = StructType$.MODULE$.apply(Arrays.asList(StructField$.MODULE$.apply("clazz", DataTypes.StringType, false, Metadata.empty()), StructField$.MODULE$.apply("payload", DataTypes.StringType, false, Metadata.empty())));
    }

    @DisplayName(value="Job")
    @Nested
    class Main {
        @Mock
        private ISClient isClient;

        Main() {
        }

        @Test
        void shouldPartitionActionSetsByPayloadType(@TempDir Path workingDir) throws Exception {
            Path inputActionSetsBaseDir = workingDir.resolve("input").resolve("action_sets");
            Path outputDir = workingDir.resolve("output");
            Map oafsByClassName = PartitionActionSetsByPayloadTypeJobTest.createActionSets(inputActionSetsBaseDir);
            List inputActionSetsPaths = PartitionActionSetsByPayloadTypeJobTest.this.resolveInputActionSetPaths(inputActionSetsBaseDir);
            Mockito.when((Object)this.isClient.getLatestRawsetPaths(Mockito.anyString())).thenReturn((Object)inputActionSetsPaths);
            PartitionActionSetsByPayloadTypeJob job = new PartitionActionSetsByPayloadTypeJob();
            job.setIsClient(this.isClient);
            job.run(Boolean.FALSE, "", outputDir.toString());
            Files.exists(outputDir, new LinkOption[0]);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(outputDir, oafsByClassName, Dataset.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(outputDir, oafsByClassName, Datasource.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(outputDir, oafsByClassName, Organization.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(outputDir, oafsByClassName, OtherResearchProduct.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(outputDir, oafsByClassName, Project.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(outputDir, oafsByClassName, Publication.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(outputDir, oafsByClassName, Result.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(outputDir, oafsByClassName, Relation.class);
            PartitionActionSetsByPayloadTypeJobTest.assertForOafType(outputDir, oafsByClassName, Software.class);
        }
    }
}

