/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.model.mdstore.MetadataRecord;
import eu.dnetlib.dhp.model.mdstore.Provenance;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Objects;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;

public class GenerateNativeStoreSparkJob {
    public static MetadataRecord parseRecord(String input, String xpath, String encoding, Provenance provenance, Long dateOfCollection, LongAccumulator totalItems, LongAccumulator invalidRecords) {
        if (totalItems != null) {
            totalItems.add(1L);
        }
        try {
            SAXReader reader = new SAXReader();
            Document document = reader.read((InputStream)new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
            Node node = document.selectSingleNode(xpath);
            String originalIdentifier = node.getText();
            if (StringUtils.isBlank((CharSequence)originalIdentifier)) {
                if (invalidRecords != null) {
                    invalidRecords.add(1L);
                }
                return null;
            }
            return new MetadataRecord(originalIdentifier, encoding, provenance, input, dateOfCollection.longValue());
        }
        catch (Throwable e) {
            if (invalidRecords != null) {
                invalidRecords.add(1L);
            }
            e.printStackTrace();
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)GenerateNativeStoreSparkJob.class.getResourceAsStream("/eu/dnetlib/dhp/collection/collection_input_parameters.json")));
        parser.parseArgument(args);
        ObjectMapper jsonMapper = new ObjectMapper();
        Provenance provenance = (Provenance)jsonMapper.readValue(parser.get("provenance"), Provenance.class);
        long dateOfCollection = new Long(parser.get("dateOfCollection"));
        SparkSession spark = SparkSession.builder().appName("GenerateNativeStoreSparkJob").master(parser.get("master")).getOrCreate();
        HashMap<String, String> ongoingMap = new HashMap<String, String>();
        HashMap<String, String> reportMap = new HashMap<String, String>();
        boolean test = parser.get("isTest") == null ? false : Boolean.valueOf(parser.get("isTest"));
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        JavaPairRDD inputRDD = sc.sequenceFile(parser.get("input"), IntWritable.class, Text.class);
        LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
        LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");
        MessageManager manager = new MessageManager(parser.get("rabbitHost"), parser.get("rabbitUser"), parser.get("rabbitPassword"), false, false, null);
        JavaRDD mappeRDD = inputRDD.map((Function & Serializable)item -> GenerateNativeStoreSparkJob.parseRecord(((Text)item._2()).toString(), parser.get("xpath"), parser.get("encoding"), provenance, dateOfCollection, totalItems, invalidRecords)).filter(Objects::nonNull).distinct();
        ongoingMap.put("ongoing", "0");
        if (!test) {
            manager.sendMessage(new Message(parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), parser.get("rabbitOngoingQueue"), true, false);
        }
        Encoder encoder = Encoders.bean(MetadataRecord.class);
        Dataset mdstore = spark.createDataset(mappeRDD.rdd(), encoder);
        LongAccumulator mdStoreRecords = sc.sc().longAccumulator("MDStoreRecords");
        mdStoreRecords.add(mdstore.count());
        ongoingMap.put("ongoing", "" + totalItems.value());
        if (!test) {
            manager.sendMessage(new Message(parser.get("workflowId"), "DataFrameCreation", MessageType.ONGOING, ongoingMap), parser.get("rabbitOngoingQueue"), true, false);
        }
        mdstore.write().format("parquet").save(parser.get("output"));
        reportMap.put("inputItem", "" + totalItems.value());
        reportMap.put("invalidRecords", "" + invalidRecords.value());
        reportMap.put("mdStoreSize", "" + mdStoreRecords.value());
        if (!test) {
            manager.sendMessage(new Message(parser.get("workflowId"), "Collection", MessageType.REPORT, reportMap), parser.get("rabbitReportQueue"), true, false);
            manager.close();
        }
    }
}

