/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.broker.oa;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that aggregates broker events into per-datasource statistics.
 *
 * <p>The job reads {@link Event} records from {@code <outputDir>/events}, groups them by
 * {@code topic + "@@@" + targetDatasourceId}, aggregates every group with
 * {@link StatsAggregator} and writes the resulting {@link DatasourceStats} rows over JDBC
 * into the {@code oa_datasource_stats_temp} table (overwriting it). Afterwards it issues an
 * HTTP GET to {@code <brokerApiBaseUrl>/api/openaireBroker/stats/update}.
 */
public class GenerateStatsJob {
    private static final Logger log = LoggerFactory.getLogger(GenerateStatsJob.class);

    /** Classpath location of the JSON document describing the accepted command line arguments. */
    private static final String PARAMS_RESOURCE = "/eu/dnetlib/dhp/broker/oa/stats_params.json";

    /** JDBC table that receives the aggregated statistics. */
    private static final String STATS_TABLE = "oa_datasource_stats_temp";

    /**
     * Entry point of the job.
     *
     * <p>Arguments read from the parser: {@code isSparkSessionManaged} (optional, defaults to
     * {@code true}), {@code outputDir}, {@code dbUrl}, {@code dbUser}, {@code dbPassword} and
     * {@code brokerApiBaseUrl}.
     *
     * @param args command line arguments, parsed according to {@link #PARAMS_RESOURCE}
     * @throws Exception if argument parsing, the Spark computation, the JDBC write or the
     *     final HTTP call fails
     */
    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(loadParamsSpec());
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        SparkConf conf = new SparkConf();

        String eventsPath = parser.get("outputDir") + "/events";
        log.info("eventsPath: {}", eventsPath);

        String dbUrl = parser.get("dbUrl");
        log.info("dbUrl: {}", dbUrl);

        String dbUser = parser.get("dbUser");
        log.info("dbUser: {}", dbUser);

        String dbPassword = parser.get("dbPassword");
        // The password is deliberately masked: never write the real value to the log.
        log.info("dbPassword: {}", "***");

        String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
        log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);

        TypedColumn<Event, DatasourceStats> aggr = new StatsAggregator().toColumn();

        Properties connectionProperties = new Properties();
        connectionProperties.setProperty("user", dbUser);
        connectionProperties.setProperty("password", dbPassword);

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            ClusterUtils.readPath(spark, eventsPath, Event.class)
                    // One group per (topic, target datasource) pair.
                    .groupByKey(
                            (MapFunction<Event, String>) e ->
                                    e.getTopic() + "@@@" + e.getMap().getTargetDatasourceId(),
                            Encoders.STRING())
                    .agg(aggr)
                    // Drop the grouping key, keep only the aggregated statistics.
                    .map(
                            (MapFunction<Tuple2<String, DatasourceStats>, DatasourceStats>) t -> t._2,
                            Encoders.bean(DatasourceStats.class))
                    // Collapse to a single partition before the JDBC write.
                    .coalesce(1)
                    .write()
                    .mode(SaveMode.Overwrite)
                    .jdbc(dbUrl, STATS_TABLE, connectionProperties);

            log.info("*** updateStats");
            GenerateStatsJob.updateStats(brokerApiBaseUrl);
            log.info("*** ALL done.");
        });
    }

    /**
     * Loads the JSON argument specification from the classpath.
     *
     * @return the content of {@link #PARAMS_RESOURCE} decoded as UTF-8
     * @throws IOException if the resource is missing or cannot be read
     */
    private static String loadParamsSpec() throws IOException {
        try (InputStream in = GenerateStatsJob.class.getResourceAsStream(PARAMS_RESOURCE)) {
            if (in == null) {
                // Fail with an explicit message instead of an NPE deep inside IOUtils.
                throw new IOException("Classpath resource not found: " + PARAMS_RESOURCE);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /**
     * Issues an HTTP GET to {@code <brokerApiBaseUrl>/api/openaireBroker/stats/update}.
     *
     * <p>A non-2xx status is logged as a warning but is not turned into an exception, so the
     * outcome of the job is the same as when the status was not inspected at all.
     *
     * @param brokerApiBaseUrl base URL of the broker API, without the trailing path
     * @return the response body, or an empty string when the response carries no entity
     * @throws IOException if the request cannot be executed or the body cannot be read
     */
    private static String updateStats(String brokerApiBaseUrl) throws IOException {
        String url = brokerApiBaseUrl + "/api/openaireBroker/stats/update";
        HttpGet req = new HttpGet(url);
        // Resources are closed in reverse order: the response first, then the client.
        try (CloseableHttpClient client = HttpClients.createDefault();
                CloseableHttpResponse response = client.execute(req)) {
            int status = response.getStatusLine().getStatusCode();
            if (status < 200 || status >= 300) {
                log.warn("Stats update request to {} returned HTTP status {}", url, status);
            }
            if (response.getEntity() == null) {
                return "";
            }
            // NOTE(review): body is decoded as UTF-8 regardless of the Content-Type charset;
            // confirm the broker API always answers in UTF-8.
            return IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
        }
    }
}

