/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection.orcid;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.orcid.ORCIDWorker;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Downloads an ORCID updates archive (tar.gz), stages it on HDFS and feeds the ORCID ids of
 * rows modified after a given date to a pool of {@link ORCIDWorker} threads, each of which
 * writes to its own set of SequenceFiles under the target path.
 */
public class OrcidGetUpdatesFile {
    private static final Logger log = LoggerFactory.getLogger(OrcidGetUpdatesFile.class);

    /** Number of {@link ORCIDWorker} threads consuming ids from the shared queue. */
    private static final int NUM_WORKERS = 22;

    /** Capacity of the queue between the archive reader (producer) and the workers. */
    private static final int QUEUE_CAPACITY = 3000;

    /** HDFS path where the downloaded archive is staged before being read back. */
    private static final String STAGING_PATH = "/tmp/orcid_updates.tar.gz";

    /**
     * Job entry point.
     *
     * <p>Parses the arguments described by {@code download_orcid_update_parameter.json}
     * ({@code namenode}, {@code targetPath}, {@code apiURL}, {@code accessToken},
     * {@code graphPath}), reads {@code max(lastModifiedDate)} from {@code graphPath/Authors}
     * with Spark, and passes that value to {@link #readTar} as the start date.
     *
     * @param args command-line arguments
     * @throws Exception if argument parsing, the Spark read, or the download/processing fails
     */
    public static void main(String[] args) throws Exception {
        final String parameters;
        try (InputStream paramStream = Objects.requireNonNull(OrcidGetUpdatesFile.class.getResourceAsStream("/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json"))) {
            parameters = IOUtils.toString(paramStream, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(parameters);
        parser.parseArgument(args);

        final String namenode = parser.get("namenode");
        log.info("got variable namenode: {}", namenode);
        final String targetPath = parser.get("targetPath");
        log.info("got variable targetPath: {}", targetPath);
        final String apiURL = parser.get("apiURL");
        log.info("got variable apiURL: {}", apiURL);
        final String accessToken = parser.get("accessToken");
        // The token is a credential: record only whether it was supplied, never its value.
        log.info("got variable accessToken: {}", StringUtils.isNotBlank(accessToken) ? "<redacted>" : "<missing>");
        final String graphPath = parser.get("graphPath");
        log.info("got variable graphPath: {}", graphPath);

        final SparkSession spark = SparkSession.builder().appName(OrcidGetUpdatesFile.class.getName()).getOrCreate();
        // Only ids modified after the most recent author already in the graph are fetched.
        final Row latest = spark.read().load(graphPath + "/Authors").selectExpr("max(lastModifiedDate)").first();
        final String latestDate = latest.getString(0);
        log.info("latest date is {}", latestDate);

        final FileSystem fileSystem = FileSystem.get(DHPUtils.getHadoopConfiguration(namenode));
        new OrcidGetUpdatesFile().readTar(fileSystem, accessToken, apiURL, targetPath, latestDate);
    }

    /** Creates a Text/Text SequenceFile writer at {@code aPath}. */
    private SequenceFile.Writer createFile(Path aPath, FileSystem fileSystem) throws IOException {
        return SequenceFile.createWriter(
            fileSystem.getConf(),
            SequenceFile.Writer.file(aPath),
            SequenceFile.Writer.keyClass(Text.class),
            SequenceFile.Writer.valueClass(Text.class));
    }

    /**
     * Builds (without starting) a worker that consumes ids from {@code queue}. The worker writes
     * to {@code employments_<id>}, {@code summary_<id>} and {@code works_<id>} under
     * {@code targetPath}.
     */
    private ORCIDWorker createWorker(String id, String targetPath, BlockingQueue<String> queue, String accessToken, FileSystem fileSystem) throws Exception {
        return ORCIDWorker.builder()
            .withId(id)
            .withEmployments(this.createFile(new Path(String.format("%s/employments_%s", targetPath, id)), fileSystem))
            .withSummary(this.createFile(new Path(String.format("%s/summary_%s", targetPath, id)), fileSystem))
            .withWorks(this.createFile(new Path(String.format("%s/works_%s", targetPath, id)), fileSystem))
            .withAccessToken(accessToken)
            .withBlockingQueue(queue)
            .build();
    }

    /**
     * Downloads the archive at {@code apiURL}, stages it on HDFS and dispatches the ids of rows
     * modified after {@code startDate} to the worker pool.
     *
     * <p>Redirects are not followed. A non-2xx response is logged as an error and nothing is
     * processed.
     *
     * @param fileSystem  HDFS used for staging and for the workers' output files
     * @param accessToken token handed to each {@link ORCIDWorker}
     * @param apiURL      URL of the updates archive
     * @param targetPath  directory where the workers write their SequenceFiles
     * @param startDate   lower bound (exclusive) compared against the 10-character prefix of
     *                    the fourth CSV column; a {@code null} value lets every row through,
     *                    because {@code StringUtils.compare} treats null as the smallest value
     * @throws Exception on network, HDFS or archive errors, or if interrupted
     */
    public void readTar(FileSystem fileSystem, String accessToken, String apiURL, String targetPath, String startDate) throws Exception {
        final HttpURLConnection urlConn = (HttpURLConnection) new URL(apiURL).openConnection();
        final HttpClientParams clientParams = new HttpClientParams();
        urlConn.setInstanceFollowRedirects(false);
        urlConn.setReadTimeout(clientParams.getReadTimeOut() * 1000);
        urlConn.setConnectTimeout(clientParams.getConnectTimeOut() * 1000);
        try {
            final int responseCode = urlConn.getResponseCode();
            if (responseCode < 200 || responseCode >= 300) {
                log.error("cannot download ORCID updates from {}: HTTP status {}", apiURL, responseCode);
                return;
            }
            final Path hdfsWritePath = new Path(STAGING_PATH);
            downloadTo(urlConn, fileSystem, hdfsWritePath);
            processArchive(fileSystem, hdfsWritePath, accessToken, targetPath, startDate);
        } finally {
            urlConn.disconnect();
        }
    }

    /** Copies the HTTP response body to {@code target} on HDFS, overwriting it; closes both streams. */
    private static void downloadTo(HttpURLConnection urlConn, FileSystem fileSystem, Path target) throws IOException {
        try (InputStream input = urlConn.getInputStream();
             FSDataOutputStream output = fileSystem.create(target, true)) {
            IOUtils.copy(input, output);
            output.flush();
        }
    }

    /**
     * Starts the worker pool, reads every file entry of the staged tar.gz and enqueues the
     * matching ids. Always sends one {@link ORCIDWorker#JOB_COMPLETE} marker per worker, and on
     * success waits for all workers to finish.
     */
    private void processArchive(FileSystem fileSystem, Path archivePath, String accessToken, String targetPath, String startDate) throws Exception {
        final BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        final List<ORCIDWorker> workers = new ArrayList<>(NUM_WORKERS);
        for (int i = 0; i < NUM_WORKERS; ++i) {
            workers.add(this.createWorker(String.valueOf(i), targetPath, queue, accessToken, fileSystem));
        }
        workers.forEach(Thread::start);

        boolean completed = false;
        try (FSDataInputStream updateFile = fileSystem.open(archivePath);
             TarArchiveInputStream tais = new TarArchiveInputStream(new GzipCompressorInputStream(new BufferedInputStream(updateFile)))) {
            TarArchiveEntry entry;
            while ((entry = tais.getNextTarEntry()) != null) {
                if (entry.isFile()) {
                    enqueueUpdatedIds(tais, startDate, queue);
                }
            }
            completed = true;
        } finally {
            // Without the markers, the started workers (presumably blocked on queue.take())
            // would never terminate, so they are sent even when reading failed.
            signalCompletion(queue, !completed);
        }

        for (ORCIDWorker worker : workers) {
            worker.join();
        }
    }

    /**
     * Reads one CSV tar entry and enqueues column 0 of every row whose fourth column, truncated
     * to 10 characters, compares greater than {@code startDate}. The first line is skipped and
     * only logged (presumably a header; verify against the ORCID dump format). Rows with fewer
     * than four columns, or whose fourth column is shorter than 10 characters, are skipped and
     * counted.
     */
    private static void enqueueUpdatedIds(InputStream entryStream, String startDate, BlockingQueue<String> queue) throws IOException, InterruptedException {
        // Deliberately not closed: closing the reader would close the underlying tar stream.
        final BufferedReader reader = new BufferedReader(new InputStreamReader(entryStream, StandardCharsets.UTF_8));
        log.info("tar entry first line: {}", reader.readLine());
        long malformed = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            final String[] fields = line.split(",");
            if (fields.length < 4 || fields[3].length() < 10) {
                malformed++;
                continue;
            }
            if (StringUtils.compare(fields[3].substring(0, 10), startDate) > 0) {
                queue.put(fields[0]);
            }
        }
        if (malformed > 0) {
            log.warn("skipped {} malformed rows in tar entry", malformed);
        }
    }

    /**
     * Sends one {@link ORCIDWorker#JOB_COMPLETE} marker per worker. On failure, pending ids are
     * discarded first. This thread is the only producer, so after {@code clear()} the
     * non-blocking {@code offer} always finds room and cannot hang or be interrupted.
     */
    private static void signalCompletion(BlockingQueue<String> queue, boolean failed) throws InterruptedException {
        if (failed) {
            queue.clear();
            for (int i = 0; i < NUM_WORKERS; ++i) {
                queue.offer(ORCIDWorker.JOB_COMPLETE);
            }
        } else {
            for (int i = 0; i < NUM_WORKERS; ++i) {
                queue.put(ORCIDWorker.JOB_COMPLETE);
            }
        }
    }
}

