/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.AbstractSparkAction;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.util.MapDocumentUtil;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkUpdateEntity
extends AbstractSparkAction {
    private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class);
    private static final String IDJSONPATH = "$.id";

    public SparkUpdateEntity(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)SparkUpdateEntity.class.getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")));
        parser.parseArgument(args);
        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());
        new SparkUpdateEntity(parser, SparkUpdateEntity.getSparkSession(conf)).run(ISLookupClientFactory.getLookUpService((String)parser.get("isLookUpUrl")));
    }

    @Override
    public void run(ISLookUpService isLookUpService) throws IOException {
        String graphBasePath = this.parser.get("graphBasePath");
        String workingPath = this.parser.get("workingPath");
        String dedupGraphPath = this.parser.get("dedupGraphPath");
        log.info("graphBasePath:  '{}'", (Object)graphBasePath);
        log.info("workingPath:    '{}'", (Object)workingPath);
        log.info("dedupGraphPath: '{}'", (Object)dedupGraphPath);
        JavaSparkContext sc = JavaSparkContext.fromSparkContext((SparkContext)this.spark.sparkContext());
        ModelSupport.entityTypes.forEach((type, clazz) -> {
            String outputPath = dedupGraphPath + "/" + type;
            SparkUpdateEntity.removeOutputDir(this.spark, outputPath);
            JavaRDD sourceEntity = sc.textFile(DedupUtility.createEntityPath(graphBasePath, type.toString()));
            if (this.mergeRelExists(workingPath, type.toString())) {
                String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, "*", type.toString());
                String dedupRecordPath = DedupUtility.createDedupRecordPath(workingPath, "*", type.toString());
                Dataset rel = this.spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));
                JavaPairRDD mergedIds = rel.where("relClass == 'merges'").select(new Column[]{rel.col("target")}).distinct().toJavaRDD().mapToPair((PairFunction & Serializable)r -> new Tuple2((Object)r.getString(0), (Object)"d"));
                JavaPairRDD entitiesWithId = sourceEntity.mapToPair((PairFunction & Serializable)s -> new Tuple2((Object)MapDocumentUtil.getJPathString((String)IDJSONPATH, (String)s), s));
                JavaRDD map = entitiesWithId.leftOuterJoin(mergedIds).map((Function & Serializable)k -> ((Optional)((Tuple2)k._2())._2()).isPresent() ? SparkUpdateEntity.updateDeletedByInference((String)((Tuple2)k._2())._1(), clazz) : (String)((Tuple2)k._2())._1());
                sourceEntity = map.union(sc.textFile(dedupRecordPath));
            }
            sourceEntity.saveAsTextFile(outputPath, GzipCodec.class);
        });
    }

    public boolean mergeRelExists(String basePath, String entity) {
        boolean result = false;
        try {
            FileStatus[] fileStatuses;
            FileSystem fileSystem = FileSystem.get((Configuration)new Configuration());
            for (FileStatus fs : fileStatuses = fileSystem.listStatus(new Path(basePath))) {
                if (!fs.isDirectory() || !fileSystem.exists(new Path(DedupUtility.createMergeRelPath(basePath, fs.getPath().getName(), entity)))) continue;
                result = true;
            }
            return result;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static <T extends OafEntity> String updateDeletedByInference(String json, Class<T> clazz) {
        try {
            Oaf entity = (Oaf)OBJECT_MAPPER.readValue(json, clazz);
            if (entity.getDataInfo() == null) {
                entity.setDataInfo(new DataInfo());
            }
            entity.getDataInfo().setDeletedbyinference(Boolean.valueOf(true));
            return OBJECT_MAPPER.writeValueAsString((Object)entity);
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to convert json", e);
        }
    }
}

