/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.SparkCreateDedupRecord;
import eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels;
import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
import eu.dnetlib.dhp.oa.dedup.SparkUpdateEntity;
import eu.dnetlib.dhp.schema.oaf.Instance;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.OpenAccessRoute;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Integration test for the publication dedup pipeline.
 *
 * <p>It runs the simrel, merge-rel, dedup-record and update-entity Spark jobs in sequence against
 * the fixtures copied from {@code /eu/dnetlib/dhp/dedup/root}. The IS lookup service is mocked so
 * that queries return a test orchestrator profile and the publication dedup configuration.
 *
 * <p>The test methods share on-disk state under the working path and therefore must run in the
 * order given by their {@link Order} annotations.
 */
@ExtendWith(MockitoExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SparkPublicationRootsTest implements Serializable {

    private static final String testActionSetId = "test-orchestrator";

    /** Lenient mapper: the JSON fixtures may carry fields the model classes do not declare. */
    private static final ObjectMapper MAPPER = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    private static SparkSession spark;
    private static String workingPath;
    private static String graphInputPath;
    private static String graphOutputPath;
    private static Path testBaseTmpPath;

    @Mock(serializable = true)
    ISLookUpService isLookUpService;

    /**
     * Copies the test fixtures into a fresh temporary directory, derives the input/working/output
     * paths from it and starts a local Spark session.
     */
    @BeforeAll
    public static void init() throws IOException, URISyntaxException {
        testBaseTmpPath = Files.createTempDirectory(SparkPublicationRootsTest.class.getSimpleName() + "-");

        File entitiesSources = Paths
                .get(SparkPublicationRootsTest.class.getResource("/eu/dnetlib/dhp/dedup/root").toURI())
                .toFile();
        FileUtils.copyDirectory(entitiesSources, testBaseTmpPath.resolve("input").toFile());

        workingPath = testBaseTmpPath.resolve("workingPath").toString();
        graphInputPath = testBaseTmpPath.resolve("input").resolve("entities").toString();
        graphOutputPath = testBaseTmpPath.resolve("output").toString();

        FileUtils.deleteDirectory(new File(workingPath));
        FileUtils.deleteDirectory(new File(graphOutputPath));

        SparkConf conf = new SparkConf();
        conf.set("spark.sql.shuffle.partitions", "10");
        spark = SparkSession
                .builder()
                .appName(SparkPublicationRootsTest.class.getSimpleName())
                .master("local[*]")
                .config(conf)
                .getOrCreate();
    }

    /**
     * Stubs the IS lookup.
     *
     * <p>A query containing the action set id returns the orchestrator profile. A query containing
     * "publication" returns the publication dedup configuration.
     */
    @BeforeEach
    public void setUp() throws IOException, ISLookUpException {
        // lenient(): MockitoExtension uses strict stubs by default, which would fail any test that
        // does not exercise both stubs.
        Mockito
                .lenient()
                .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
                .thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_publication.xml"));
        Mockito
                .lenient()
                .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("publication")))
                .thenReturn(classPathResourceAsString("/eu/dnetlib/dhp/dedup/conf/pub.curr.conf.json"));
    }

    /**
     * Stops Spark, then removes the temporary directory.
     *
     * <p>Both steps are null-guarded, so a partially failed {@link #init()} is not masked by an NPE.
     */
    @AfterAll
    public static void tearDown() throws IOException {
        try {
            // Close Spark first so no job is still touching the files being deleted.
            if (spark != null) {
                spark.close();
            }
        } finally {
            if (testBaseTmpPath != null) {
                FileUtils.deleteDirectory(testBaseTmpPath.toFile());
            }
        }
    }

    @Test
    @Order(1)
    void createSimRelsTest() throws Exception {
        new SparkCreateSimRels(args("/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json", new String[] {
                "--graphBasePath", graphInputPath,
                "--actionSetId", testActionSetId,
                "--isLookUpUrl", "lookupurl",
                "--workingPath", workingPath,
                "--numPartitions", "5"
        }), spark).run(isLookUpService);

        long pubsSimRel = spark
                .read()
                .load(DedupUtility.createSimRelPath(workingPath, testActionSetId, "publication"))
                .count();

        Assertions.assertEquals(9L, pubsSimRel);
    }

    @Test
    @Order(2)
    void cutMergeRelsTest() throws Exception {
        new SparkCreateMergeRels(args("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json", new String[] {
                "--graphBasePath", graphInputPath,
                "--actionSetId", testActionSetId,
                "--isLookUpUrl", "lookupurl",
                "--workingPath", workingPath,
                "--cutConnectedComponent", "3",
                "-h", ""
        }), spark).run(isLookUpService);

        // Sources that still have more than 3 "merges" targets after the cut: there must be none.
        long oversizedGroups = spark
                .read()
                .load(mergeRelPath())
                .as(Encoders.bean(Relation.class))
                .filter((FilterFunction<Relation>) r -> "merges".equalsIgnoreCase(r.getRelClass()))
                .groupBy("source")
                .agg(functions.count("target").alias("cnt"))
                .where("cnt > 3")
                .count();

        Assertions.assertEquals(0L, oversizedGroups);

        // Drop the cut output so that createMergeRelsTest rebuilds the merge rels without the cut.
        FileUtils.deleteDirectory(new File(mergeRelPath()));
    }

    @Test
    @Order(3)
    void createMergeRelsTest() throws Exception {
        new SparkCreateMergeRels(args("/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json", new String[] {
                "--graphBasePath", graphInputPath,
                "--actionSetId", testActionSetId,
                "--isLookUpUrl", "lookupurl",
                "--workingPath", workingPath,
                "-h", ""
        }), spark).run(isLookUpService);

        Dataset<Relation> merges = spark.read().load(mergeRelPath()).as(Encoders.bean(Relation.class));

        final String rootId = "50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032";
        final Set<String> dups = Sets.newHashSet(
                "50|doi_________::3b1d0d8e8f930826665df9d6b82fbb73",
                "50|doi_________::d5021b53204e4fdeab6ff5d5bc468032",
                "50|arXiv_______::c93aeb433eb90ed7a86e29be00791b7c");

        List<Relation> mergeList = merges.filter("source == '" + rootId + "'").collectAsList();
        Assertions.assertEquals(3, mergeList.size());
        mergeList.forEach(r -> {
            Assertions.assertEquals("resultResult", r.getRelType());
            Assertions.assertEquals("dedup", r.getSubRelType());
            Assertions.assertEquals("merges", r.getRelClass());
            Assertions.assertTrue(dups.contains(r.getTarget()));
        });

        List<Relation> mergedIn = merges.filter("target == '" + rootId + "'").collectAsList();
        Assertions.assertEquals(3, mergedIn.size());
        mergedIn.forEach(r -> {
            Assertions.assertEquals("resultResult", r.getRelType());
            Assertions.assertEquals("dedup", r.getSubRelType());
            Assertions.assertEquals("isMergedIn", r.getRelClass());
            Assertions.assertTrue(dups.contains(r.getSource()));
        });

        Assertions.assertEquals(26L, merges.count());
    }

    @Test
    @Order(4)
    void createDedupRecordTest() throws Exception {
        new SparkCreateDedupRecord(args("/eu/dnetlib/dhp/oa/dedup/createDedupRecord_parameters.json", new String[] {
                "--graphBasePath", graphInputPath,
                "--actionSetId", testActionSetId,
                "--isLookUpUrl", "lookupurl",
                "--workingPath", workingPath
        }), spark).run(isLookUpService);

        Dataset<Publication> roots = spark
                .read()
                .textFile(workingPath + "/" + testActionSetId + "/publication_deduprecord")
                .map(asEntity(Publication.class), Encoders.bean(Publication.class));
        Assertions.assertEquals(4L, roots.count());

        Dataset<Publication> pubs = spark
                .read()
                .textFile(DedupUtility.createEntityPath(graphInputPath, "publication"))
                .map(asEntity(Publication.class), Encoders.bean(Publication.class));

        verifyRoot_case_1(roots, pubs);
        verifyRoot_case_2(roots, pubs);
        verifyRoot_case_3(roots, pubs);
    }

    /**
     * Checks the root built from the Crossref record {@code doi_________::d5021b...}.
     *
     * <p>The journal name, printed ISSN and publisher must come from that record. The root's PIDs
     * must overlap the duplicate's PIDs. The root's Crossref instance must carry the expected access
     * right, hosted-by and instance type.
     */
    private static void verifyRoot_case_1(Dataset<Publication> roots, Dataset<Publication> pubs) {
        Publication root = roots.filter("id = '50|doi_dedup___::d5021b53204e4fdeab6ff5d5bc468032'").first();
        Assertions.assertNotNull(root);

        Publication crossrefDuplicate = pubs
                .filter("id = '50|doi_________::d5021b53204e4fdeab6ff5d5bc468032'")
                .first();

        Assertions.assertEquals(crossrefDuplicate.getJournal().getName(), root.getJournal().getName());
        Assertions.assertEquals(crossrefDuplicate.getJournal().getIssnPrinted(), root.getJournal().getIssnPrinted());
        Assertions.assertEquals(crossrefDuplicate.getPublisher().getValue(), root.getPublisher().getValue());

        Set<String> rootPids = root.getPid().stream().map(StructuredProperty::getValue).collect(Collectors.toSet());
        Set<String> dupPids = crossrefDuplicate
                .getPid()
                .stream()
                .map(StructuredProperty::getValue)
                .collect(Collectors.toSet());
        Assertions.assertFalse(Sets.intersection(rootPids, dupPids).isEmpty());
        Assertions.assertTrue(rootPids.contains("10.1109/jstqe.2022.3205716"));

        Optional<Instance> crossrefInstance = root
                .getInstance()
                .stream()
                .filter(i -> "Crossref".equals(i.getCollectedfrom().getValue()))
                .findFirst();
        Assertions.assertTrue(crossrefInstance.isPresent());

        Instance instance = crossrefInstance.get();
        Assertions.assertEquals("OPEN", instance.getAccessright().getClassid());
        Assertions.assertEquals("Open Access", instance.getAccessright().getClassname());
        Assertions.assertEquals(OpenAccessRoute.hybrid, instance.getAccessright().getOpenAccessRoute());
        Assertions.assertEquals("IEEE Journal of Selected Topics in Quantum Electronics", instance.getHostedby().getValue());
        Assertions.assertEquals("0001", instance.getInstancetype().getClassid());
        Assertions.assertEquals("Article", instance.getInstancetype().getClassname());
    }

    /**
     * Checks the root built from {@code doi_________::18aff3...}.
     *
     * <p>The journal name, online ISSN, volume and publisher must come from that record. The root's
     * collectedfrom values must all appear in the input publications.
     */
    private static void verifyRoot_case_2(Dataset<Publication> roots, Dataset<Publication> pubs) {
        Publication root = roots.filter("id = '50|doi_dedup___::18aff3b55fb6876466a5d4bd82434885'").first();
        Assertions.assertNotNull(root);

        Publication crossrefDuplicate = pubs
                .filter("id = '50|doi_________::18aff3b55fb6876466a5d4bd82434885'")
                .first();

        Assertions.assertEquals(crossrefDuplicate.getJournal().getName(), root.getJournal().getName());
        Assertions.assertEquals(crossrefDuplicate.getJournal().getIssnOnline(), root.getJournal().getIssnOnline());
        Assertions.assertEquals(crossrefDuplicate.getJournal().getVol(), root.getJournal().getVol());
        Assertions.assertEquals(crossrefDuplicate.getPublisher().getValue(), root.getPublisher().getValue());

        assertCollectedFromSubsetOfInput(root, pubs);
    }

    /**
     * Checks the {@code dedup_wf_002::7143f4...} root.
     *
     * <p>Its publisher must match the {@code od_______166::31ca73...} record. Its collectedfrom
     * values must all appear in the input publications.
     */
    private static void verifyRoot_case_3(Dataset<Publication> roots, Dataset<Publication> pubs) {
        Publication root = roots.filter("id = '50|dedup_wf_002::7143f4ff5708f3657db0b7e68ea74d55'").first();
        Assertions.assertNotNull(root);

        Publication pivotDuplicate = pubs.filter("id = '50|od_______166::31ca734cc22181b704c4aa8fd050062a'").first();
        Assertions.assertEquals(pivotDuplicate.getPublisher().getValue(), root.getPublisher().getValue());

        assertCollectedFromSubsetOfInput(root, pubs);
    }

    /** Asserts that every collectedfrom value of {@code root} occurs among the input publications'. */
    private static void assertCollectedFromSubsetOfInput(Publication root, Dataset<Publication> pubs) {
        Set<String> inputCf = pubs
                .collectAsList()
                .stream()
                .flatMap(p -> p.getCollectedfrom().stream())
                .map(KeyValue::getValue)
                .collect(Collectors.toSet());
        Set<String> rootCf = root.getCollectedfrom().stream().map(KeyValue::getValue).collect(Collectors.toSet());
        Assertions.assertTrue(Sets.difference(rootCf, inputCf).isEmpty());
    }

    @Test
    @Order(6)
    void updateEntityTest() throws Exception {
        new SparkUpdateEntity(args("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json", new String[] {
                "--graphBasePath", graphInputPath,
                "--workingPath", workingPath,
                "--dedupGraphPath", graphOutputPath
        }), spark).run(isLookUpService);

        long publications = spark.read().textFile(graphOutputPath + "/publication").count();
        Assertions.assertEquals(20L, publications);

        long mergedPubs = spark
                .read()
                .load(mergeRelPath())
                .as(Encoders.bean(Relation.class))
                .where("relClass=='merges'")
                .map((MapFunction<Relation, String>) Relation::getTarget, Encoders.STRING())
                .distinct()
                .count();

        long deletedPubs = spark
                .read()
                .textFile(graphOutputPath + "/publication")
                .map(asEntity(Publication.class), Encoders.bean(Publication.class))
                .filter("datainfo.deletedbyinference == true")
                .map((MapFunction<Publication, String>) OafEntity::getId, Encoders.STRING())
                .distinct()
                .count();

        // Every publication targeted by a "merges" relation is expected to be flagged
        // deletedbyinference in the output graph, so at least that many deleted ids must exist.
        Assertions.assertTrue(
                deletedPubs >= mergedPubs,
                "expected at least " + mergedPubs + " deletedbyinference publications, found " + deletedPubs);
    }

    /** Location of the publication merge relations produced for {@link #testActionSetId}. */
    private static String mergeRelPath() {
        return workingPath + "/" + testActionSetId + "/publication_mergerel";
    }

    /**
     * Reads a classpath resource fully as a UTF-8 string and closes the underlying stream.
     *
     * @throws IOException if the resource does not exist or cannot be read
     */
    private static String classPathResourceAsString(String path) throws IOException {
        try (InputStream is = SparkPublicationRootsTest.class.getResourceAsStream(path)) {
            if (is == null) {
                throw new IOException("classpath resource not found: " + path);
            }
            return IOUtils.toString(is, StandardCharsets.UTF_8);
        }
    }

    /** Spark map function that deserializes one JSON line into an entity of type {@code clazz}. */
    private static <T extends OafEntity> MapFunction<String, T> asEntity(Class<T> clazz) {
        // MapFunction already extends Serializable; MAPPER is static and therefore not captured.
        return value -> MAPPER.readValue(value, clazz);
    }

    /** Builds an argument parser from the JSON parameter spec at {@code paramSpecs} and parses {@code args}. */
    private ArgumentApplicationParser args(String paramSpecs, String[] args) throws IOException, ParseException {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(classPathResourceAsString(paramSpecs));
        parser.parseArgument(args);
        return parser;
    }
}

