/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.doiboost.orcid;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.doiboost.orcid.model.DownloadedRecordData;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.util.LongAccumulator;
import org.mortbay.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class SparkOrcidGenerateAuthors {
    static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    static final String lastUpdate = "2019-09-30 00:00:00";

    public static void main(String[] args) throws IOException, Exception {
        Logger logger = LoggerFactory.getLogger(SparkOrcidGenerateAuthors.class);
        logger.info("[ SparkOrcidGenerateAuthors STARTED]");
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)SparkOrcidGenerateAuthors.class.getResourceAsStream("/eu/dnetlib/dhp/doiboost/gen_orcid_authors_parameters.json")));
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        logger.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        String workingPath = parser.get("workingPath");
        logger.info("workingPath: ", (Object)workingPath);
        String outputAuthorsPath = parser.get("outputAuthorsPath");
        logger.info("outputAuthorsPath: ", (Object)outputAuthorsPath);
        String token = parser.get("token");
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> {
            JavaSparkContext sc = JavaSparkContext.fromSparkContext((SparkContext)spark.sparkContext());
            LongAccumulator parsedRecordsAcc = sc.sc().longAccumulator("parsedRecords");
            LongAccumulator modifiedRecordsAcc = sc.sc().longAccumulator("modifiedRecords");
            LongAccumulator downloadedRecordsAcc = sc.sc().longAccumulator("downloadedRecords");
            LongAccumulator alreadyDownloadedRecords = sc.sc().longAccumulator("alreadyDownloadedRecords");
            JavaRDD lamdaFileRDD = sc.textFile(workingPath + "lamdafiles");
            JavaRDD downloadedRDD = sc.textFile(workingPath + "downloaded");
            Function & Serializable getOrcidIdFunction = (Function & Serializable)line -> {
                try {
                    String[] values = line.split(",");
                    return values[0].substring(1);
                }
                catch (Exception e) {
                    return new String("");
                }
            };
            List downloadedRecords = downloadedRDD.map((Function)getOrcidIdFunction).collect();
            Function & Serializable isModifiedAfterFilter = (Function & Serializable)line -> {
                String[] values = line.split(",");
                String orcidId = values[0];
                parsedRecordsAcc.add(1L);
                if (SparkOrcidGenerateAuthors.isModified(orcidId, values[3])) {
                    modifiedRecordsAcc.add(1L);
                    return true;
                }
                return false;
            };
            Function & Serializable isNotDownloadedFilter = (Function & Serializable)line -> {
                String[] values = line.split(",");
                String orcidId = values[0];
                if (downloadedRecords.contains(orcidId)) {
                    alreadyDownloadedRecords.add(1L);
                    return false;
                }
                return true;
            };
            Function & Serializable downloadRecordFunction = (Function & Serializable)line -> {
                String[] values = line.split(",");
                String orcidId = values[0];
                String modifiedDate = values[3];
                return SparkOrcidGenerateAuthors.downloadRecord(orcidId, modifiedDate, token, downloadedRecordsAcc);
            };
            lamdaFileRDD.filter((Function)isModifiedAfterFilter).filter((Function)isNotDownloadedFilter).map((Function)downloadRecordFunction).rdd().saveAsTextFile(workingPath.concat(outputAuthorsPath));
        });
    }

    private static boolean isModified(String orcidId, String modifiedDate) {
        Date modifiedDateDt = null;
        Date lastUpdateDt = null;
        try {
            if (modifiedDate.length() != 19) {
                modifiedDate = modifiedDate.substring(0, 19);
            }
            modifiedDateDt = new SimpleDateFormat(DATE_FORMAT).parse(modifiedDate);
            lastUpdateDt = new SimpleDateFormat(DATE_FORMAT).parse(lastUpdate);
        }
        catch (Exception e) {
            Log.warn((String)("[" + orcidId + "] Parsing date: "), (Object)e.getMessage());
            return true;
        }
        return modifiedDateDt.after(lastUpdateDt);
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static Tuple2<String, String> downloadRecord(String orcidId, String modifiedDate, String token, LongAccumulator downloadedRecordsAcc) {
        DownloadedRecordData data = new DownloadedRecordData();
        data.setOrcidId(orcidId);
        data.setModifiedDate(modifiedDate);
        try (CloseableHttpClient client = HttpClients.createDefault();){
            HttpGet httpGet = new HttpGet("https://api.orcid.org/v3.0/" + orcidId + "/record");
            httpGet.addHeader("Accept", "application/vnd.orcid+xml");
            httpGet.addHeader("Authorization", String.format("Bearer %s", token));
            CloseableHttpResponse response = client.execute((HttpUriRequest)httpGet);
            int statusCode = response.getStatusLine().getStatusCode();
            data.setStatusCode(statusCode);
            if (statusCode != 200) {
                Log.warn((String)("Downloading " + orcidId + " status code: " + response.getStatusLine().getStatusCode()));
                Tuple2<String, String> tuple2 = data.toTuple2();
                return tuple2;
            }
            downloadedRecordsAcc.add(1L);
            data.setCompressedData(ArgumentApplicationParser.compressArgument((String)IOUtils.toString((InputStream)response.getEntity().getContent())));
            return data.toTuple2();
        }
        catch (Throwable e) {
            Log.warn((String)("Downloading " + orcidId), (Object)e.getMessage());
            data.setErrorMessage(e.getMessage());
            return data.toTuple2();
        }
    }
}

