/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.doiboost.orcid;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.doiboost.orcid.util.HDFSUtil;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.spark.SparkConf;
import org.mortbay.log.Log;

/**
 * Converts the lambda file (a gzip-compressed tar archive whose entries contain comma-separated rows)
 * into a block-compressed, gzip-coded {@link SequenceFile} of {@link Text} key/value pairs.
 *
 * <p>For every row of every archive entry, the first column becomes the key and the fourth column
 * becomes the value. The value of the second row read overall is also written to
 * {@code last_modified_date_from_lambda_file.txt} under the working path.
 */
public class SparkGenLastModifiedSeq {

    /**
     * Runs the conversion inside a Spark session.
     *
     * @param args command-line arguments, validated against
     *     {@code /eu/dnetlib/dhp/doiboost/download_orcid_data.json}. Reads {@code isSparkSessionManaged}
     *     (defaults to {@code true} when absent), {@code hdfsServerUri}, {@code workingPath},
     *     {@code outputPath} and {@code lambdaFileName}. The input and output paths are built by plain
     *     string concatenation {@code hdfsServerUri + workingPath + name}, so separators must already
     *     be present in the argument values.
     * @throws Exception if argument parsing, HDFS access, archive decoding or writing fails
     */
    public static void main(String[] args) throws Exception {
        String argumentsSpec;
        try (InputStream spec = SparkGenLastModifiedSeq.class
                .getResourceAsStream("/eu/dnetlib/dhp/doiboost/download_orcid_data.json")) {
            argumentsSpec = IOUtils.toString(spec, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(argumentsSpec);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        String hdfsServerUri = parser.get("hdfsServerUri");
        String workingPath = parser.get("workingPath");
        String outputPath = parser.get("outputPath");
        String lambdaFileName = parser.get("lambdaFileName");
        String lambdaFileUri = hdfsServerUri.concat(workingPath).concat(lambdaFileName);
        String lastModifiedDateFromLambdaFileUri = "last_modified_date_from_lambda_file.txt";

        SparkConf sparkConf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(sparkConf, isSparkSessionManaged, spark -> {
            int rowsNum = 0;
            String lastModifiedAuthorDate = "";
            Path output = new Path(hdfsServerUri.concat(workingPath).concat(outputPath));
            Path hdfsreadpath = new Path(lambdaFileUri);

            Configuration conf = spark.sparkContext().hadoopConfiguration();
            conf.set("fs.defaultFS", hdfsServerUri.concat(workingPath));
            conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
            conf.set("fs.file.impl", LocalFileSystem.class.getName());
            // NOTE(review): not closed, as in the original; FileSystem.get may hand out a cached,
            // shared instance, so closing it here could break other users of the same FileSystem.
            FileSystem fs = FileSystem.get(URI.create(hdfsServerUri.concat(workingPath)), conf);

            // The raw HDFS stream is its own resource so it is released even when the gzip/tar
            // wrappers fail to construct (e.g. a corrupt gzip header).
            try (FSDataInputStream lambdaFileStream = fs.open(hdfsreadpath);
                    TarArchiveInputStream tais =
                            new TarArchiveInputStream(new GzipCompressorInputStream(lambdaFileStream));
                    SequenceFile.Writer writer = SequenceFile.createWriter(
                            conf,
                            SequenceFile.Writer.file(output),
                            SequenceFile.Writer.keyClass(Text.class),
                            SequenceFile.Writer.valueClass(Text.class),
                            SequenceFile.Writer.compression(
                                    SequenceFile.CompressionType.BLOCK, new GzipCodec()))) {
                while (tais.getNextTarEntry() != null) {
                    // Deliberately not closed: closing the reader would close the shared tar stream.
                    // The tar stream signals end-of-data at the end of the current entry, so
                    // readLine() stops at the entry boundary.
                    BufferedReader br =
                            new BufferedReader(new InputStreamReader(tais, StandardCharsets.UTF_8));
                    String line;
                    while ((line = br.readLine()) != null) {
                        // NOTE(review): assumes every row has at least 4 comma-separated fields;
                        // a shorter row (e.g. a blank line) makes values[3] throw.
                        String[] values = line.split(",");
                        Text key = new Text(values[0]);
                        Text value = new Text(values[3]);
                        writer.append(key, value);
                        rowsNum++;
                        // Presumably row 1 is the CSV header, making row 2 the first data row
                        // whose 4th column is the reference last-modified date — confirm.
                        if (rowsNum == 2) {
                            lastModifiedAuthorDate = value.toString();
                        }
                    }
                }
            }

            HDFSUtil.writeToTextFile(
                    hdfsServerUri, workingPath, lastModifiedDateFromLambdaFileUri, lastModifiedAuthorDate);
            Log.info("Saved rows from lamda csv tar file: " + rowsNum);
        });
    }
}

