/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.iis.common.lock;

import eu.dnetlib.iis.core.java.HadoopContext;
import eu.dnetlib.iis.core.java.PortBindings;
import eu.dnetlib.iis.core.java.Process;
import eu.dnetlib.iis.core.java.porttype.PortType;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class LockManagingProcess
implements Process {
    public static final String DEFAULT_ROOT_NODE = "/cache";
    public static final String NODE_SEPARATOR = "/";
    public static final String PARAM_ZK_SESSION_TIMEOUT = "zk_session_timeout";
    public static final String PARAM_ROOT_NODE = "root_node";
    public static final String PARAM_NODE_ID = "node_id";
    public static final String PARAM_LOCK_MODE = "mode";
    public static final int DEFAULT_SESSION_TIMEOUT = 60000;
    public static final Logger log = Logger.getLogger(LockManagingProcess.class);

    public Map<String, PortType> getInputPorts() {
        return Collections.emptyMap();
    }

    public Map<String, PortType> getOutputPorts() {
        return Collections.emptyMap();
    }

    public void run(PortBindings portBindings, HadoopContext context, Map<String, String> parameters) throws Exception {
        String rootNode;
        if (!parameters.containsKey(PARAM_NODE_ID)) {
            throw new Exception("node id not provided!");
        }
        if (!parameters.containsKey(PARAM_LOCK_MODE)) {
            throw new Exception("lock mode not provided!");
        }
        String zkConnectionString = context.getConfiguration().get("ha.zookeeper.quorum");
        if (zkConnectionString == null || zkConnectionString.isEmpty()) {
            throw new Exception("zookeeper quorum is unknown, invalid ha.zookeeper.quorum property value: " + zkConnectionString);
        }
        int sessionTimeout = parameters.containsKey(PARAM_ZK_SESSION_TIMEOUT) ? Integer.valueOf(parameters.get(PARAM_ZK_SESSION_TIMEOUT)) : 60000;
        final ZooKeeper zk = new ZooKeeper(zkConnectionString, sessionTimeout, new Watcher(){

            public void process(WatchedEvent event) {
            }
        });
        String string = rootNode = parameters.containsKey(PARAM_ROOT_NODE) ? parameters.get(PARAM_ROOT_NODE) : DEFAULT_ROOT_NODE;
        if (zk.exists(rootNode, false) == null) {
            log.warn((Object)("initializing root node: " + rootNode));
            zk.create(rootNode, new byte[0], (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            log.warn((Object)"root node initialized");
        }
        final String nodePath = LockManagingProcess.generatePath(parameters.get(PARAM_NODE_ID), rootNode);
        LockMode lockMode = LockMode.valueOf(parameters.get(PARAM_LOCK_MODE));
        final Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();
        if (LockMode.obtain.equals((Object)lockMode)) {
            log.warn((Object)("trying to obtain lock: " + nodePath));
            if (zk.exists(nodePath, new Watcher(){

                public void process(WatchedEvent event) {
                    if (Watcher.Event.EventType.NodeDeleted == event.getType()) {
                        log.warn((Object)(nodePath + " lock release detected"));
                        log.warn((Object)("creating new lock instance: " + nodePath + "..."));
                        try {
                            zk.create(nodePath, new byte[0], (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                            log.warn((Object)("lock" + nodePath + " created"));
                            semaphore.release();
                        }
                        catch (KeeperException e) {
                            throw new RuntimeException(e);
                        }
                        catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }
            }) == null) {
                log.warn((Object)("lock not found, creating new lock instance: " + nodePath));
                zk.create(nodePath, new byte[0], (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                log.warn((Object)("lock" + nodePath + " created"));
                semaphore.release();
            } else {
                log.warn((Object)"waiting until lock is released");
                long startTime = System.currentTimeMillis();
                semaphore.acquire();
                log.warn((Object)("lock released, waited for " + (System.currentTimeMillis() - startTime) + " ms"));
                semaphore.release();
            }
        } else if (LockMode.release.equals((Object)lockMode)) {
            log.warn((Object)("removing lock" + nodePath + "..."));
            zk.delete(nodePath, -1);
            log.warn((Object)("lock" + nodePath + " removed"));
        } else {
            throw new Exception("unsupported lock mode: " + (Object)((Object)lockMode));
        }
    }

    public static final String generatePath(String nodeId, String rootNode) {
        if (nodeId != null) {
            return rootNode + NODE_SEPARATOR + nodeId.replace('/', '_');
        }
        return null;
    }

    public static enum LockMode {
        obtain,
        release;

    }
}

