/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.hadoop.action;

import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.action.AbstractSubmitAction;
import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.mapred.MapreduceJobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.data.hadoop.utils.ScanFactory;
import eu.dnetlib.data.hadoop.utils.ScanProperties;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

public class SubmitMapreduceJobAction
extends AbstractSubmitAction {
    private static final Log log = LogFactory.getLog(SubmitMapreduceJobAction.class);

    @Override
    public void submit(JobCompletion callback, BlackboardJob bbJob, String jobName, JobProfile jobProfile) throws HadoopServiceException {
        ClusterName clusterName = ClusterName.valueOf((String)((String)bbJob.getParameters().get("cluster")));
        try {
            JobConf jobConf = this.prepareJob(this.getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
            this.logJobDetails(jobConf);
            JobClient jtClient = this.hadoopClientMap.getJtClient(clusterName);
            RunningJob runningJob = jtClient.submitJob(jobConf);
            String jobId = this.newJobId(clusterName, runningJob.getID().getId());
            this.jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile, new MapreduceJobMonitor(jtClient, runningJob, callback)));
        }
        catch (IOException e) {
            throw new HadoopServiceException("error executing hadoop job: " + jobName, (Throwable)e);
        }
    }

    protected JobConf prepareJob(Configuration configuration, String jobName, JobProfile jobProfile, Map<String, String> parameters) throws IOException, HadoopServiceException {
        log.info((Object)("creating job: " + jobName));
        JobConf jobConf = new JobConf(configuration);
        jobConf.setJobName(jobName);
        jobConf.set("dnet.mapred.job.description", jobProfile.getDescription());
        String jobLib = this.getJobLib(configuration, jobProfile);
        jobConf.setJar(new Path(jobLib).toString());
        this.set(jobConf, jobProfile.getJobDefinition());
        this.set(jobConf, parameters);
        ScanProperties scanProperties = jobProfile.getScanProperties();
        if (jobProfile.getRequiredParams().contains("hbase.mapreduce.inputtable") && scanProperties != null) {
            jobConf.set("hbase.mapreduce.scan", ScanFactory.getScan(scanProperties));
        }
        return jobConf;
    }

    protected String getJobLib(Configuration configuration, JobProfile jobProfile) throws HadoopServiceException {
        String jobLib = this.getDefaultLibPath(configuration.get("fs.defaultFS"));
        if (jobProfile.getJobDefinition().containsKey("job.lib")) {
            jobLib = jobProfile.getJobDefinition().get("job.lib");
        }
        if (jobLib == null || jobLib.isEmpty()) {
            throw new HadoopServiceException("job.lib must refer to an absolute or relative HDFS path");
        }
        if (!jobLib.startsWith("hdfs://")) {
            jobLib = configuration.get("fs.defaultFS") + jobLib;
        }
        log.info((Object)("using job.lib: " + jobLib));
        return jobLib;
    }

    protected void set(JobConf jobConf, Map<String, String> properties) {
        for (Map.Entry<String, String> e : properties.entrySet()) {
            if (this.checkHdfsProperty(e)) {
                String v = jobConf.get("fs.defaultFS") + e.getValue();
                e.setValue(v);
            }
            jobConf.set(e.getKey(), e.getValue());
        }
    }

    protected void logJobDetails(JobConf jobConf) {
        StringWriter sw = new StringWriter();
        try {
            jobConf.writeXml((Writer)sw);
            if (log.isDebugEnabled()) {
                log.debug((Object)("\n" + IndentXmlString.apply((String)sw.toString())));
            }
        }
        catch (IOException e) {
            log.warn((Object)("unable to log job details: " + jobConf.getJobName()));
        }
    }
}

