/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.OpenorgsUtility;
import eu.dnetlib.dhp.oa.dedup.SparkCopyOpenorgsSimRels;
import eu.dnetlib.dhp.oa.dedup.SparkCreateMergeRels;
import eu.dnetlib.dhp.oa.dedup.SparkCreateSimRels;
import eu.dnetlib.dhp.oa.dedup.SparkDedupTest;
import eu.dnetlib.dhp.oa.dedup.SparkPrepareNewOrgs;
import eu.dnetlib.dhp.oa.dedup.SparkPrepareOrgRels;
import eu.dnetlib.dhp.oa.dedup.SparkRefineMergeRels;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Integration test for the OpenOrgs organization deduplication workflow.
 *
 * <p>The tests are executed in the order fixed by {@link Order}. They share the working directory
 * {@code testOutputBasePath}, and later steps observe what earlier steps wrote. For example, the
 * organization simrel count grows from 95 to 131 after the copy step. The last two steps export their
 * results into an in-memory H2 database, and the assertions query it back.
 */
@ExtendWith(value = {MockitoExtension.class})
@TestMethodOrder(value = MethodOrderer.OrderAnnotation.class)
public class SparkOpenorgsDedupTest
implements Serializable {
    /** In-memory H2 database; DB_CLOSE_DELAY=-1 keeps it alive across separate connections. */
    private static final String dbUrl = "jdbc:h2:mem:openorgs_test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false";
    private static final String dbUser = "sa";
    private static final String dedupEventsTable = "tmp_dedup_events";
    private static final String parentChildTable = "tmp_parent_child";
    private static final String dbPwd = "";
    @Mock(serializable = true)
    ISLookUpService isLookUpService;
    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    private static SparkSession spark;
    private static JavaSparkContext jsc;
    private static String testGraphBasePath;
    private static String testOutputBasePath;
    private static String testDedupGraphBasePath;
    private static final String testActionSetId = "test-orchestrator-openorgs";

    /**
     * Resolves the input graph and reserves the output paths, then starts a local Spark session.
     *
     * <p>Both temp directories are created (for unique names) and then deleted, so the jobs under test
     * start from paths that do not exist yet.
     */
    @BeforeAll
    public static void cleanUp() throws IOException, URISyntaxException {
        testGraphBasePath = Paths.get(Objects.requireNonNull(
                SparkOpenorgsDedupTest.class.getResource("/eu/dnetlib/dhp/dedup/openorgs/dedup"),
                "missing test graph resource").toURI())
            .toFile()
            .getAbsolutePath();
        String prefix = SparkOpenorgsDedupTest.class.getSimpleName() + "-";
        testOutputBasePath = Files.createTempDirectory(prefix).toAbsolutePath().toString();
        testDedupGraphBasePath = Files.createTempDirectory(prefix).toAbsolutePath().toString();
        FileUtils.deleteDirectory(new File(testOutputBasePath));
        FileUtils.deleteDirectory(new File(testDedupGraphBasePath));

        SparkConf conf = new SparkConf();
        conf.set("spark.sql.shuffle.partitions", "200");
        spark = SparkSession.builder()
            .appName(SparkOpenorgsDedupTest.class.getSimpleName())
            .master("local[*]")
            .config(conf)
            .getOrCreate();
        jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
    }

    /** Stubs the IS lookup: action-set queries get the orchestrator profile, organization queries the dedup conf. */
    @BeforeEach
    public void setUp() throws IOException, ISLookUpException {
        Mockito.lenient()
            .when(isLookUpService.getResourceProfileByQuery(Mockito.contains(testActionSetId)))
            .thenReturn(readResource(SparkOpenorgsDedupTest.class, "/eu/dnetlib/dhp/dedup/profiles/mock_orchestrator_openorgs.xml"));
        Mockito.lenient()
            .when(isLookUpService.getResourceProfileByQuery(Mockito.contains("organization")))
            .thenReturn(readResource(SparkOpenorgsDedupTest.class, "/eu/dnetlib/dhp/dedup/conf/org.curr.conf.json"));
    }

    @Test
    @Order(value = 1)
    void createSimRelsTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkCreateSimRels.class, "/eu/dnetlib/dhp/oa/dedup/createSimRels_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath, "-np", "50"});
        new SparkCreateSimRels(parser, spark).run(isLookUpService);

        long orgsSimrel = countOrganizationSimRels();
        System.out.println("orgs_simrel = " + orgsSimrel);
        Assertions.assertEquals(95L, orgsSimrel);
    }

    @Test
    @Order(value = 2)
    void copyOpenorgsSimRels() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkCopyOpenorgsSimRels.class, "/eu/dnetlib/dhp/oa/dedup/copyOpenorgsMergeRels_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath, "-asi", testActionSetId, "-w", testOutputBasePath, "-la", "lookupurl", "-np", "50"});
        new SparkCopyOpenorgsSimRels(parser, spark).run(isLookUpService);

        long orgsSimrel = countOrganizationSimRels();
        System.out.println("orgs_simrel = " + orgsSimrel);
        Assertions.assertEquals(131L, orgsSimrel);
    }

    @Test
    @Order(value = 3)
    void createMergeRelsTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkCreateMergeRels.class, "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json"));
        // "-h" is given an empty value; its meaning is defined in createCC_parameters.json.
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath, "-h", ""});
        new SparkCreateMergeRels(parser, spark).run(isLookUpService);

        Dataset<Row> mergeRels = spark.read().load(organizationMergeRelPath());
        Assertions.assertEquals(132L, mergeRels.count());
        Assertions.assertEquals(72L, mergeRels.select(functions.col("source")).distinct().count());

        List<String> diffRelTargets = collectDiffRels(Relation::getTarget);
        Assertions.assertEquals(18, diffRelTargets.size());
        List<String> mergeRelTargets = mergeRels
            .as(Encoders.bean(Relation.class))
            .toJavaRDD()
            .map(Relation::getTarget)
            .collect();
        // At this stage the mergerels are expected to still contain some isDifferentFrom targets.
        Assertions.assertFalse(Collections.disjoint(mergeRelTargets, diffRelTargets));
    }

    @Test
    @Order(value = 4)
    void refineMergeRelsTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkRefineMergeRels.class, "/eu/dnetlib/dhp/oa/dedup/refineMergeRels_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath});
        new SparkRefineMergeRels(parser, spark).run(isLookUpService);

        Dataset<Row> mergeRels = spark.read().load(organizationMergeRelPath());
        Assertions.assertEquals(132L, mergeRels.count());
        Assertions.assertEquals(75L, mergeRels.select(functions.col("source")).distinct().count());
    }

    @Test
    @Order(value = 5)
    void prepareOrgRelsTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkPrepareOrgRels.class, "/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath,
            "-du", dbUrl, "-dusr", dbUser, "-tde", dedupEventsTable, "-tpc", parentChildTable, "-dpwd", dbPwd});
        new SparkPrepareOrgRels(parser, spark).run(isLookUpService);

        try (Connection connection = openDbConnection()) {
            assertCount(connection, "SELECT COUNT(*) as total_rels FROM tmp_dedup_events", "total_rels", 34L);
            assertCount(connection, "SELECT COUNT(DISTINCT(local_id)) as total_orgs FROM tmp_dedup_events", "total_orgs", 8L);

            // No suggestion exported to the DB may contradict an isDifferentFrom relation of the input graph.
            List<String> diffRels = collectDiffRels(r -> r.getSource() + "@@@" + r.getTarget());
            Dataset<Row> duplicateSuggestions = readDuplicateSuggestions(connection);
            List<String> dbRels = duplicateSuggestions
                .toJavaRDD()
                .map(r -> r.get(0) + "@@@" + r.get(1))
                .collect();
            Assertions.assertTrue(Collections.disjoint(dbRels, diffRels));

            // A source whose suggested targets hit the same family groupId more than once is a conflict.
            Dataset<?> families = OpenorgsUtility.createFamilies(spark, relationPath(), "IsParentOf");
            long familyConflicts = duplicateSuggestions
                .join(families, duplicateSuggestions.col("target").equalTo(families.col("id")))
                .groupBy(duplicateSuggestions.col("source"))
                .agg(functions.count("groupId").alias("total"), functions.countDistinct("groupId").alias("distinct"))
                .filter(functions.col("total").gt(functions.col("distinct")))
                .count();
            Assertions.assertEquals(0L, familyConflicts);

            assertCount(connection, "SELECT COUNT(*) as total_rels FROM tmp_parent_child", "total_rels", 4L);
        }
    }

    @Test
    @Order(value = 6)
    void prepareNewOrgsTest() throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
            readResource(SparkPrepareNewOrgs.class, "/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json"));
        parser.parseArgument(new String[] {
            "-i", testGraphBasePath, "-asi", testActionSetId, "-la", "lookupurl", "-w", testOutputBasePath,
            "-du", dbUrl, "-dusr", dbUser, "-tde", dedupEventsTable, "-dpwd", dbPwd});
        new SparkPrepareNewOrgs(parser, spark).run(isLookUpService);

        // Distinct targets of isDifferentFrom relations; each is expected to appear as a new org in the DB.
        Set<String> diffRelTargets = new HashSet<>(collectDiffRels(Relation::getTarget));
        for (String org : diffRelTargets) {
            System.out.println("difforgs = " + org);
        }

        try (Connection connection = openDbConnection()) {
            try (PreparedStatement statement = connection.prepareStatement(
                    "SELECT oa_original_id FROM tmp_dedup_events WHERE local_id = ''");
                ResultSet rs = statement.executeQuery()) {
                while (rs.next()) {
                    System.out.println("dborgs = " + OafMapperUtils.createOpenaireId("organization", rs.getString("oa_original_id"), true));
                }
            }
            assertCount(connection, "SELECT COUNT(*) as total_new_orgs FROM tmp_dedup_events WHERE local_id = ''",
                "total_new_orgs", diffRelTargets.size());
        }
    }

    /** Deletes the temporary working and dedup-graph directories. */
    @AfterAll
    public static void finalCleanUp() throws IOException {
        FileUtils.deleteDirectory(new File(testOutputBasePath));
        FileUtils.deleteDirectory(new File(testDedupGraphBasePath));
    }

    /**
     * Reads a classpath resource as a UTF-8 string and closes the stream.
     *
     * @param anchor class whose {@code getResourceAsStream} resolves {@code path}
     * @param path absolute classpath location
     * @throws NullPointerException if the resource does not exist
     */
    private static String readResource(Class<?> anchor, String path) throws IOException {
        try (InputStream in = anchor.getResourceAsStream(path)) {
            return IOUtils.toString(Objects.requireNonNull(in, "missing classpath resource: " + path), StandardCharsets.UTF_8);
        }
    }

    /** Path of the relation entities in the input graph. */
    private static String relationPath() {
        return DedupUtility.createEntityPath(testGraphBasePath, "relation");
    }

    /** Path of the organization mergerels written under the working directory. */
    private static String organizationMergeRelPath() {
        return testOutputBasePath + "/" + testActionSetId + "/organization_mergerel";
    }

    /** Counts the organization simrels currently stored under the working directory. */
    private static long countOrganizationSimRels() {
        return spark.read()
            .load(DedupUtility.createSimRelPath(testOutputBasePath, testActionSetId, "organization"))
            .count();
    }

    /**
     * Collects a projection of every {@code isDifferentFrom} relation of the input graph.
     *
     * @param projection value extracted from each matching relation (serialized to the Spark executors)
     */
    private static <T> List<T> collectDiffRels(Function<Relation, T> projection) {
        return jsc.textFile(relationPath())
            .map(json -> OBJECT_MAPPER.readValue(json, Relation.class))
            .filter(rel -> "isDifferentFrom".equals(rel.getRelClass()))
            .map(projection)
            .collect();
    }

    /** Opens a new connection to the H2 test database; the caller must close it. */
    private static Connection openDbConnection() throws SQLException {
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", dbUser);
        connectionProperties.put("password", dbPwd);
        return DriverManager.getConnection(dbUrl, connectionProperties);
    }

    /**
     * Runs a single-row count query and asserts the value of {@code column}.
     * Fails the test if the query returns no row.
     */
    private static void assertCount(Connection connection, String sql, String column, long expected) throws SQLException {
        try (PreparedStatement statement = connection.prepareStatement(sql);
            ResultSet rs = statement.executeQuery()) {
            if (rs.next()) {
                Assertions.assertEquals(expected, rs.getLong(column));
            } else {
                Assertions.fail("No result in the sql DB");
            }
        }
    }

    /**
     * Loads every (local_id, oa_original_id) pair from tmp_dedup_events as a (source, target) DataFrame.
     * Both columns are converted to OpenAIRE organization ids.
     */
    private static Dataset<Row> readDuplicateSuggestions(Connection connection) throws Exception {
        List<Row> rows = new ArrayList<>();
        try (PreparedStatement statement = connection.prepareStatement("SELECT local_id, oa_original_id FROM tmp_dedup_events");
            ResultSet rs = statement.executeQuery()) {
            while (rs.next()) {
                String source = OafMapperUtils.createOpenaireId("organization", rs.getString("local_id"), true);
                String target = OafMapperUtils.createOpenaireId("organization", rs.getString("oa_original_id"), true);
                rows.add(RowFactory.create(source, target));
            }
        }
        StructType schema = new StructType()
            .add("source", DataTypes.StringType, false)
            .add("target", DataTypes.StringType, false);
        return spark.createDataFrame(rows, schema);
    }
}

