/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.sx.bio.ebi;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.CollectionUtils$;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.sx.bio.ebi.SparkEBILinksToOaf$;
import eu.dnetlib.dhp.sx.bio.pubmed.PMArticle;
import eu.dnetlib.dhp.sx.bio.pubmed.PMAuthor;
import eu.dnetlib.dhp.sx.bio.pubmed.PMJournal;
import eu.dnetlib.dhp.sx.bio.pubmed.PMParser;
import eu.dnetlib.dhp.sx.bio.pubmed.PubMedToOaf$;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

public final class SparkCreateBaselineDataFrame$ {
    public static final SparkCreateBaselineDataFrame$ MODULE$;
    private final Aggregator<Tuple2<String, PMArticle>, PMArticle, PMArticle> pmArticleAggregator;

    static {
        new SparkCreateBaselineDataFrame$();
    }

    /*
     * WARNING - void declaration
     */
    public List<Tuple2<String, String>> requestBaseLineUpdatePage(String maxFile) {
        void var3_3;
        String data = this.requestPage("https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/");
        List result = new StringOps(Predef$.MODULE$.augmentString(data)).linesWithSeparators().map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(String l) {
                return new StringOps(Predef$.MODULE$.augmentString(l)).stripLineEnd();
            }
        }).filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(String l) {
                return l.startsWith("<a href=");
            }
        }).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final String apply(String l) {
                int end = l.lastIndexOf("\">");
                int start = l.indexOf("<a href=\"");
                return start >= 0 && end > start ? l.substring(start + 9, end - start) : "";
            }
        }).filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(String s) {
                return s.endsWith(".gz");
            }
        }).filter((Function1)new Serializable(maxFile){
            public static final long serialVersionUID = 0L;
            private final String maxFile$1;

            public final boolean apply(String s) {
                return new StringOps(Predef$.MODULE$.augmentString(s)).$greater((Object)this.maxFile$1);
            }
            {
                this.maxFile$1 = maxFile$1;
            }
        }).map((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final Tuple2<String, String> apply(String s) {
                return new Tuple2((Object)s, (Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{s})));
            }
        }).toList();
        return var3_3;
    }

    public InputStream downloadBaselinePart(String url) {
        HttpGet r = new HttpGet(url);
        int timeout = 60;
        RequestConfig config = RequestConfig.custom().setConnectTimeout(timeout * 1000).setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000).build();
        CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
        CloseableHttpResponse response = client.execute((HttpUriRequest)r);
        Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"get response with status", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)response.getStatusLine().getStatusCode())})));
        return response.getEntity().getContent();
    }

    /*
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public String requestPage(String url) {
        HttpGet r = new HttpGet(url);
        int timeout = 60;
        RequestConfig config = RequestConfig.custom().setConnectTimeout(timeout * 1000).setConnectionRequestTimeout(timeout * 1000).setSocketTimeout(timeout * 1000).build();
        CloseableHttpClient client = HttpClientBuilder.create().setDefaultRequestConfig(config).build();
        try {
            for (int tries = 4; tries > 0; --tries) {
                Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"requesting ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{r.getURI()})));
                {
                    CloseableHttpResponse response = client.execute((HttpUriRequest)r);
                    Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"get response with status", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)response.getStatusLine().getStatusCode())})));
                    if (response.getStatusLine().getStatusCode() <= 400) return IOUtils.toString((InputStream)response.getEntity().getContent(), (Charset)Charset.defaultCharset());
                    continue;
                }
            }
            return "";
        }
        finally {
            if (client != null) {
                client.close();
            }
        }
    }

    public void downloadBaseLineUpdate(String baselinePath, String hdfsServerUri) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfsServerUri);
        FileSystem fs = FileSystem.get((Configuration)conf);
        Path p = new Path(baselinePath);
        RemoteIterator files = fs.listFiles(p, false);
        String max_file = "";
        while (files.hasNext()) {
            LocatedFileStatus c = (LocatedFileStatus)files.next();
            String data = c.getPath().toString();
            String fileName = data.substring(data.lastIndexOf("/") + 1);
            if (!new StringOps(Predef$.MODULE$.augmentString(fileName)).$greater((Object)max_file)) continue;
            max_file = fileName;
        }
        List<Tuple2<String, String>> files_to_download = this.requestBaseLineUpdatePage(max_file);
        files_to_download.foreach((Function1)new Serializable(baselinePath, fs){
            public static final long serialVersionUID = 0L;
            private final String baselinePath$1;
            private final FileSystem fs$1;

            public final void apply(Tuple2<String, String> u) {
                Path hdfsWritePath = new Path(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.baselinePath$1, u._1()})));
                FSDataOutputStream fsDataOutputStream = this.fs$1.create(hdfsWritePath, true);
                InputStream i = SparkCreateBaselineDataFrame$.MODULE$.downloadBaselinePart((String)u._2());
                IOUtils.copy((InputStream)i, (OutputStream)fsDataOutputStream);
                Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Downloaded ", " into ", "/", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{u._2(), this.baselinePath$1, u._1()})));
                fsDataOutputStream.close();
            }
            {
                this.baselinePath$1 = baselinePath$1;
                this.fs$1 = fs$1;
            }
        });
    }

    public Aggregator<Tuple2<String, PMArticle>, PMArticle, PMArticle> pmArticleAggregator() {
        return this.pmArticleAggregator;
    }

    public void main(String[] args) {
        SparkConf conf = new SparkConf();
        Logger log = LoggerFactory.getLogger(this.getClass());
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)SparkEBILinksToOaf$.MODULE$.getClass().getResourceAsStream("/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json"), (Charset)Charset.defaultCharset()));
        parser.parseArgument(args);
        String isLookupUrl = parser.get("isLookupUrl");
        log.info("isLookupUrl: {}", new Object[]{isLookupUrl});
        String workingPath = parser.get("workingPath");
        log.info("workingPath: {}", new Object[]{workingPath});
        String targetPath = parser.get("targetPath");
        log.info("targetPath: {}", new Object[]{targetPath});
        String hdfsServerUri = parser.get("hdfsServerUri");
        log.info("hdfsServerUri: {}", new Object[]{targetPath});
        String skipUpdate = parser.get("skipUpdate");
        log.info("skipUpdate: {}", new Object[]{skipUpdate});
        ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService((String)isLookupUrl);
        VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS((ISLookUpService)isLookupService);
        SparkSession spark = SparkSession$.MODULE$.builder().config(conf).appName(SparkEBILinksToOaf$.MODULE$.getClass().getSimpleName()).getOrCreate();
        SparkContext sc = spark.sparkContext();
        Encoder PMEncoder = Encoders$.MODULE$.kryo(PMArticle.class);
        Encoder PMJEncoder = Encoders$.MODULE$.kryo(PMJournal.class);
        Encoder PMAEncoder = Encoders$.MODULE$.kryo(PMAuthor.class);
        Encoder resultEncoder = Encoders$.MODULE$.kryo(Oaf.class);
        if (!"true".equalsIgnoreCase(skipUpdate)) {
            this.downloadBaseLineUpdate(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/baseline"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{workingPath})), hdfsServerUri);
            RDD k = sc.wholeTextFiles(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/baseline"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{workingPath})), 2000);
            XMLInputFactory inputFactory = XMLInputFactory.newInstance();
            Dataset ds = spark.createDataset(k.filter((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final boolean apply(Tuple2<String, String> i) {
                    return ((String)i._1()).endsWith(".gz");
                }
            }).flatMap((Function1)new Serializable(inputFactory){
                public static final long serialVersionUID = 0L;
                private final XMLInputFactory inputFactory$1;

                public final PMParser apply(Tuple2<String, String> i) {
                    XMLEventReader xml = this.inputFactory$1.createXMLEventReader(new ByteArrayInputStream(((String)i._2()).getBytes()));
                    return new PMParser(xml);
                }
                {
                    this.inputFactory$1 = inputFactory$1;
                }
            }, ClassTag$.MODULE$.apply(PMArticle.class)), PMEncoder);
            ds.map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final Tuple2<String, PMArticle> apply(PMArticle p) {
                    return new Tuple2((Object)p.getPmid(), (Object)p);
                }
            }, Encoders$.MODULE$.tuple(Encoders$.MODULE$.STRING(), PMEncoder)).groupByKey((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply(Tuple2<String, PMArticle> x$1) {
                    return (String)x$1._1();
                }
            }, spark.implicits().newStringEncoder()).agg(this.pmArticleAggregator().toColumn()).map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final PMArticle apply(Tuple2<String, PMArticle> p) {
                    return (PMArticle)p._2();
                }
            }, PMEncoder).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/baseline_dataset"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{workingPath})));
        }
        Dataset exported_dataset = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "/baseline_dataset"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{workingPath}))).as(PMEncoder);
        CollectionUtils$.MODULE$.saveDataset((Dataset<Oaf>)exported_dataset.map((Function1)new Serializable(vocabularies){
            public static final long serialVersionUID = 0L;
            private final VocabularyGroup vocabularies$1;

            public final Oaf apply(PMArticle a) {
                return PubMedToOaf$.MODULE$.convert(a, this.vocabularies$1);
            }
            {
                this.vocabularies$1 = vocabularies$1;
            }
        }, resultEncoder).as(resultEncoder).filter((Function1)new Serializable(){
            public static final long serialVersionUID = 0L;

            public final boolean apply(Oaf p) {
                return p != null;
            }
        }), targetPath);
    }

    private SparkCreateBaselineDataFrame$() {
        MODULE$ = this;
        this.pmArticleAggregator = new Aggregator<Tuple2<String, PMArticle>, PMArticle, PMArticle>(){

            public PMArticle zero() {
                return new PMArticle();
            }

            public PMArticle reduce(PMArticle b, Tuple2<String, PMArticle> a) {
                return b == null || b.getPmid() == null ? (PMArticle)a._2() : b;
            }

            public PMArticle merge(PMArticle b1, PMArticle b2) {
                return b1 == null || b1.getPmid() == null ? b2 : b1;
            }

            public PMArticle finish(PMArticle reduction) {
                return reduction;
            }

            public Encoder<PMArticle> bufferEncoder() {
                return Encoders$.MODULE$.kryo(ClassTag$.MODULE$.apply(PMArticle.class));
            }

            public Encoder<PMArticle> outputEncoder() {
                return Encoders$.MODULE$.kryo(ClassTag$.MODULE$.apply(PMArticle.class));
            }
        };
    }
}

