/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.schema.mdstore.Provenance;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class GenerateNativeStoreSparkJob {
    private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);

    /**
     * Entry point of the job: parses the command-line arguments and runs
     * {@code createNativeMDStore} inside a Spark session.
     *
     * <p>Arguments read from the parser: {@code provenance} (JSON), {@code dateOfCollection}
     * (parsed as a long), {@code mdStoreVersion} (JSON), {@code readMdStoreVersion} (JSON, may be
     * blank), {@code xpath}, {@code encoding} and {@code isSparkSessionManaged} (treated as
     * {@code true} when absent).
     *
     * @param args command-line arguments handed to the {@link ArgumentApplicationParser}
     * @throws Exception if the parameter definition cannot be loaded, an argument cannot be
     *     parsed, or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String parametersResource = "/eu/dnetlib/dhp/collection/generate_native_input_parameters.json";
        final String parametersJson;
        // Close the classpath stream deterministically and decode it with an explicit charset
        // instead of relying on the platform default.
        try (InputStream parametersStream = GenerateNativeStoreSparkJob.class.getResourceAsStream(parametersResource)) {
            parametersJson = IOUtils.toString(
                Objects.requireNonNull(parametersStream, "classpath resource not found: " + parametersResource),
                StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(parametersJson);
        parser.parseArgument(args);

        String provenanceArgument = parser.get("provenance");
        log.info("Provenance is {}", provenanceArgument);
        Provenance provenance = DHPUtils.MAPPER.readValue(provenanceArgument, Provenance.class);

        String dateOfCollectionArgs = parser.get("dateOfCollection");
        log.info("dateOfCollection is {}", dateOfCollectionArgs);
        // Long.valueOf replaces the deprecated Long(String) constructor; both throw
        // NumberFormatException on a malformed or missing value.
        Long dateOfCollection = Long.valueOf(dateOfCollectionArgs);

        String mdStoreVersion = parser.get("mdStoreVersion");
        log.info("mdStoreVersion is {}", mdStoreVersion);
        MDStoreVersion currentVersion = DHPUtils.MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);

        String readMdStoreVersionParam = parser.get("readMdStoreVersion");
        log.info("readMdStoreVersion is {}", readMdStoreVersionParam);
        // A blank value means there is no previous version to merge with.
        MDStoreVersion readMdStoreVersion = StringUtils.isBlank(readMdStoreVersionParam)
            ? null
            : DHPUtils.MAPPER.readValue(readMdStoreVersionParam, MDStoreVersion.class);

        String xpath = parser.get("xpath");
        log.info("xpath is {}", xpath);

        String encoding = parser.get("encoding");
        log.info("encoding is {}", encoding);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(
            conf,
            isSparkSessionManaged,
            spark -> GenerateNativeStoreSparkJob.createNativeMDStore(
                spark, provenance, dateOfCollection, xpath, encoding, currentVersion, readMdStoreVersion));
    }

    /**
     * Builds the native metadata store for {@code currentVersion} from its sequence file.
     *
     * <p>Every value of {@code <hdfsPath>/sequence_file} is parsed with {@code parseRecord};
     * unparsable records are dropped and duplicates removed. When {@code readVersion} is given,
     * the new records are unioned with the store found under that version's HDFS path and, for
     * each record id, a single record is kept as chosen by {@link MDStoreAggregator}. The result
     * is saved under {@code <hdfsPath>/store} and the number of saved records is written to
     * {@code <hdfsPath>/size}.
     *
     * @param spark active Spark session
     * @param provenance provenance attached to every parsed record
     * @param dateOfCollection collection timestamp attached to every parsed record
     * @param xpath XPath expression locating the original identifier in each record
     * @param encoding encoding label attached to every parsed record
     * @param currentVersion version being written
     * @param readVersion previous version to merge with, or {@code null} for a plain write
     * @throws IOException if the size file cannot be written
     */
    private static void createNativeMDStore(SparkSession spark, Provenance provenance, Long dateOfCollection, String xpath, String encoding, MDStoreVersion currentVersion, MDStoreVersion readVersion) throws IOException {
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
        LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");

        String seqFilePath = currentVersion.getHdfsPath() + "/sequence_file";
        // The lambda parameter is typed by the pair RDD (Tuple2<IntWritable, Text>), so no cast
        // is needed to reach the record text.
        JavaRDD<MetadataRecord> nativeStore = sc
            .sequenceFile(seqFilePath, IntWritable.class, Text.class)
            .map(item -> GenerateNativeStoreSparkJob.parseRecord(
                item._2().toString(), xpath, encoding, provenance, dateOfCollection, totalItems, invalidRecords))
            .filter(Objects::nonNull)
            .distinct();

        Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
        Dataset<MetadataRecord> mdstore = spark.createDataset(nativeStore.rdd(), encoder);
        String targetPath = currentVersion.getHdfsPath() + "/store";

        if (readVersion != null) {
            log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath());
            Dataset<MetadataRecord> currentMdStoreVersion = spark
                .read()
                .load(readVersion.getHdfsPath() + "/store")
                .as(encoder);
            TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn();
            // The explicit MapFunction casts select the Java-friendly overloads of
            // groupByKey/map rather than the Scala Function1 ones.
            Dataset<MetadataRecord> map = currentMdStoreVersion
                .union(mdstore)
                .groupByKey((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
                .agg(aggregator)
                .map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder);
            // Log a sample of up to 100 merged ids.
            map.select("id").takeAsList(100).forEach(s -> log.info(s.toString()));
            DHPUtils.saveDataset(map, targetPath);
        } else {
            DHPUtils.saveDataset(mdstore, targetPath);
        }

        // Count what was actually persisted by reading the saved store back.
        long total = spark.read().load(targetPath).count();
        log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName());
        DHPUtils.writeHdfsFile(
            spark.sparkContext().hadoopConfiguration(),
            Long.toString(total),
            currentVersion.getHdfsPath() + "/size");
    }

    /**
     * Parses one XML record and wraps it in a {@link MetadataRecord}.
     *
     * <p>The input is read with a {@link SAXReader} that has the
     * {@code disallow-doctype-decl} feature enabled. The original identifier is the text of
     * the node selected by {@code xpath}.
     *
     * @param input XML text of the record
     * @param xpath XPath expression selecting the node that holds the original identifier
     * @param encoding encoding label passed to the {@link MetadataRecord} constructor
     * @param provenance provenance passed to the {@link MetadataRecord} constructor
     * @param dateOfCollection collection timestamp passed to the {@link MetadataRecord} constructor
     * @param totalItems counter incremented once per call; may be {@code null}
     * @param invalidRecords counter incremented once per rejected record; may be {@code null}
     * @return the parsed record, or {@code null} when the input cannot be parsed, the XPath
     *     selects nothing, or the selected identifier is blank
     */
    public static MetadataRecord parseRecord(String input, String xpath, String encoding, Provenance provenance, Long dateOfCollection, LongAccumulator totalItems, LongAccumulator invalidRecords) {
        if (totalItems != null) {
            totalItems.add(1L);
        }
        try {
            SAXReader reader = new SAXReader();
            reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            Document document = reader.read(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
            Node node = document.selectSingleNode(xpath);
            // An XPath that matches nothing is an invalid record, handled explicitly rather
            // than through a NullPointerException.
            String originalIdentifier = node == null ? null : node.getText();
            if (StringUtils.isBlank(originalIdentifier)) {
                countInvalid(invalidRecords);
                return null;
            }
            return new MetadataRecord(originalIdentifier, encoding, provenance, document.asXML(), dateOfCollection);
        }
        catch (Throwable e) {
            // Kept as broad as before so that a single bad record never fails the caller;
            // the failure is reported through the counter only.
            countInvalid(invalidRecords);
            return null;
        }
    }

    /** Increments the invalid-record counter when one was supplied. */
    private static void countInvalid(LongAccumulator invalidRecords) {
        if (invalidRecords != null) {
            invalidRecords.add(1L);
        }
    }

    public static class MDStoreAggregator
    extends Aggregator<MetadataRecord, MetadataRecord, MetadataRecord> {
        public MetadataRecord zero() {
            return null;
        }

        public MetadataRecord reduce(MetadataRecord b, MetadataRecord a) {
            return this.getLatestRecord(b, a);
        }

        public MetadataRecord merge(MetadataRecord b, MetadataRecord a) {
            return this.getLatestRecord(b, a);
        }

        private MetadataRecord getLatestRecord(MetadataRecord b, MetadataRecord a) {
            if (b == null) {
                return a;
            }
            if (a == null) {
                return b;
            }
            return a.getDateOfCollection() > b.getDateOfCollection() ? a : b;
        }

        public MetadataRecord finish(MetadataRecord r) {
            return r;
        }

        public Encoder<MetadataRecord> bufferEncoder() {
            return Encoders.bean(MetadataRecord.class);
        }

        public Encoder<MetadataRecord> outputEncoder() {
            return Encoders.bean(MetadataRecord.class);
        }
    }
}

