3

分布式锁中-基于Zookeeper的实现

 1 year ago
source link: https://www.51cto.com/article/721408.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

分布式锁中-基于Zookeeper的实现

作者:张硕 2022-10-27 10:44:14
因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁。

1. Zookeeper概述

Zookeeper(后续简称ZK)是一个分布式的,开放源码的分布式应用程序协调服务,通常以集群模式运转,其协调能力可以理解为是基于观察者设计模式来实现的;ZK服务会使用Znode存储使用者的数据,并将这些数据以树形目录的形式来组织管理,支持使用者以观察者的角色指定自己关注哪些节点\数据的变更,当这些变更发生时,ZK会通知其观察者;为满足本篇目标所需,着重介绍以下几个关键特性:

  • 数据组织:数据节点以树形目录(类似文件系统)组织管理,每一个节点中都会保存数据信息和节点信息。
图片

ZooKeeper's Hierarchical Namespace

  • 集群模式:通常是由3、5个基数实例组成集群,当超过半数服务实例正常工作就能对外提供服务,既能避免单点故障,又尽量高可用,每个服务实例都有一个数据备份,以实现数据全局一致
图片

ZooKeeper Service

  • 顺序更新:更新请求都会转由leader执行,来自同一客户端的更新将按照发送的顺序被写入到ZK,处理写请求创建Znode时,Znode名称后会被分配一个全局唯一的递增编号,可以通过顺序号推断请求的顺序,利用这个特性可以实现高级协调服务
图片

监听机制:给某个节点注册监听器,该节点一旦发生变更(例如更新或者删除),监听者就会收到一个Watch Event,可以感知到节点\数据的变更

图片

临时节点:session链接断开临时节点就没了,不能创建子节点(很关键)

ZK的分布式锁正是基于以上特性来实现的,简单来说是:

  • 临时节点:用于支撑异常情况下的锁自动释放能力
  • 顺序节点:用于支撑公平锁获取锁和排队等待的能力
  • 监听机制:用于支撑抢锁能力
  • 集群模式:用于支撑锁服务的高可用

2. 加解锁的流程描述

图片
  • 创建一个永久节点作为锁节点(/lock2)
  • 试图加锁的客户端在指定锁名称节点(/lock2)下,创建临时顺序子节点
  • 获取锁节点(/lock2)下所有子节点
  • 对所获取的子节点按节点自增序号从小到大排序
  • 判断自己是不是第一个子节点,若是,则获取锁
  • 若不是,则监听比该节点小的那个节点的删除事件(这种只监听前一个节点的方式避免了惊群效应)
  • 若是阻塞申请锁,则申请锁的操作可增加阻塞等待
  • 若监听事件生效(说明前节点释放了,可以尝试去获取锁),则回到第3步重新进行判断,直到获取到锁
  • 解锁时,将第一个子节点删除释放

3. ZK分布式锁的能力

可能读者是单篇阅读,这里引入上一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:

  • 互斥性:在同一时刻,只有一个客户端能持有锁
  • 安全性:避免死锁,如果某个客户端获得锁之后处理时间超过最大约定时间,或者持锁期间发生了故障导致无法主动释放锁,其持有的锁也能够被其他机制正确释放,并保证后续其它客户端也能加锁,整个处理流程继续正常执行
  • 可用性:也被称作容错性,分布式锁需要有高可用能力,避免单点故障,当提供锁的服务节点故障(宕机)时不影响服务运行,这里有两种模式:一种是分布式锁服务自身具备集群模式,遇到故障能自动切换恢复工作;另一种是客户端向多个独立的锁服务发起请求,当某个锁服务故障时仍然可以从其他锁服务读取到锁信息(Redlock)
  • 可重入性:对同一个锁,加锁和解锁必须是同一个线程,即不能把其他线程程持有的锁给释放了
  • 高效灵活:加锁、解锁的速度要快;支持阻塞和非阻塞;支持公平锁和非公平锁

基于上文的内容,这里简单总结一下ZK的能力矩阵(其它分布式锁的情况会在后续文章中补充):

MySql

Redis原生

Redlock

链接异常,session关闭后锁会自动释放

线程可重入

加解锁速度

阻塞非阻塞

公平非公平

关于性能不太高的一种说法

因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower机器上,这样频繁的网络通信,性能的短板是非常突出的。在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁。

由于ZooKeeper的高可用特性,在并发量不是太高的场景,也推荐使用ZK的分布式锁。

4. InterProcessMutex 使用示例

Zookeeper 客户端框架 Curator 提供的 InterProcessMutex 是分布式锁的一种实现,acquire 方法阻塞|非阻塞获取锁,release 方法释放锁,另外还提供了可撤销、可重入功能。

4.1 接口介绍

// 获取互斥锁
public void acquire() throws Exception;
// 在给定的时间内获取互斥锁
public boolean acquire(long time, TimeUnit unit) throws Exception;
// 释放锁处理
public void release() throws Exception;
// 如果当前线程获取了互斥锁,则返回true
boolean isAcquiredInThisProcess();

4.2 pom依赖

<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-core</artifactId>
  <version>2.8.2</version>
</dependency>
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.5.7</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-framework</artifactId>
  <version>4.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>4.3.0</version>
</dependency>
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-client</artifactId>
  <version>4.3.0</version>
</dependency>

4.3 示例

package com.atguigu.case3;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorLockTest {

    public static void main(String[] args) {

        // 创建分布式锁1
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁2
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock1.acquire();
                    System.out.println("线程1 获取到锁");

                    lock1.acquire();
                    System.out.println("线程1 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock1.release();
                    System.out.println("线程1 释放锁");

                    lock1.release();
                    System.out.println("线程1  再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    lock2.acquire();
                    System.out.println("线程2 获取到锁");

                    lock2.acquire();
                    System.out.println("线程2 再次获取到锁");

                    Thread.sleep(5 * 1000);

                    lock2.release();
                    System.out.println("线程2 释放锁");

                    lock2.release();
                    System.out.println("线程2  再次释放锁");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    private static CuratorFramework getCuratorFramework() {

        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);

        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181")
                .connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(policy).build();

        // 启动客户端
        client.start();

        System.out.println("zookeeper 启动成功");
        return client;
    }
}

5. DIY一个阉割版的分布式锁

通过这个实例对照第2节内容来理解加解锁的流程,以及如何避免惊群效应。

package com.rock.case2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * zk 分布式锁 v1版本:
 * 完成功能 :
 *      1. 避免了惊群效应
 * 缺失功能:
 *      1. 超时控制
 *      2. 读写锁
 *      3. 重入控制
 */
public class DistributedLock {

    private String connectString;
    private int sessionTimeout;
    private ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1);
    private CountDownLatch waitLatch = new CountDownLatch(1);

    private String waitPath;
    private String currentNode;
    private String LOCK_ROOT_PATH;

    private static String NODE_PREFIX = "w";

    public DistributedLock(String connectString, int sessionTimeout, String lockName){
        //TODO:数据校验
        this.connectString = connectString;
        this.sessionTimeout = sessionTimeout;
        this.LOCK_ROOT_PATH = lockName;
    }


    public void init() throws IOException, KeeperException, InterruptedException {
        // 建联
        zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> {
            // connectLatch  连接上zk后  释放
            if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connectLatch.countDown();
            }
        });

        connectLatch.await();// 等待zk正常连接后

        // 判断锁名称节点是否存在
        Stat stat = zk.exists(LOCK_ROOT_PATH, false);
        if (stat == null) {
            // 创建一下锁名称节点
            try {
                zk.create(LOCK_ROOT_PATH, LOCK_ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            } catch (KeeperException e) {
                //并发创建冲突忽略。
                if (!e.code().name().equals("NODEEXISTS")) {
                    throw e;
                }
            }
        }
    }

    /**
     * 待补充功能:
     * 1. 超时设置
     * 2. 读写区分
     * 3. 重入控制
     */
    public void zklock() throws KeeperException, InterruptedException {
        if (!tryLock()) {
            waitLock();
            zklock();
        }
    }

    /**
     *
     */
    private void waitLock() throws KeeperException, InterruptedException {
        try {
            zk.getData(waitPath, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent){
                    // waitLatch  需要释放
                    if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) {
                        waitLatch.countDown();
                    }
                }
            }, new Stat());
            // 等待监听
            waitLatch.await();
        } catch (KeeperException.NoNodeException e) {
            //如果等待的节点已经被清除了,不等了,再尝试去抢锁
            return;
        }

    }

    private boolean tryLock() throws KeeperException, InterruptedException {

        currentNode = zk.create(LOCK_ROOT_PATH + "/" + NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点
        List<String> children = zk.getChildren(LOCK_ROOT_PATH, false);
        // 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小
        if (children.size() == 1) {
            return true;
        } else {
            String thisNode = currentNode.substring(LOCK_ROOT_PATH.length() + 1);
            // 通过w00000000获取该节点在children集合的位置
            int index = children.indexOf(thisNode);
            if (index == 0) {
                //自己就是第一个节点
                return true;
            }
            // 需要监听  他前一个节点变化
            waitPath = LOCK_ROOT_PATH + "/" + children.get(index - 1);
        }
        return false;
    }


    // 解锁
    public void unZkLock(){
        // 删除节点
        try {
            zk.delete(this.currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

}
责任编辑:武晓燕 来源: 架构染色

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK