/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.migration;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.migration.MigrateMongoMdstoresApplication;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Software;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

/**
 * Spark job that reads a set of Hadoop sequence files of {@code (Text, Text)} records and,
 * for each supported entity class, writes the values whose key ends with
 * {@code ":<entity type>"} as text files under {@code <graphRawPath>/<entity type>}.
 *
 * <p>The entity type is the lower-cased simple name of the entity class
 * (e.g. {@code publication}, {@code otherresearchproduct}).
 */
public class ExtractEntitiesFromHDFSJob {
    private static final Log log = LogFactory.getLog(ExtractEntitiesFromHDFSJob.class);

    /** Classpath location of the JSON document describing the accepted command line arguments. */
    private static final String PARAMETERS_RESOURCE =
            "/eu/dnetlib/dhp/migration/extract_entities_from_hdfs_parameters.json";

    /**
     * Job entry point.
     *
     * <p>Reads the arguments {@code master}, {@code sourcePaths} (comma separated list) and
     * {@code graphRawPath}; source paths that do not exist are ignored.
     *
     * @param args command line arguments, parsed according to {@link #PARAMETERS_RESOURCE}
     * @throws Exception if the arguments cannot be parsed or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String parametersJson;
        // Close the resource stream once read, and fail with an explicit message
        // (rather than a NullPointerException) when the resource is not on the classpath.
        try (InputStream in = MigrateMongoMdstoresApplication.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            parametersJson = IOUtils.toString(in, StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(parametersJson);
        parser.parseArgument(args);

        SparkSession spark = SparkSession.builder()
                .appName(ExtractEntitiesFromHDFSJob.class.getSimpleName())
                .master(parser.get("master"))
                .getOrCreate();

        try (JavaSparkContext sc = new JavaSparkContext(spark.sparkContext())) {
            // Tolerate whitespace around the commas and empty entries ("a, b", "a,,b"):
            // an empty string cannot be turned into a Path.
            List<String> sourcePaths = Arrays.stream(parser.get("sourcePaths").split(","))
                    .map(String::trim)
                    .filter(StringUtils::isNotBlank)
                    .filter(p -> exists(sc, p))
                    .collect(Collectors.toList());
            String targetPath = parser.get("graphRawPath");

            processEntity(sc, Publication.class, sourcePaths, targetPath);
            processEntity(sc, Dataset.class, sourcePaths, targetPath);
            processEntity(sc, Software.class, sourcePaths, targetPath);
            processEntity(sc, OtherResearchProduct.class, sourcePaths, targetPath);
            processEntity(sc, Datasource.class, sourcePaths, targetPath);
            processEntity(sc, Organization.class, sourcePaths, targetPath);
            processEntity(sc, Project.class, sourcePaths, targetPath);
            processEntity(sc, Relation.class, sourcePaths, targetPath);
        }
    }

    /**
     * Extracts, from all the given sequence files, the record values belonging to one entity
     * type and saves them as text under {@code targetPath + "/" + type}.
     *
     * @param sc          the Spark context used to read and write
     * @param clazz       entity class; its lower-cased simple name is the entity type
     * @param sourcePaths sequence files of {@code (Text, Text)} records to read
     * @param targetPath  base output path
     */
    private static void processEntity(JavaSparkContext sc, Class<?> clazz, List<String> sourcePaths, String targetPath) {
        // Locale.ROOT keeps the type (and hence the output directory name) independent of the
        // JVM default locale (e.g. Turkish would lower-case 'I' to a dotless 'i').
        final String type = clazz.getSimpleName().toLowerCase(Locale.ROOT);

        log.info(String.format("Processing entities (%s) in files:", type));
        sourcePaths.forEach(log::info);

        JavaRDD<String> inputRdd = sc.emptyRDD();
        for (String sp : sourcePaths) {
            JavaRDD<String> entities = sc.sequenceFile(sp, Text.class, Text.class)
                    // convert the Hadoop Text key/value into plain Strings before filtering
                    .map(k -> new Tuple2<>(k._1().toString(), k._2().toString()))
                    .filter(k -> isEntityType(k._1(), type))
                    .map(Tuple2::_2);
            inputRdd = inputRdd.union(entities);
        }
        inputRdd.saveAsTextFile(targetPath + "/" + type);
    }

    /**
     * Tells whether a record key designates the given entity type.
     *
     * @param item record key; the text after its first {@code ':'} is taken as the type
     * @param type entity type to match, compared ignoring case
     * @return {@code true} if the part of {@code item} following the first {@code ':'} equals
     *         {@code type} ignoring case
     */
    private static boolean isEntityType(String item, String type) {
        return StringUtils.equalsIgnoreCase(StringUtils.substringAfter(item, ":"), type);
    }

    /**
     * Checks whether a path exists.
     *
     * @param context    Spark context providing the Hadoop configuration
     * @param pathToFile path to check
     * @return {@code true} if the path exists
     * @throws RuntimeException wrapping the {@link IOException} raised while accessing the filesystem
     */
    private static boolean exists(JavaSparkContext context, String pathToFile) {
        try {
            Configuration conf = context.hadoopConfiguration();
            Path path = new Path(pathToFile);
            // Resolve the filesystem from the path itself, so that a path carrying its own
            // scheme/authority is checked against the filesystem that owns it rather than
            // against the configured default one.
            FileSystem fs = path.getFileSystem(conf);
            return fs.exists(path);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

