53

ZooKeeper 分布式锁 - -Finley-

 5 years ago
source link: https://www.cnblogs.com/Finley/p/9694637.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Redis分布式锁一文中, 作者介绍了如何使用Redis开发分布式锁。

Redis分布式锁具有轻量高吞吐量的特点,但是一致性保证较弱。我们可以使用Zookeeper开发分布式锁,来满足对高一致性的要求。

Zookeeper 分布式锁原理#

Zookeeper 节点具有一些性质可以帮助我们开发分布式锁:

  • 临时节点: 客户端可以创建临时节点,当客户端会话终止或超时后Zookeeper会自动删除临时节点。该特性可以用来避免死锁。
  • 触发器: 当节点的状态发生改变时,Zookeeper会通知监听相应事件的客户端。该特性可以用来实现阻塞等待加锁。
  • 有序节点: 客户端可以在某个节点下创建子节点,Zookeeper会根据子节点数量自动生成整数序号,类似于数据库的自增主键。

一种比较容易想到的分布式锁实现方案是:

  1. 检查锁节点是否已经创建,若未创建则尝试创建一个临时节点
  2. 若临时节点创建成功说明已成功加锁。若持有锁的客户端崩溃或网络异常无法维持Session,锁节点会被删除不会产生死锁。
  3. 若临时节点创建失败说明加锁失败,等待加锁。watch锁节点exists事件,当接收到节点被删除的通知后再次尝试加锁。
  4. 因为Zookeeper中的Watch是一次性的,若再次尝试加锁失败,需要重新设置Watch。
  5. 操作完成后,删除锁节点释放锁。

该方案存在的问题是,当锁被释放时Zookeeper需要通知大量订阅了该事件的客户端,这种现象称为"惊群现象"或"羊群效应"。

惊群现象对Zookeeper正常提供服务非常不利,因此实践中通常采取另一种方案:

  1. 创建一个永久节点作为锁节点,试图加锁的客户端在锁节点下创建临时顺序节点。Zookeeper会保证子节点的有序性。
  2. 若锁节点下id最小的节点是为当前客户端创建的节点,说明当前客户端成功加锁。
  3. 否则加锁失败,订阅上一个顺序节点。当上一个节点被删除时,当前节点为最小,说明加锁成功。
  4. 操作完成后,删除锁节点释放锁。

该方案每次锁释放时只需要通知一个客户端,避免惊群现象发生。

该方案的特征是优先排队等待的客户端会先获得锁,这种锁称为公平锁。而锁释放后,所有客户端重新竞争锁的方案称为非公平锁。

Demo#

本节作者将使用Zookeeper官方Java API实现一个简单的公平锁。

使用Maven进行依赖管理,项目依赖 Zookeeper 官方 java sdk 和 apache commons-lang3工具包:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.6</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.5</version>
    <type>pom</type>
</dependency>

点击查看完整代码:

package zk;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;

/**
 * @author finley
 */
public class ZKLock {

    private ZooKeeper zk;

    private String basePath;

    private String lockPath;

    private static final byte[] LOCK_DATA = "".getBytes();

    // zk 为客户端连接实例, basePath 为锁节点路径,我们将在 basePath 下创建顺序子节点
    public ZKLock(ZooKeeper zk, String basePath) {
        // 按照 zk 的路径规则,以'/'开始,不得以'/'结束
        if (basePath.endsWith("/") || !basePath.startsWith("/")) {
            throw new IllegalArgumentException("base path must start with '/', and must not end with '/'");
        }
        this.zk = zk;
        this.basePath = basePath;
    }

    // 检测 basePath 节点是否存在, 若不存在则创建
    private void ensureBasePath() throws KeeperException, InterruptedException {
        if (zk.exists(basePath, false) == null) {
            // basePath 不存在,进行创建
            List<String> pathParts = new ArrayList<>(Arrays.asList(basePath.split("/"))); // 将路径处理为节点列表
            pathParts.remove(0); //因为 basePath 以'/'开始, pathParts[0] 一定是空串,将其移除

            // 自底向上,寻找路径中最后一个存在的节点
            int last = 0;
            for (int i = pathParts.size() - 1; i >= 0; i--) {
                String path = "/" + StringUtils.join(pathParts.subList(0, i), '/');
                if (zk.exists(path, false) != null) {
                    last = i;
                    break;
                }
            }

            // 从最后一个存在的节点开始,依次创建节点
            for (int i = last; i < pathParts.size(); i++) {
                String path = "/" + StringUtils.join(pathParts.subList(0, i + 1), '/');
                try {
                    zk.create(path, LOCK_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                } catch (KeeperException.NodeExistsException ignore) {} // may created by other thread
            }

        }
    }

    // 阻塞直至加锁成功
    public void lock() throws KeeperException, InterruptedException {
        ensureBasePath();

        // 在 basePath 下创建临时顺序子节点
        String lockPath = zk.create(basePath + "/lock_", LOCK_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(Thread.currentThread().getName() + " create: " + lockPath);

        // 循环检查加锁是否成功
        while(true) {
            // 取出 basePath 中所有节点并找到最小子节点
            // 因为顺序子节点总是递增的,新创建的节点一定比当前 lockPath 更大,所以 create 和 getChildren 两个操作不保持原子性不会出现异常
            List<String> children = zk.getChildren(basePath,false);
            Collections.sort(children);
            String minNode = children.get(0);

            // 当前线程创建了最小子节点,加锁成功
            if (StringUtils.isNotBlank(lockPath) && StringUtils.isNotBlank(minNode) && StringUtils.equals(lockPath, basePath + "/" + minNode) {
                this.lockPath = lockPath; // 加锁成功,写入锁路径
                return;
            }

            // 加锁失败,设置 watch
            String watchNode = null;
            String node = lockPath.substring(lockPath.lastIndexOf("/") + 1);
            for (int i = children.size() - 1; i >= 0; i--) {
                String child = children.get(i);
                if (child.compareTo(node) < 0) {
                    watchNode = child;
                    break;
                }
            }

            // 找到需要监视的节点,设置 watch
            if (watchNode != null) {
                System.out.println(Thread.currentThread().getName() + " watch: " + watchNode);

                String watchPath = basePath + "/" + watchNode;
                
                // 监视 getData 而非 exists 的原因是: 在获取子节点和设置 watch 这段时间内,被监视的节点可能已被删除(锁释放/持有者崩溃)
                // exists 监视会成功设置,但永远不会触发NodeDeleted事件(顺序子节点序号自增,不会复用使用过的序号)。本方法会无限制等待下去
                // 若被监视节点已删除,getData 会抛出异常,避免线程浪费时间等待

                // 该调用中的 watch 回调当事件发生时会在另一个线程中执行
                try {
                    zk.getData(watchPath, event -> {
                        if(event.getType() == Watcher.Event.EventType.NodeDeleted) {
                            // 主线程会调用 this.wait()
                            // fixme: 这里有一个bug,若事件类型不是 NodeDeleted 应进行处理。分布式锁不会产生这种情况,可能是其它客户端操作所致
                            synchronized (this) {
                                notifyAll();
                            }
                        }
                    }, null);
                } catch(KeeperException.NoNodeException e) {
                    // 因为上一个节点被删除导致 getData watch 失败,进入下一个次循环,重新检查自己是否已持有锁
                    continue;
                }
        

                synchronized (this) {
                    // 等待被 watch 唤醒,唤醒后进入下一次循环,重新检查确认自己已持有锁
                    wait();
                    System.out.println(Thread.currentThread().getName() + " notified");
                }
            }
        }    

    }

    // 释放锁
    public void unlock() throws KeeperException, InterruptedException {
        // 加锁成功时会将锁路径写入 lockPath
        if (StringUtils.isNotBlank(lockPath)) {
            zk.delete(lockPath, -1); // 删除锁记录释放锁
        } else {
            throw new IllegalStateException("don't has lock"); // 未设置锁记录说明本线程未持有锁
        }
    }

    public static void main(String[] args) {
        int concurrent = 10;
        ExecutorService service = Executors.newFixedThreadPool(concurrent);
        for (int i = 0; i < concurrent; i++) {
            service.execute(() -> {
                // 为保证各线程独立的持有锁,每个线程应持有独立的 zookeeper 会话
                ZooKeeper zk;
                try {

                    zk = new ZooKeeper("localhost:2181", 6000, watchedEvent -> {
                        if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState())
                            System.out.println("connection is established...");
                    });

                    ZKLock lock = new ZKLock(zk, "/test/node1");

                    lock.lock();
                    System.out.println(Thread.currentThread().getName() + "  acquire success");

                    Thread.sleep(1000);
                    System.out.println("do sth, thread: " + Thread.currentThread().getName());

                    lock.unlock();
                    System.out.println(Thread.currentThread().getName() + "  release success");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();
    }

}

Curator#

Cruator 是一个 Zookeeper 工具集, 提供了包括分布式锁在内的常用应用的封装,本文以 Cruator 的分布式锁实现源码为例进行分析。

使用maven安装依赖:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.1</version>
</dependency>

编写加锁代码:

public class ZkLock {

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
        client.start();

        // 锁节点为 /curator/mutex
        InterProcessMutex mutex = new InterProcessMutex(client, "/curator/mutex");
        try {
            // 尝试加锁
            mutex.acquire();
            // 完成业务
            System.out.println("foo bar");
        } finally {
            // 释放锁
            mutex.release();
            client.close();
        }

    }

}

接下来分析InterProcessMutex.acquire()的实现:

/**
 * Acquire the mutex - blocking until it's available. Note: the same thread
 * can call acquire re-entrantly. Each call to acquire must be balanced by a call
 * to {@link #release()}
 *
 * @throws Exception ZK errors, connection interruptions
*/
@Override
public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

接下来看internalLock方法:

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    Thread currentThread = Thread.currentThread();

    // threadData 是一个 ConcurrentMap, 记录各线程锁的状态
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null ) // lockData 不为空, 说明线程已经持有锁
    {
        // 重入锁,重入计数器增加
        lockData.lockCount.incrementAndGet();
        return true;
    }

    // internals.attemptLock 完成实际的访问Zookeeper获取锁的操作
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

分析实际执行加锁操作的internals.attemptLock方法:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    final long      startMillis = System.currentTimeMillis();
    final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int             retryCount = 0;

    String          ourPath = null;
    boolean         hasTheLock = false;
    boolean         isDone = false;

    // 自旋加锁
    while ( !isDone )
    {
        isDone = true;

        try
        {
            // 在锁节点下创建临时顺序节点
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 等待自己的节点成为最小的节点,即加锁成功
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // 当 session 超时会抛出异常,根据重试策略直接进行重试 
            if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
            {
                isDone = false;
            }
            else
            {
                throw e;
            }
        }
    }

    if ( hasTheLock )
    {
        return ourPath;
    }

    return null;
}

首先阅读StandardLockInternalsDriver.createsTheLock() 源码:

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

创建临时顺序节点, 不再赘述。

接下来查看internalLockLoop:

while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
    // 获得所有子节点,按序号升序排列
    List<String>        children = getSortedChildren();

    // 判断自己是否为序号最小的节点
    String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
    PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
    if ( predicateResults.getsTheLock() )
    {
        haveTheLock = true;
    }
    else
    {
        // 获得前一个节点的路径
        String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

        // 监听前一个节点并进行wait(), 当锁被释放时会通过notifyall() 唤醒
        synchronized(this)
        {
            try 
            {
                // 使用getData()而非exists()监听器的原因是:
                // 若此时前一个节点已被删除exists()仍会成功设置,但不可能被触发(顺序节点不会再次使用前一个节点的序号)。这会使方法浪费时间等待,也属于Zookeeper资源浪费
                // 若前一个节点被删除getData() 会抛出异常
                client.getData().usingWatcher(watcher).forPath(previousSequencePath);

                // 若设置了等待时间
                if ( millisToWait != null )
                {
                    millisToWait -= (System.currentTimeMillis() - startMillis);
                    startMillis = System.currentTimeMillis();
                    if ( millisToWait <= 0 )
                    {
                        doDelete = true;    // timed out - delete our node
                        break;
                    }
                    // 等待指定的时间
                    wait(millisToWait);
                }
                else
                {
                    // 永远等待
                    wait();
                }
            }
            catch ( KeeperException.NoNodeException e ) 
            {
                // getData() 抛出此异常说明前一个节点已被删除, 重新尝试获取锁。
            }
        }
    }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK