/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.oa.dedup.AbstractSparkAction;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark action that writes every entity type found under {@code graphBasePath} to
 * {@code dedupGraphPath/<type>}, applying the deduplication outcome of the configured action set.
 *
 * <p>For each entity type, when merge relations exist for {@code actionSetId} under
 * {@code workingPath}:
 * <ul>
 *   <li>every entity whose id is the target of a {@code merges} relation (excluding
 *       self-merges) is re-serialized with {@code dataInfo.deletedbyinference = true};</li>
 *   <li>the dedup records stored for the action set are appended to the output.</li>
 * </ul>
 * Entity types without merge relations for the action set are copied through unchanged.
 */
public class SparkUpdateEntity
extends AbstractSparkAction {
    private static final Logger log = LoggerFactory.getLogger(SparkUpdateEntity.class);

    /** JSON path of the entity identifier inside each serialized entity line. */
    private static final String IDJSONPATH = "$.id";

    public SparkUpdateEntity(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    /**
     * Entry point: parses the job arguments, builds a Kryo-enabled Spark session and runs the action.
     *
     * @param args command-line arguments described by {@code updateEntity_parameters.json}
     * @throws Exception on argument parsing, IS lookup or Spark/HDFS failures
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        try (InputStream is = SparkUpdateEntity.class
            .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json")) {
            if (is == null) {
                throw new IllegalStateException(
                    "missing classpath resource /eu/dnetlib/dhp/oa/dedup/updateEntity_parameters.json");
            }
            jsonConfiguration = IOUtils.toString(is, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        new SparkUpdateEntity(parser, getSparkSession(conf))
            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
    }

    /**
     * Writes the updated entities of every entity type to {@code dedupGraphPath/<type>}.
     *
     * @param isLookUpService not used by this action
     * @throws IOException if the working directory cannot be listed
     */
    @Override
    public void run(ISLookUpService isLookUpService) throws IOException {
        final String actionSetId = parser.get("actionSetId");
        final String graphBasePath = parser.get("graphBasePath");
        final String workingPath = parser.get("workingPath");
        final String dedupGraphPath = parser.get("dedupGraphPath");
        log.info("actionSetId:  '{}'", actionSetId);
        log.info("graphBasePath:  '{}'", graphBasePath);
        log.info("workingPath:    '{}'", workingPath);
        log.info("dedupGraphPath: '{}'", dedupGraphPath);

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        final Configuration hadoopConf = sc.hadoopConfiguration();

        for (Map.Entry<EntityType, ?> e : ModelSupport.entityTypes.entrySet()) {
            final EntityType type = e.getKey();
            final Class<? extends OafEntity> clazz = asEntityClass(e.getValue());
            final String entity = type.toString();

            // The output directory is cleared even when the input type is absent.
            final String outputPath = dedupGraphPath + "/" + type;
            removeOutputDir(spark, outputPath);

            final String entityPath = DedupUtility.createEntityPath(graphBasePath, entity);
            if (!HdfsSupport.exists(entityPath, hadoopConf)) {
                continue;
            }
            Dataset<String> sourceEntity = spark.read().text(entityPath).as(Encoders.STRING());

            // Guard on the exact action-set path that is about to be read, so that merge relations
            // belonging to a different action set cannot trigger a load of a missing directory.
            final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, entity);
            if (HdfsSupport.exists(mergeRelPath, hadoopConf)) {
                final String dedupRecordPath = DedupUtility.createDedupRecordPath(workingPath, actionSetId, entity);
                sourceEntity = applyMergeRels(sourceEntity, type, clazz, mergeRelPath, dedupRecordPath);
            } else if (mergeRelExists(sc, workingPath, entity)) {
                log.warn(
                    "no merge relations for action set '{}' at {}, but other action sets have some: copying {} unchanged",
                    actionSetId, mergeRelPath, entity);
            }
            saveText(sourceEntity, outputPath, SaveMode.Overwrite);
        }
    }

    /**
     * Flags merged entities as deleted by inference and appends the dedup records of the action set.
     *
     * @param sourceEntity    entities of one type, one JSON document per row
     * @param type            entity type being processed
     * @param clazz           model class used to (de)serialize the entity JSON
     * @param mergeRelPath    location of the {@link Relation} merge relations
     * @param dedupRecordPath location of the dedup records (text, one JSON per line)
     * @return updated entities followed by the dedup records
     */
    private Dataset<String> applyMergeRels(
        Dataset<String> sourceEntity,
        EntityType type,
        Class<? extends OafEntity> clazz,
        String mergeRelPath,
        String dedupRecordPath) {

        final Dataset<Relation> rel = spark.read().load(mergeRelPath).as(Encoders.bean(Relation.class));

        // Ids that are the target of a 'merges' relation (self-merges excluded), tagged merged = TRUE.
        final Dataset<Row> mergedIds = rel
            .where("relClass == 'merges'")
            .where("source != target")
            .select(rel.col("target"))
            .distinct()
            .selectExpr("target as id", "TRUE as merged");

        Dataset<Row> entitiesWithId = sourceEntity
            .selectExpr("get_json_object(value, '" + IDJSONPATH + "') as id", "value");

        if (type == EntityType.organization) {
            // Drop organizations whose id is the source of a 'merges' relation.
            // NOTE(review): presumably because those root ids are re-emitted from dedupRecordPath and
            // would otherwise be duplicated — confirm.
            final Dataset<Row> roots = rel
                .where("relClass == 'merges'")
                .selectExpr("source as id")
                .distinct();
            entitiesWithId = entitiesWithId
                .join(roots, DHPUtils.toSeq(Collections.singletonList("id")).toSeq(), "left_anti");
        }

        // The lambda references only the static helper and the (serializable) class object.
        final Dataset<String> updated = entitiesWithId
            .join(mergedIds, DHPUtils.toSeq(Collections.singletonList("id")).toSeq(), "left")
            .map((MapFunction<Row, String>) row -> {
                final String json = row.getAs("value");
                final int mergedIdx = row.fieldIndex("merged");
                // After the left join, 'merged' is null for entities that were not merged.
                if (!row.isNullAt(mergedIdx) && row.getBoolean(mergedIdx)) {
                    return updateDeletedByInference(json, clazz);
                }
                return json;
            }, Encoders.STRING());

        return updated.union(spark.read().text(dedupRecordPath).as(Encoders.STRING()));
    }

    /**
     * Checks whether any directory directly under {@code basePath} contains merge relations for the
     * given entity.
     *
     * @param sc       Spark context providing the Hadoop configuration
     * @param basePath working directory whose sub-directories are candidate action-set directories
     * @param entity   entity type name
     * @return {@code true} as soon as one sub-directory holds a merge-relation path for {@code entity}
     * @throws IOException if {@code basePath} cannot be listed (including when it does not exist)
     */
    public boolean mergeRelExists(JavaSparkContext sc, String basePath, String entity) throws IOException {
        final Path base = new Path(basePath);
        final FileSystem fileSystem = base.getFileSystem(sc.hadoopConfiguration());
        for (FileStatus status : fileSystem.listStatus(base)) {
            if (!status.isDirectory()) {
                continue;
            }
            final Path mergeRelPath = new Path(
                DedupUtility.createMergeRelPath(basePath, status.getPath().getName(), entity));
            if (fileSystem.exists(mergeRelPath)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Narrows a class object taken from {@code ModelSupport.entityTypes} to an entity class.
     *
     * <p>NOTE(review): assumes every value of that map is an {@link OafEntity} subclass, as the
     * original code did implicitly when passing it to {@link #updateDeletedByInference}.
     */
    @SuppressWarnings("unchecked")
    private static Class<? extends OafEntity> asEntityClass(Object clazz) {
        return (Class<? extends OafEntity>) clazz;
    }

    /**
     * Parses an entity, sets {@code dataInfo.deletedbyinference = true} (creating {@code dataInfo}
     * if absent) and serializes it back to JSON.
     *
     * @param json  serialized entity
     * @param clazz model class of the entity
     * @return the updated JSON document
     * @throws RuntimeException wrapping the {@link IOException} if parsing or serialization fails
     */
    private static <T extends OafEntity> String updateDeletedByInference(String json, Class<T> clazz) {
        try {
            final T entity = OBJECT_MAPPER.readValue(json, clazz);
            if (entity.getDataInfo() == null) {
                entity.setDataInfo(new DataInfo());
            }
            entity.getDataInfo().setDeletedbyinference(true);
            return OBJECT_MAPPER.writeValueAsString(entity);
        }
        catch (IOException e) {
            throw new RuntimeException("Unable to convert json", e);
        }
    }
}

