package eu.dnetlib.data.mapreduce.wf.oozie;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;
import com.googlecode.sarasvati.env.Env;
import eu.dnetlib.workflow.AbstractJobNode;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;

/* loaded from: input_file:eu/dnetlib/data/mapreduce/wf/oozie/OozieWorkflowNode.class */
public class OozieWorkflowNode extends AbstractJobNode {
    private static final Log log = LogFactory.getLog(OozieWorkflowNode.class);
    public static final String ENV_ATTRIBUTE_OOZIE_SERVICE_LOC = "oozie.service.loc";
    public static final String ENV_ATTRIBUTE_OOZIE_JOB_STATUS = "oozie.job.status";
    public static final String ENV_ATTRIBUTE_OOZIE_TOTAL_ACTIONS_COUNT = "oozie.total.actions.count";
    private Map<String, String> predefinedProperties;
    private String oozieServiceLocation;
    private boolean waitForResults = false;
    private int monitorSleepTimeSecs = 60;
    private int defaultTotalActionsCount = -1;

    /* loaded from: input_file:eu/dnetlib/data/mapreduce/wf/oozie/OozieWorkflowNode$OozieJobMonitor.class */
    class OozieJobMonitor implements Runnable {
        private final OozieClient oozieClient;
        private final String jobId;
        private final Engine engine;
        private final NodeToken token;
        private final int monitorSleepTimeSecs;

        public OozieJobMonitor(OozieClient oozieClient, String str, Engine engine, NodeToken nodeToken, int i) {
            this.oozieClient = oozieClient;
            this.jobId = str;
            this.engine = engine;
            this.token = nodeToken;
            this.monitorSleepTimeSecs = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (this.oozieClient.getJobInfo(this.jobId).getStatus() == WorkflowJob.Status.RUNNING) {
                try {
                    Thread.sleep(this.monitorSleepTimeSecs * 1000);
                } catch (InterruptedException e) {
                    OozieWorkflowNode.this.failed(this.engine, this.token, e);
                    return;
                } catch (OozieClientException e2) {
                    OozieWorkflowNode.this.failed(this.engine, this.token, e2);
                    return;
                }
            }
            WorkflowJob.Status status = this.oozieClient.getJobInfo(this.jobId).getStatus();
            OozieWorkflowNode.log.debug("job " + this.jobId + " finihsed with status: " + status);
            if (status == WorkflowJob.Status.SUCCEEDED) {
                this.engine.complete(this.token, Arc.DEFAULT_ARC);
                return;
            }
            this.token.getEnv().setAttribute("hasFailed", true);
            this.token.getEnv().setAttribute("errorMessage", "job status: " + status);
            this.token.getEnv().setAttribute("ENV_ATTRIBUTE_OOZIE_JOB_STATUS", status);
            this.engine.complete(this.token, "failed");
        }
    }

    public void execute(Engine engine, NodeToken nodeToken) {
        log.debug("entering " + OozieWorkflowNode.class + " node...");
        OozieClient oozieClient = new OozieClient(getOozieServiceLocation(nodeToken));
        Properties oozieProperties = setOozieProperties(oozieClient.createConfiguration(), nodeToken);
        String property = oozieProperties.getProperty("oozie.wf.application.path");
        if (property == null) {
            log.error("required property oozie.wf.application.path was not set!");
            nodeToken.getEnv().setAttribute("hasFailed", true);
            nodeToken.getEnv().setAttribute("errorMessage", "required property oozie.wf.application.path was not set!");
            engine.complete(nodeToken, "failed");
            return;
        }
        try {
            String run = oozieClient.run(oozieProperties);
            log.debug("oozie job started for application path: " + property + ", jobId: " + run);
            OozieJobMonitor oozieJobMonitor = new OozieJobMonitor(oozieClient, run, engine, nodeToken, this.monitorSleepTimeSecs);
            setProgressProvider(new OozieProgressProvider(oozieClient, run, nodeToken.getFullEnv().hasAttribute(ENV_ATTRIBUTE_OOZIE_TOTAL_ACTIONS_COUNT) ? Integer.valueOf(nodeToken.getFullEnv().getAttribute(ENV_ATTRIBUTE_OOZIE_TOTAL_ACTIONS_COUNT)).intValue() : this.defaultTotalActionsCount));
            if (this.waitForResults) {
                log.debug("waiting for the job to finish...");
                oozieJobMonitor.run();
            } else {
                log.debug("creating new thread for job status monitoring...");
                Executors.newSingleThreadExecutor().submit(oozieJobMonitor);
                log.debug("new thread fired, exitting");
            }
        } catch (OozieClientException e) {
            failed(engine, nodeToken, e);
        }
    }

    protected Properties setOozieProperties(Properties properties, NodeToken nodeToken) {
        if (this.predefinedProperties != null) {
            for (Map.Entry<String, String> entry : this.predefinedProperties.entrySet()) {
                properties.setProperty(entry.getKey(), entry.getValue());
            }
        }
        Env env = nodeToken.getEnv();
        for (String str : env.getAttributeNames()) {
            properties.setProperty(str, env.getAttribute(str));
        }
        return properties;
    }

    protected String getOozieServiceLocation(NodeToken nodeToken) {
        return nodeToken.getFullEnv().hasAttribute(ENV_ATTRIBUTE_OOZIE_SERVICE_LOC) ? nodeToken.getFullEnv().getAttribute(ENV_ATTRIBUTE_OOZIE_SERVICE_LOC) : this.oozieServiceLocation;
    }

    public void setWaitForResults(boolean z) {
        this.waitForResults = z;
    }

    public void setPredefinedProperties(Map<String, String> map) {
        this.predefinedProperties = map;
    }

    public void setOozieServiceLocation(String str) {
        this.oozieServiceLocation = str;
    }

    public void setMonitorSleepTimeSecs(int i) {
        this.monitorSleepTimeSecs = i;
    }

    public void setDefaultTotalActionsCount(int i) {
        this.defaultTotalActionsCount = i;
    }
}
