/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.usagestats;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.usagestats.UsageStatsModel;
import eu.dnetlib.dhp.actionmanager.usagestats.UsageStatsResultModel;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.action.AtomicAction;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Measure;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that reads aggregated usage statistics (downloads and views) from Hive tables,
 * stages them as gzip-compressed JSON under a working path, and finally writes them as a
 * Hadoop sequence file of {@link AtomicAction}s whose payloads are {@link Result},
 * {@link Project} and {@link Datasource} entities carrying only an id and their measures.
 */
public class SparkAtomicActionUsageJob
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(SparkAtomicActionUsageJob.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Classpath location of the JSON document describing the accepted command-line arguments. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/actionmanager/usagestats/input_actionset_parameter.json";

    /** Sub-paths of the working path shared by the staging step and {@link #writeActionSet}. */
    private static final String RESULT_STAGING_DIR = "/usageDb";
    private static final String PROJECT_STAGING_DIR = "/projectDb";
    private static final String DATASOURCE_STAGING_DIR = "/datasourceDb";

    /**
     * Entry point: parses the arguments, clears the output path, stages the three kinds of
     * statistics and writes the action set.
     *
     * <p>Arguments read: {@code isSparkSessionManaged} (defaults to {@code true} when absent),
     * {@code outputPath}, {@code hive_metastore_uris}, {@code usagestatsdb},
     * {@code workingPath} and {@code datasourcePath}.
     *
     * <p>The type parameter {@code I} is not used by the body; it is kept so the declared
     * signature stays as it was.
     *
     * @param args command-line arguments handed to {@link ArgumentApplicationParser}
     * @throws Exception if the parameter resource cannot be read, the arguments cannot be
     *     parsed, or the Spark job fails
     */
    public static <I extends Result> void main(String[] args) throws Exception {
        String jsonConfiguration;
        // Close the resource stream and decode it as UTF-8 instead of the platform default.
        try (InputStream parameters = SparkAtomicActionUsageJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(
                Objects.requireNonNull(parameters, "missing classpath resource: " + PARAMETERS_RESOURCE),
                StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        SparkConf conf = new SparkConf();
        conf.set("hive.metastore.uris", parser.get("hive_metastore_uris"));

        String dbname = parser.get("usagestatsdb");
        String workingPath = parser.get("workingPath");
        String datasourcePath = parser.get("datasourcePath");

        SparkSessionSupport.runWithSparkHiveSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            prepareResultData(dbname, spark, workingPath + RESULT_STAGING_DIR, "usage_stats", "result_id", "repository_id", datasourcePath);
            prepareData(dbname, spark, workingPath + PROJECT_STAGING_DIR, "project_stats", "id");
            prepareData(dbname, spark, workingPath + DATASOURCE_STAGING_DIR, "datasource_stats", "repository_id");
            writeActionSet(spark, workingPath, outputPath);
        });
    }

    /**
     * Stages per-result, per-datasource usage counts as gzip-compressed JSON.
     *
     * <p>Downloads and views are summed per (result, datasource) pair, then left-joined with
     * the datasources read from {@code datasourcePath}. The {@code datasourceId} of every row
     * is rewritten to {@code <datasourceId>||<official name>}, or to
     * {@code <datasourceId>||NO_MATCH_FOUND} when the join found no datasource.
     *
     * @param dbname Hive database holding {@code tableName}
     * @param spark active session
     * @param workingPath directory the JSON is written to (overwritten if present)
     * @param tableName table to aggregate
     * @param resultAttributeName column exposed as {@code id}
     * @param datasourceAttributeName column exposed as {@code datasourceId}
     * @param datasourcePath location of the JSON-serialized {@link Datasource} entities
     */
    private static void prepareResultData(String dbname, SparkSession spark, String workingPath, String tableName, String resultAttributeName, String datasourceAttributeName, String datasourcePath) {
        String query = String.format(
            "select %s as id, %s as datasourceId, sum(downloads) as downloads, sum(views) as views from %s.%s group by %s, %s",
            resultAttributeName, datasourceAttributeName, dbname, tableName, resultAttributeName, datasourceAttributeName);
        Dataset<UsageStatsResultModel> resultModel = spark.sql(query).as(Encoders.bean(UsageStatsResultModel.class));

        Dataset<Datasource> datasource = readPath(spark, datasourcePath, Datasource.class)
            .filter((FilterFunction<Datasource>) d -> !d.getDataInfo().getDeletedbyinference())
            .map((MapFunction<Datasource, Datasource>) d -> {
                // Drops the first three characters of the id so it can match the ids in the
                // statistics table; presumably the "10|" prefix that
                // getFinalIndicatorsDatasource prepends -- TODO confirm.
                d.setId(d.getId().substring(3));
                return d;
            }, Encoders.bean(Datasource.class));

        resultModel
            .joinWith(datasource, resultModel.col("datasourceId").equalTo(datasource.col("id")), "left")
            .map((MapFunction<Tuple2<UsageStatsResultModel, Datasource>, UsageStatsResultModel>) t2 -> {
                UsageStatsResultModel usrm = t2._1();
                Datasource matched = t2._2();
                // The left join yields null on the right side when no datasource matched.
                if (matched != null) {
                    usrm.setDatasourceId(usrm.getDatasourceId() + "||" + matched.getOfficialname().getValue());
                } else {
                    usrm.setDatasourceId(usrm.getDatasourceId() + "||NO_MATCH_FOUND");
                }
                return usrm;
            }, Encoders.bean(UsageStatsResultModel.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(workingPath);
    }

    /**
     * Stages usage counts summed per entity id as gzip-compressed JSON.
     *
     * @param dbname Hive database holding {@code tableName}
     * @param spark active session
     * @param workingPath directory the JSON is written to (overwritten if present)
     * @param tableName table to aggregate
     * @param attribute_name column exposed as {@code id} and used as grouping key
     */
    private static void prepareData(String dbname, SparkSession spark, String workingPath, String tableName, String attribute_name) {
        String query = String.format(
            "select %s as id, sum(downloads) as downloads, sum(views) as views from %s.%s group by %s",
            attribute_name, dbname, tableName, attribute_name);
        spark.sql(query)
            .as(Encoders.bean(UsageStatsModel.class))
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(workingPath);
    }

    /**
     * Reads the staged statistics and writes them as a sequence file of atomic actions.
     *
     * <p>Each record's key is the canonical name of the action's payload class and its value
     * is the action serialized to JSON.
     *
     * @param spark active session
     * @param inputPath working path containing the {@code /usageDb}, {@code /projectDb} and
     *     {@code /datasourceDb} staging directories
     * @param outputPath destination of the sequence file
     */
    public static void writeActionSet(SparkSession spark, String inputPath, String outputPath) {
        getFinalIndicatorsResult(spark, inputPath + RESULT_STAGING_DIR)
            .toJavaRDD()
            .map(SparkAtomicActionUsageJob::toAtomicAction)
            .union(getFinalIndicatorsProject(spark, inputPath + PROJECT_STAGING_DIR)
                .toJavaRDD()
                .map(SparkAtomicActionUsageJob::toAtomicAction))
            .union(getFinalIndicatorsDatasource(spark, inputPath + DATASOURCE_STAGING_DIR)
                .toJavaRDD()
                .map(SparkAtomicActionUsageJob::toAtomicAction))
            .mapToPair(aa -> new Tuple2<>(
                new Text(aa.getClazz().getCanonicalName()),
                new Text(OBJECT_MAPPER.writeValueAsString(aa))))
            .saveAsHadoopFile(outputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
    }

    /**
     * Wraps an entity in an atomic action typed with the entity's runtime class.
     *
     * <p>The raw type is deliberate: actions for different payload classes are merged into
     * one RDD, so they cannot share a single type argument.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static AtomicAction toAtomicAction(Oaf payload) {
        return new AtomicAction(payload.getClass(), payload);
    }

    /**
     * Creates a measure with the given id and an empty, mutable unit list.
     *
     * @param id identifier of the measure
     * @return the new measure
     */
    public static Measure newMeasureInstance(String id) {
        Measure m = new Measure();
        m.setId(id);
        m.setUnit(new ArrayList<>());
        return m;
    }

    /**
     * Builds one {@link Result} per result id, with id {@code "50|" + id} and two measures
     * ({@code downloads}, {@code views}) that each hold one unit per staged row of that id.
     */
    private static Dataset<Result> getFinalIndicatorsResult(SparkSession spark, String inputPath) {
        return readPath(spark, inputPath, UsageStatsResultModel.class)
            .groupByKey((MapFunction<UsageStatsResultModel, String>) usm -> usm.getId(), Encoders.STRING())
            .mapGroups((MapGroupsFunction<String, UsageStatsResultModel, Result>) (k, it) -> {
                Result r = new Result();
                r.setId("50|" + k);
                Measure download = newMeasureInstance("downloads");
                Measure view = newMeasureInstance("views");
                it.forEachRemaining(usm -> addCountForDatasource(download, usm, view));
                r.setMeasures(Arrays.asList(download, view));
                return r;
            }, Encoders.bean(Result.class));
    }

    /**
     * Appends to both measures a unit keyed by the row's datasource id, valued with the row's
     * downloads and views respectively. Both units share one {@link DataInfo} instance.
     */
    private static void addCountForDatasource(Measure download, UsageStatsResultModel usm, Measure view) {
        DataInfo dataInfo = usageCountsDataInfo();
        download.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getDownloads()), dataInfo));
        view.getUnit().add(OafMapperUtils.newKeyValueInstance(usm.getDatasourceId(), String.valueOf(usm.getViews()), dataInfo));
    }

    /** Maps every staged row to a {@link Project} with id {@code "40|" + id} and its measures. */
    private static Dataset<Project> getFinalIndicatorsProject(SparkSession spark, String inputPath) {
        return readPath(spark, inputPath, UsageStatsModel.class)
            .map((MapFunction<UsageStatsModel, Project>) usm -> {
                Project p = new Project();
                p.setId("40|" + usm.getId());
                p.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
                return p;
            }, Encoders.bean(Project.class));
    }

    /** Maps every staged row to a {@link Datasource} with id {@code "10|" + id} and its measures. */
    private static Dataset<Datasource> getFinalIndicatorsDatasource(SparkSession spark, String inputPath) {
        return readPath(spark, inputPath, UsageStatsModel.class)
            .map((MapFunction<UsageStatsModel, Datasource>) usm -> {
                Datasource d = new Datasource();
                d.setId("10|" + usm.getId());
                d.setMeasures(getMeasure(usm.getDownloads(), usm.getViews()));
                return d;
            }, Encoders.bean(Datasource.class));
    }

    /**
     * Builds the {@code downloads} and {@code views} measures, in that order, each with the
     * key {@code count} and the given value rendered through {@link String#valueOf(Object)}.
     */
    private static List<Measure> getMeasure(Long downloads, Long views) {
        DataInfo dataInfo = usageCountsDataInfo();
        return Arrays.asList(
            OafMapperUtils.newMeasureInstance("downloads", String.valueOf(downloads), "count", dataInfo),
            OafMapperUtils.newMeasureInstance("views", String.valueOf(views), "count", dataInfo));
    }

    /** Creates the provenance information attached to every usage-count measure of this job. */
    private static DataInfo usageCountsDataInfo() {
        Qualifier provenanceAction = OafMapperUtils.qualifier(
            "measure:usage_counts", "Inferred by OpenAIRE", "dnet:provenanceActions", "dnet:provenanceActions");
        return OafMapperUtils.dataInfo(false, "update", true, false, provenanceAction, "");
    }

    /** Delegates to {@link HdfsSupport#remove} using the session's Hadoop configuration. */
    private static void removeOutputDir(SparkSession spark, String path) {
        Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();
        HdfsSupport.remove(path, hadoopConfiguration);
    }

    /**
     * Reads a text file and deserializes every line from JSON into {@code clazz}.
     *
     * @param spark active session
     * @param inputPath location of the text file(s), one JSON document per line
     * @param clazz bean class to deserialize into; also used to derive the encoder
     * @param <R> element type of the returned dataset
     * @return the dataset of deserialized beans
     */
    public static <R> Dataset<R> readPath(SparkSession spark, String inputPath, Class<R> clazz) {
        return spark.read()
            .textFile(inputPath)
            .map((MapFunction<String, R>) value -> OBJECT_MAPPER.readValue(value, clazz), Encoders.bean(clazz));
    }
}

