/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.datacite;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.datacite.DataciteAPIImporter;
import eu.dnetlib.dhp.datacite.DataciteType;
import eu.dnetlib.dhp.datacite.ImportDatacite$;
import eu.dnetlib.dhp.datacite.ImportDatacite$$anon$1$;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions$;
import org.json4s.DefaultFormats$;
import org.json4s.Formats;
import org.json4s.JsonAST;
import org.json4s.jackson.JsonMethods$;
import org.json4s.package$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ClassTag$;
import scala.reflect.Manifest;
import scala.reflect.ManifestFactory$;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.VolatileByteRef;

public final class ImportDatacite$ {
    /** Singleton instance of this object; it is assigned inside the private constructor. */
    public static final ImportDatacite$ MODULE$;
    /** Logger for this class; obtained from {@link LoggerFactory} in the constructor. */
    private final Logger log;

    // Instantiating the class here is what populates MODULE$ (the constructor stores `this` into it).
    static {
        new ImportDatacite$();
    }

    /**
     * Accessor for the logger held by this singleton.
     *
     * @return the logger created in the constructor
     */
    public Logger log() {
        return log;
    }

    /**
     * Converts one raw JSON record into a {@link DataciteType}.
     *
     * <p>The record is parsed once and the {@code attributes} object is read for three fields:
     * {@code doi} (lower-cased), {@code isActive} and {@code updated}. The {@code updated} value is
     * parsed with {@link DateTimeFormatter#ISO_DATE_TIME} and converted to epoch seconds.
     *
     * @param input the JSON text of a single record; it is also stored unchanged in the result
     * @return a {@code DataciteType} built from (doi, epoch seconds of {@code updated}, isActive, input)
     */
    public DataciteType convertAPIStringToDataciteItem(String input) {
        ObjectRef formats$lzy = ObjectRef.zero();
        ObjectRef json$lzy = ObjectRef.zero();
        VolatileByteRef bitmap$0 = VolatileByteRef.create((byte)0);
        // Resolve the "attributes" node and the formats once; the three extractions below share them.
        JsonAST.JValue attributes = package$.MODULE$.jvalue2monadic(this.json$1(input, json$lzy, bitmap$0)).$bslash("attributes");
        Formats formats = (Formats)this.formats$1(formats$lzy, bitmap$0);
        // Locale.ROOT keeps the DOI normalization independent of the JVM default locale
        // (a plain toLowerCase() maps 'I' to a dotless i under e.g. a Turkish locale).
        String doi = ((String)package$.MODULE$.jvalue2extractable(package$.MODULE$.jvalue2monadic(attributes).$bslash("doi")).extract(formats, ManifestFactory$.MODULE$.classType(String.class))).toLowerCase(java.util.Locale.ROOT);
        boolean isActive = BoxesRunTime.unboxToBoolean((Object)package$.MODULE$.jvalue2extractable(package$.MODULE$.jvalue2monadic(attributes).$bslash("isActive")).extract(formats, (Manifest)ManifestFactory$.MODULE$.Boolean()));
        String timestamp_string = (String)package$.MODULE$.jvalue2extractable(package$.MODULE$.jvalue2monadic(attributes).$bslash("updated")).extract(formats, ManifestFactory$.MODULE$.classType(String.class));
        // NOTE(review): LocalDateTime carries no offset, so the parsed value is always treated as UTC
        // below — confirm the API only ever emits UTC timestamps.
        LocalDateTime dt = LocalDateTime.parse(timestamp_string, DateTimeFormatter.ISO_DATE_TIME);
        return new DataciteType(doi, dt.toInstant(ZoneOffset.UTC).toEpochMilli() / 1000L, isActive, input);
    }

    /**
     * Entry point of the import job.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Parse {@code args} against the argument specification loaded from the classpath resource
     *       {@code /eu/dnetlib/dhp/datacite/import_from_api.json} and read {@code namenode},
     *       {@code targetPath}, {@code dataciteDumpPath}, {@code blocksize} (100 when absent) and
     *       {@code skipImport}.</li>
     *   <li>Create the Spark session and a Hadoop {@link Configuration} pointing at {@code namenode}.</li>
     *   <li>Load the existing dump at {@code dataciteDumpPath} and take the maximum {@code timestamp}.</li>
     *   <li>Unless {@code skipImport} equals "true" (case-insensitive), call
     *       {@code writeSequenceFile} starting from that timestamp.</li>
     *   <li>If the resulting count is positive: read the sequence file at {@code targetPath}, convert
     *       every value with {@code convertAPIStringToDataciteItem}, save it as
     *       {@code <targetPath>_dataset}, union it with the dump, keep one record per DOI via the
     *       aggregator, write {@code <dataciteDumpPath>_updated}, then delete the old dump and rename
     *       the updated one into its place.</li>
     * </ol>
     *
     * @param args command-line arguments handed to {@link ArgumentApplicationParser}
     */
    public void main(String[] args) {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(Source$.MODULE$.fromInputStream(this.getClass().getResourceAsStream("/eu/dnetlib/dhp/datacite/import_from_api.json"), Codec$.MODULE$.fallbackSystemCodec()).mkString());
        parser.parseArgument(args);
        String hdfsuri = parser.get("namenode");
        this.log().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"namenode is ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{hdfsuri})));
        String targetPath = parser.get("targetPath");
        this.log().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"targetPath is ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
        String dataciteDump = parser.get("dataciteDumpPath");
        this.log().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"dataciteDump is ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataciteDump})));
        Path hdfsTargetPath = new Path(targetPath);
        this.log().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"hdfsTargetPath is ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{hdfsTargetPath})));
        // Block size passed to the API importer; falls back to 100 when the argument is not supplied.
        int bs = parser.get("blocksize") == null ? 100 : new StringOps(Predef$.MODULE$.augmentString(parser.get("blocksize"))).toInt();
        String spkipImport = parser.get("skipImport");
        this.log().info(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"skipImport is ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{spkipImport})));
        SparkSession spark = SparkSession$.MODULE$.builder().appName(this.getClass().getSimpleName()).getOrCreate();
        // This configuration is only handed to writeSequenceFile; the final delete/rename below
        // uses the Spark context's own Hadoop configuration instead.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfsuri);
        conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", LocalFileSystem.class.getName());
        SparkContext sc = spark.sparkContext();
        sc.setLogLevel("ERROR");
        // Aggregator used after groupByKey(doi): of two records it keeps the one with the strictly
        // greater timestamp (on a tie the second argument wins); null acts as the empty buffer.
        Aggregator<DataciteType, DataciteType, DataciteType> dataciteAggregator = new Aggregator<DataciteType, DataciteType, DataciteType>(spark){
            private final SparkSession spark$1;

            public DataciteType zero() {
                return null;
            }

            public DataciteType reduce(DataciteType a, DataciteType b) {
                if (b == null) {
                    return a;
                }
                if (a == null) {
                    return b;
                }
                if (a.timestamp() > b.timestamp()) {
                    return a;
                }
                return b;
            }

            // Merging two partial buffers follows the same rule as reducing a single value.
            public DataciteType merge(DataciteType a, DataciteType b) {
                return this.reduce(a, b);
            }

            public Encoder<DataciteType> bufferEncoder() {
                JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
                JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(anon.1.class.getClassLoader());
                // TypeCreator that resolves the DataciteType class; used to build the product encoder below.
                public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator4$1
                extends TypeCreator {
                    public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                        Universe $u = $m$untyped.universe();
                        Mirror<U> $m = $m$untyped;
                        return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                    }

                    public Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator4$1(anon.1 $outer) {
                    }
                }
                return (Encoder)Predef$.MODULE$.implicitly((Object)this.spark$1.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator4$1(this))));
            }

            public Encoder<DataciteType> outputEncoder() {
                JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
                JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(anon.1.class.getClassLoader());
                // Same TypeCreator shape as in bufferEncoder(), again targeting DataciteType.
                public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator5$1
                extends TypeCreator {
                    public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                        Universe $u = $m$untyped.universe();
                        Mirror<U> $m = $m$untyped;
                        return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                    }

                    public Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator5$1(anon.1 $outer) {
                    }
                }
                return (Encoder)Predef$.MODULE$.implicitly((Object)this.spark$1.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$anon$1$$typecreator5$1(this))));
            }

            // The buffer already is the result, so it is returned unchanged.
            public DataciteType finish(DataciteType reduction) {
                return reduction;
            }
            {
                this.spark$1 = spark$1;
            }
        };
        JavaUniverse $u = scala.reflect.runtime.package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
        // TypeCreator for DataciteType, used by the encoder that types the existing dump.
        public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator9$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
            }

            public Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator9$1() {
            }
        }
        // Existing dump, typed as DataciteType.
        Dataset dump = spark.read().load(dataciteDump).as(spark.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator9$1())));
        // Highest timestamp already present in the dump; the API import resumes from here.
        // NOTE(review): presumably max("timestamp") is null for an empty dump, which would make
        // getLong(0) fail — confirm the dump is never empty.
        long ts = ((Row)dump.select((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.max("timestamp")})).first()).getLong(0);
        Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"last Timestamp is ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)ts)})));
        // With skipImport the count is forced to 1, so the merge step below still runs against
        // whatever sequence file is already at targetPath.
        long cnt = "true".equalsIgnoreCase(spkipImport) ? 1L : this.writeSequenceFile(hdfsTargetPath, ts, conf, bs);
        Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Imported from Datacite API ", " documents"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)cnt)})));
        if (cnt > 0L) {
            // Read the (int key, Text value) sequence file, keep only the value as a String and
            // convert each one into a DataciteType.
            RDD inputRdd = sc.sequenceFile(targetPath, Integer.TYPE, Text.class).map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply(Tuple2<Object, Text> s) {
                    return ((Text)s._2()).toString();
                }
            }, ClassTag$.MODULE$.apply(String.class)).map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final DataciteType apply(String s) {
                    return ImportDatacite$.MODULE$.convertAPIStringToDataciteItem(s);
                }
            }, ClassTag$.MODULE$.apply(DataciteType.class));
            JavaUniverse $u2 = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m2 = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator13$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                }

                public Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator13$1() {
                }
            }
            // Persist the freshly imported records to "<targetPath>_dataset", overwriting any previous output.
            spark.createDataset(inputRdd, spark.implicits().newProductEncoder(((TypeTags)$u2).TypeTag().apply((Mirror)$m2, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator13$1()))).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "_dataset"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath})));
            JavaUniverse $u3 = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m3 = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator17$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                }

                public Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator17$1() {
                }
            }
            // Reload the dataset that was just written so the union below reads it from storage.
            Dataset ds = spark.read().load(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "_dataset"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{targetPath}))).as(spark.implicits().newProductEncoder(((TypeTags)$u3).TypeTag().apply((Mirror)$m3, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator17$1())));
            JavaUniverse $u4 = scala.reflect.runtime.package$.MODULE$.universe();
            JavaUniverse.JavaMirror $m4 = scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(this.getClass().getClassLoader());
            public final class Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator25$1
            extends TypeCreator {
                public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                    Universe $u = $m$untyped.universe();
                    Mirror<U> $m = $m$untyped;
                    return $m.staticClass("eu.dnetlib.dhp.datacite.DataciteType").asType().toTypeConstructor();
                }

                public Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator25$1() {
                }
            }
            // Merge old and new records: group by DOI, keep one record per DOI via the aggregator,
            // drop the key, repartition to 4000 partitions and write to "<dataciteDump>_updated".
            dump.union(ds).groupByKey((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final String apply(DataciteType x$1) {
                    return x$1.doi();
                }
            }, spark.implicits().newStringEncoder()).agg(dataciteAggregator.toColumn()).map((Function1)new Serializable(){
                public static final long serialVersionUID = 0L;

                public final DataciteType apply(Tuple2<String, DataciteType> s) {
                    return (DataciteType)s._2();
                }
            }, spark.implicits().newProductEncoder(((TypeTags)$u4).TypeTag().apply((Mirror)$m4, (TypeCreator)new Eu_dnetlib_dhp_datacite_ImportDatacite$$typecreator25$1()))).repartition(4000).write().mode(SaveMode.Overwrite).save(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "_updated"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataciteDump})));
            // Swap the merged output into place: recursively delete the old dump, then rename
            // "<dataciteDump>_updated" to "<dataciteDump>".
            // NOTE(review): the results of delete() and rename() are not checked, so a failed
            // rename after a successful delete would go unnoticed here.
            FileSystem fs = FileSystem.get((Configuration)sc.hadoopConfiguration());
            fs.delete(new Path(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataciteDump}))), true);
            fs.rename(new Path(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", "_updated"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataciteDump}))), new Path(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{dataciteDump}))));
        }
    }

    /**
     * Pulls records from the Datacite API and appends them to a sequence file.
     *
     * <p>The interval from {@code timestamp * 1000} up to the current time in milliseconds is walked
     * in windows of {@code delta} (100000000); for each window a {@link DataciteAPIImporter} is
     * created and drained. Every record is appended with a running integer key and flushed
     * immediately. Progress is printed every 1000 records and after each window.
     *
     * <p>This method is best-effort: any failure is logged (with its stack trace) and the number of
     * records appended so far is still returned. The writer is closed in all cases.
     *
     * @param hdfsTargetPath destination of the sequence file
     * @param timestamp      starting point; multiplied by 1000 before being compared with
     *                       {@link System#currentTimeMillis()}
     * @param conf           Hadoop configuration used to create the writer
     * @param bs             block size handed to each {@code DataciteAPIImporter}
     * @return the number of records appended
     */
    private long writeSequenceFile(Path hdfsTargetPath, long timestamp, Configuration conf, int bs) {
        long delta = 100000000L;
        long now = System.currentTimeMillis();
        int i = 0;
        // try-with-resources closes the writer whether the import loop finishes or fails.
        try (SequenceFile.Writer writer = SequenceFile.createWriter((Configuration)conf, (SequenceFile.Writer.Option[])new SequenceFile.Writer.Option[]{SequenceFile.Writer.file((Path)hdfsTargetPath), SequenceFile.Writer.keyClass(IntWritable.class), SequenceFile.Writer.valueClass(Text.class)})) {
            long start = System.currentTimeMillis();
            for (long from = timestamp * 1000L; from < now; from += delta) {
                DataciteAPIImporter client = new DataciteAPIImporter(from, bs, from + delta);
                // Key and value holders are reused for every record of this window.
                IntWritable key = new IntWritable(i);
                Text value = new Text();
                while (client.hasNext()) {
                    key.set(i++);
                    value.set(client.next());
                    writer.append((Writable)key, (Writable)value);
                    writer.hflush();
                    if (i % 1000 != 0) continue;
                    // Report how long the last batch of 1000 records took, then restart the clock.
                    long end = System.currentTimeMillis();
                    float time = (float)((double)(end - start) / 1000.0);
                    Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Imported ", " in ", " seconds"})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)i), BoxesRunTime.boxToFloat((float)time)})));
                    start = System.currentTimeMillis();
                }
                Predef$.MODULE$.println((Object)new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"updating from value: ", "  -> ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToLong((long)from), BoxesRunTime.boxToLong((long)(from + delta))})));
            }
        }
        catch (Throwable throwable) {
            // Deliberately not rethrown: the caller proceeds with whatever was written so far.
            // Logging (rather than printing a tuple to stdout) keeps the stack trace.
            this.log().error("Error", throwable);
        }
        return i;
    }

    /**
     * Initialises the lazily held formats value while holding this object's monitor.
     *
     * <p>Bit 0 of {@code bitmap$0$1} records whether the holder has been filled. When it is clear,
     * {@code DefaultFormats$.MODULE$} is stored into the holder and the bit is set.
     *
     * @param formats$lzy$1 holder for the formats instance
     * @param bitmap$0$1    initialisation flags shared with the lazy JSON value
     * @return the formats instance stored in the holder
     */
    private final DefaultFormats$ formats$lzycompute$1(ObjectRef formats$lzy$1, VolatileByteRef bitmap$0$1) {
        synchronized (this) {
            boolean alreadySet = (bitmap$0$1.elem & 1) != 0;
            if (!alreadySet) {
                formats$lzy$1.elem = DefaultFormats$.MODULE$;
                bitmap$0$1.elem = (byte)(bitmap$0$1.elem | 1);
            }
            return (DefaultFormats$)formats$lzy$1.elem;
        }
    }

    /**
     * Returns the formats instance, initialising it on first use.
     *
     * @param formats$lzy$1 holder for the formats instance
     * @param bitmap$0$1    initialisation flags; bit 0 marks the holder as filled
     * @return the held formats instance
     */
    private final DefaultFormats$ formats$1(ObjectRef formats$lzy$1, VolatileByteRef bitmap$0$1) {
        // Flag already set: read the holder directly without entering the synchronized initialiser.
        if ((bitmap$0$1.elem & 1) != 0) {
            return (DefaultFormats$)formats$lzy$1.elem;
        }
        return this.formats$lzycompute$1(formats$lzy$1, bitmap$0$1);
    }

    /**
     * Parses {@code input$1} into a JSON tree while holding this object's monitor, unless that has
     * already been done.
     *
     * <p>Bit 1 (mask 2) of {@code bitmap$0$1} records whether the holder has been filled.
     *
     * @param input$1    JSON text to parse
     * @param json$lzy$1 holder for the parsed tree
     * @param bitmap$0$1 initialisation flags shared with the lazy formats value
     * @return the parsed tree stored in the holder
     */
    private final JsonAST.JValue json$lzycompute$1(String input$1, ObjectRef json$lzy$1, VolatileByteRef bitmap$0$1) {
        synchronized (this) {
            boolean alreadyParsed = (bitmap$0$1.elem & 2) != 0;
            if (!alreadyParsed) {
                json$lzy$1.elem = JsonMethods$.MODULE$.parse(package$.MODULE$.string2JsonInput(input$1), JsonMethods$.MODULE$.parse$default$2(), JsonMethods$.MODULE$.parse$default$3());
                bitmap$0$1.elem = (byte)(bitmap$0$1.elem | 2);
            }
            return (JsonAST.JValue)json$lzy$1.elem;
        }
    }

    /**
     * Returns the parsed JSON tree for {@code input$1}, parsing it on first use.
     *
     * @param input$1    JSON text to parse
     * @param json$lzy$1 holder for the parsed tree
     * @param bitmap$0$1 initialisation flags; mask 2 marks the holder as filled
     * @return the held JSON tree
     */
    private final JsonAST.JValue json$1(String input$1, ObjectRef json$lzy$1, VolatileByteRef bitmap$0$1) {
        // Flag already set: read the holder directly without entering the synchronized initialiser.
        if ((bitmap$0$1.elem & 2) != 0) {
            return (JsonAST.JValue)json$lzy$1.elem;
        }
        return this.json$lzycompute$1(input$1, json$lzy$1, bitmap$0$1);
    }

    /**
     * Private constructor, invoked from the static initializer.
     * Publishes the new instance as {@code MODULE$} and then creates the logger for this class.
     */
    private ImportDatacite$() {
        MODULE$ = this;
        this.log = LoggerFactory.getLogger(this.getClass());
    }
}

