/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.schema.mdstore.Provenance;
import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Serializable;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@TestMethodOrder(value=MethodOrderer.OrderAnnotation.class)
@ExtendWith(value={MockitoExtension.class})
public class GenerateNativeStoreSparkJobTest
extends AbstractVocabularyTest {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static SparkSession spark;
    private static Path workingDir;
    private static Encoder<MetadataRecord> encoder;
    private static final String encoding = "XML";
    private static final String dateOfCollection;
    private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";
    private static String provenance;
    private static final Logger log;

    @BeforeAll
    public static void beforeAll() throws IOException {
        provenance = IOUtils.toString((InputStream)GenerateNativeStoreSparkJobTest.class.getResourceAsStream("/eu/dnetlib/dhp/collection/provenance.json"));
        workingDir = Files.createTempDirectory(GenerateNativeStoreSparkJobTest.class.getSimpleName(), new FileAttribute[0]);
        log.info("using work dir {}", (Object)workingDir);
        SparkConf conf = new SparkConf();
        conf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName());
        conf.setMaster("local[*]");
        conf.set("spark.driver.host", "localhost");
        conf.set("hive.metastore.local", "true");
        conf.set("spark.ui.enabled", "false");
        conf.set("spark.sql.warehouse.dir", workingDir.toString());
        conf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());
        encoder = Encoders.bean(MetadataRecord.class);
        spark = SparkSession.builder().appName(GenerateNativeStoreSparkJobTest.class.getSimpleName()).config(conf).getOrCreate();
    }

    @AfterAll
    public static void afterAll() throws IOException {
        FileUtils.deleteDirectory((File)workingDir.toFile());
        spark.stop();
    }

    @Test
    @Order(value=1)
    void testGenerateNativeStoreSparkJobRefresh() throws Exception {
        MDStoreVersion mdStoreV1 = this.prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
        FileUtils.forceMkdir((File)new File(mdStoreV1.getHdfsPath()));
        IOUtils.copy((InputStream)this.getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"), (OutputStream)new FileOutputStream(mdStoreV1.getHdfsPath() + "/sequence_file"));
        String apiDescriptor = IOUtils.toString((InputStream)Objects.requireNonNull(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/apiDescriptor.json")), (Charset)Charsets.UTF_8);
        GenerateNativeStoreSparkJob.main((String[])new String[]{"--isSparkSessionManaged", Boolean.FALSE.toString(), "--encoding", encoding, "--dateOfCollection", dateOfCollection, "--provenance", provenance, "--apidescriptor", apiDescriptor, "--xpath", xpath, "--mdStoreVersion", OBJECT_MAPPER.writeValueAsString((Object)mdStoreV1), "--readMdStoreVersion", "", "--workflowId", "abc"});
        this.verify(mdStoreV1);
    }

    @Test
    @Order(value=2)
    void testGenerateNativeStoreSparkJobIncremental() throws Exception {
        MDStoreVersion mdStoreV2 = this.prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
        FileUtils.forceMkdir((File)new File(mdStoreV2.getHdfsPath()));
        IOUtils.copy((InputStream)this.getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/sequence_file"), (OutputStream)new FileOutputStream(mdStoreV2.getHdfsPath() + "/sequence_file"));
        String apiDescriptor = IOUtils.toString((InputStream)Objects.requireNonNull(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/collection/apiDescriptor.json")), (Charset)Charsets.UTF_8);
        MDStoreVersion mdStoreV1 = this.prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
        GenerateNativeStoreSparkJob.main((String[])new String[]{"--isSparkSessionManaged", Boolean.FALSE.toString(), "--encoding", encoding, "--dateOfCollection", dateOfCollection, "--provenance", provenance, "--apidescriptor", apiDescriptor, "--xpath", xpath, "--mdStoreVersion", OBJECT_MAPPER.writeValueAsString((Object)mdStoreV2), "--readMdStoreVersion", OBJECT_MAPPER.writeValueAsString((Object)mdStoreV1), "--workflowId", "abc"});
        this.verify(mdStoreV2);
    }

    @Test
    @Order(value=3)
    void testTransformSparkJob() throws Exception {
        this.setUpVocabulary();
        MDStoreVersion mdStoreV2 = this.prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
        MDStoreVersion mdStoreCleanedVersion = this.prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");
        this.mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl");
        Map<String, String> parameters = Stream.of({"dateOfTransformation", "1234"}, {"transformationPlugin", "XSLT_TRANSFORM"}, {"transformationRuleId", "simpleTRule"}).collect(Collectors.toMap(data -> data[0], data -> data[1]));
        TransformSparkJobNode.transformRecords(parameters, (ISLookUpService)this.isLookUpService, (SparkSession)spark, (String)(mdStoreV2.getHdfsPath() + "/store"), (String)mdStoreCleanedVersion.getHdfsPath(), (Integer)200);
        Encoder encoder = Encoders.bean(MetadataRecord.class);
        Dataset mOutput = spark.read().format("parquet").load(mdStoreCleanedVersion.getHdfsPath() + "/store").as(encoder);
        Long total = mOutput.count();
        long recordTs = mOutput.filter((FilterFunction & Serializable)p -> p.getDateOfTransformation() == 1234L).count();
        long recordNotEmpty = mOutput.filter((FilterFunction & Serializable)p -> !StringUtils.isBlank((CharSequence)p.getBody())).count();
        Assertions.assertEquals((Long)total, (long)recordTs);
        Assertions.assertEquals((Long)total, (long)recordNotEmpty);
    }

    @Test
    void testJSONSerialization() throws Exception {
        String s = IOUtils.toString((InputStream)this.getClass().getResourceAsStream("mdStoreVersion_1.json"));
        System.out.println("s = " + s);
        ObjectMapper mapper = new ObjectMapper();
        MDStoreVersion mi = (MDStoreVersion)mapper.readValue(s, MDStoreVersion.class);
        Assertions.assertNotNull((Object)mi);
    }

    @Test
    void testGenerationMetadataRecord() throws Exception {
        String xml = IOUtils.toString((InputStream)this.getClass().getResourceAsStream("./record.xml"));
        MetadataRecord record = GenerateNativeStoreSparkJob.parseRecord((String)xml, (String)"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", (String)encoding, (Provenance)new Provenance("foo", "bar", "ns_prefix"), (Long)System.currentTimeMillis(), null, null);
        Assertions.assertNotNull((Object)record.getId());
        Assertions.assertNotNull((Object)record.getOriginalId());
    }

    @Test
    void testEquals() throws IOException {
        String xml = IOUtils.toString((InputStream)this.getClass().getResourceAsStream("./record.xml"));
        MetadataRecord record = GenerateNativeStoreSparkJob.parseRecord((String)xml, (String)"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", (String)encoding, (Provenance)new Provenance("foo", "bar", "ns_prefix"), (Long)System.currentTimeMillis(), null, null);
        MetadataRecord record1 = GenerateNativeStoreSparkJob.parseRecord((String)xml, (String)"./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']", (String)encoding, (Provenance)new Provenance("foo", "bar", "ns_prefix"), (Long)System.currentTimeMillis(), null, null);
        record.setBody("ciao");
        record1.setBody("mondo");
        Assertions.assertNotNull((Object)record);
        Assertions.assertNotNull((Object)record1);
        Assertions.assertEquals((Object)record, (Object)record1);
    }

    protected void verify(MDStoreVersion mdStoreVersion) throws IOException {
        Assertions.assertTrue((boolean)new File(mdStoreVersion.getHdfsPath()).exists());
        JavaSparkContext sc = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
        long seqFileSize = sc.sequenceFile(mdStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class).count();
        Dataset mdstore = spark.read().load(mdStoreVersion.getHdfsPath() + "/store").as(encoder);
        long mdStoreSize = mdstore.count();
        long declaredSize = Long.parseLong(IOUtils.toString((Reader)new FileReader(mdStoreVersion.getHdfsPath() + "/size")));
        Assertions.assertEquals((long)seqFileSize, (long)declaredSize, (String)"the size must be equal");
        Assertions.assertEquals((long)seqFileSize, (long)mdStoreSize, (String)"the size must be equal");
        long uniqueIds = mdstore.map(MetadataRecord::getId, Encoders.STRING()).distinct().count();
        Assertions.assertEquals((long)seqFileSize, (long)uniqueIds, (String)"the size must be equal");
    }

    public MDStoreVersion prepareVersion(String filename) throws IOException {
        MDStoreVersion mdstore = (MDStoreVersion)OBJECT_MAPPER.readValue(IOUtils.toString((URL)this.getClass().getResource(filename)), MDStoreVersion.class);
        mdstore.setHdfsPath(String.format(mdstore.getHdfsPath(), workingDir.toString()));
        return mdstore;
    }

    static {
        dateOfCollection = "" + System.currentTimeMillis();
        log = LoggerFactory.getLogger(GenerateNativeStoreSparkJobTest.class);
    }
}

