

zk系列三:zookeeper实战之分布式锁实现 - 木木他爹
source link: https://www.cnblogs.com/darling2047/p/16870751.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一、分布式锁的通用实现思路
分布式锁的概念以及常规解决方案可以参考之前的博客:聊聊分布式锁的解决方案;今天我们先分析下分布式锁的实现思路;
- 首先,需要保证唯一性,即某一时点只能有一个线程访问某一资源;比方说待办短信通知功能,每天早上九点短信提醒所有工单的处理人处理工单,假设服务部署了20个容器,那么早上九点的时候会有20个线程启动准备发送短信,此时我们只能让一个线程执行短信发送,否则用户会收到20条相同的短信;
- 其次,需要考虑下何时应该释放锁?这又分三种情况,一是拿到锁的线程正常结束,另一种是获取锁的线程异常退出,还有种是获取锁的线程一直阻塞;第一种情况直接释放即可,第二种情况可以通过定义下锁的过期时间然后通过定时任务去释放锁;zk的话直接通过临时节点即可;最后一种阻塞的情况也可以通过定时任务来释放,但是需要根据业务来综合判断,如果业务本身就是长时间耗时的操作那么锁的过期时间就得设置的久一点
- 最后,当拿到锁的线程释放锁的时候,如何通知其他线程可以抢锁了呢
这里简单介绍两种解决方案,一种是所有需要锁的线程主动轮询,固定时间去访问下看锁是否释放,但是这种方案无端增加服务器压力并且时效性无法保证;另一种就是zk的watch,监听锁所在的目录,一有变化立马得到通知
二、ZK实现分布式锁的思路
- zk通过每个线程在同一父目录下创建临时有序节点,然后通过比较节点的id大小来实现分布式锁功能;再通过zk的watch机制实时获取节点的状态,如果被删除立即重新争抢锁;具体流程见线图:
提示:需要关注下图里判断自身不是最小节点时的监听情况,为什么不监听父节点?原因图里已有描述,这里就不再赘述
三、ZK实现分布式锁的编码实现
1、核心工具类实现
通过不断的调试,我封装了一个ZkLockHelper
类,里面封装了上锁和释放锁的方法,为了方便我将zk的一些监听和回调机智也融合到一起了,并没有抽出来,下面贴上该类的全部代码
package com.darling.service.zookeeper.lock;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.platform.commons.util.StringUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
/**
* @description:
* @author: dll
* @date: Created in 2022/11/4 8:41
* @version:
* @modified By:
*/
@Data
@Slf4j
public class ZkLockHelper implements AsyncCallback.StringCallback, AsyncCallback.StatCallback,Watcher, AsyncCallback.ChildrenCallback {
private final String lockPath = "/lockItem";
ZooKeeper zkClient;
String threadName;
CountDownLatch cd = new CountDownLatch(1);
private String pathName;
/**
* 上锁
*/
public void tryLock() {
try {
log.info("线程:{}正在创建节点",threadName);
zkClient.create(lockPath,(threadName).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"AAA");
log.info("线程:{}正在阻塞......",threadName);
// 由于上面是异步创建所以这里需要阻塞住当前线程
cd.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 释放锁
*/
public void unLock() {
try {
zkClient.delete(pathName,-1);
System.out.println(threadName + " 工作结束....");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* create方法的回调,创建成功后在此处获取/DCSLock的子目录,比较节点ID是否最小,是则拿到锁。。。
* @param rc 状态码
* @param path create方法的path入参
* @param ctx create方法的上下文入参
* @param name 创建成功的临时有序节点的名称,即在path的后面加上了zk维护的自增ID;
* 注意如果创建的不是有序节点,那么此处的name和path的内容一致
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info(">>>>>>>>>>>>>>>>>processResult,rx:{},path:{},ctx:{},name:{}",rc,path,ctx.toString(),name);
if (StringUtils.isNotBlank(name)) {
try {
pathName = name ;
// 此处path需注意要写/
zkClient.getChildren("/", false,this,"123");
// List<String> children = zkClient.getChildren("/", false);
// log.info(">>>>>threadName:{},children:{}",threadName,children);
// // 给children排序
// Collections.sort(children);
// int i = children.indexOf(pathName.substring(1));
// // 判断自身是否第一个
// if (Objects.equals(i,0)) {
// // 是第一个则表示抢到了锁
// log.info("线程{}抢到了锁",threadName);
// cd.countDown();
// }else {
// // 表示没抢到锁
// log.info("线程{}抢锁失败,重新注册监听器",threadName);
// zkClient.exists("/"+children.get(i-1),this,this,"AAA");
// }
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* exists方法的回调,此处暂不做处理
* @param rc
* @param path
* @param ctx
* @param stat
*/
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
}
/**
* exists的watch监听
* @param event
*/
@Override
public void process(WatchedEvent event) {
//如果第一个线程锁释放了,等价于第一个线程删除了节点,此时只有第二个线程会监控的到
switch (event.getType()) {
case None:
break;
case NodeCreated:
break;
case NodeDeleted:
zkClient.getChildren("/", false,this,"123");
// // 此处path需注意要写"/"
// List<String> children = null;
// try {
// children = zkClient.getChildren("/", false);
// } catch (KeeperException e) {
// e.printStackTrace();
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// log.info(">>>>>threadName:{},children:{}",threadName,children);
// // 给children排序
// Collections.sort(children);
// int i = children.indexOf(pathName.substring(1));
// // 判断自身是否第一个
// if (Objects.equals(i,0)) {
// // 是第一个则表示抢到了锁
// log.info("线程{}抢到了锁",threadName);
// cd.countDown();
// }else {
// /**
// * 表示没抢到锁;需要判断前置节点存不存在,其实这里并不是特别关心前置节点存不存在,所以其回调可以不处理;
// * 但是这里关注的前置节点的监听,当前置节点监听到被删除时就是其他线程抢锁之时
// */
// zkClient.exists("/"+children.get(i-1),this,this,"AAA");
// }
break;
case NodeDataChanged:
break;
case NodeChildrenChanged:
break;
}
}
/**
* getChildren方法的回调
* @param rc
* @param path
* @param ctx
* @param children
*/
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
try {
log.info(">>>>>threadName:{},children:{}", threadName, children);
if (Objects.isNull(children)) {
return;
}
// 给children排序
Collections.sort(children);
int i = children.indexOf(pathName.substring(1));
// 判断自身是否第一个
if (Objects.equals(i, 0)) {
// 是第一个则表示抢到了锁
log.info("线程{}抢到了锁", threadName);
cd.countDown();
} else {
// 表示没抢到锁
log.info("线程{}抢锁失败,重新注册监听器", threadName);
/**
* 表示没抢到锁;需要判断前置节点存不存在,其实这里并不是特别关心前置节点存不存在,所以其回调可以不处理;
* 但是这里关注的前置节点的监听,当前置节点监听到被删除时就是其他线程抢锁之时
*/
zkClient.exists("/" + children.get(i - 1), this, this, "AAA");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
提示:代码中注释的代码块可以关注下,原本是直接阻塞式编程,将获取所有子节点并释放锁的操作直接写在getChildren方法的回调里,后来发现当节点被删除时我们还要重新抢锁,那么代码就冗余了,于是结合响应式编程的思想,将这段核心代码放到
getChildren方法的回调
里,这样代码简洁了并且可以让业务更只关注于getChildren
这件事了
2、测试代码编写
线程安全问题复现
package com.darling.service.zookeeper.lock;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
/**
* @description: 开启是个线程给i做递减操作,未加锁的情况下会有线程安全问题
* @author: dll
* @date: Created in 2022/11/8 8:32
* @version:
* @modified By:
*/
@Slf4j
public class ZkLockTest02 {
private int i = 10;
@Test
public void test() throws InterruptedException {
for (int n = 0; n < 10; n++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(100);
incre();
}
}).start();
}
Thread.sleep(5000);
log.info("i = {}",i);
}
/**
* i递减 线程不安全
*/
public void incre(){
// i.incrementAndGet();
log.info("当前线程:{},i = {}",Thread.currentThread().getName(),i--);
}
}
- 上面代码运行结果如下:
使用上面封装的ZkLockHelper
实现的分布式锁
package com.darling.service.zookeeper.lock;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* @description: 使用zk实现的分布式锁解决线程安全问题
* @author: dll
* @date: Created in 2022/11/8 8:32
* @version:
* @modified By:
*/
@Slf4j
public class ZkLockTest03 {
ZooKeeper zkClient;
@Before
public void conn (){
zkClient = ZkUtil.getZkClient();
}
@After
public void close (){
try {
zkClient.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private int i = 10;
@Test
public void test() throws InterruptedException {
for (int n = 0; n < 10; n++) {
new Thread(new Runnable() {
@SneakyThrows
@Override
public void run() {
Thread.sleep(100);
ZkLockHelper zkHelper = new ZkLockHelper();
// 这里给zkHelper设置threadName是为了后续调试的时候日志打印,便于观察存在的问题
String threadName = Thread.currentThread().getName();
zkHelper.setThreadName(threadName);
zkHelper.setZkClient(zkClient);
// tryLock上锁
zkHelper.tryLock();
incre();
log.info("线程{}正在执行业务代码...",threadName);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 释放锁
zkHelper.unLock();
}
}).start();
}
while (true) {
}
}
/**
* i递减 线程不安全
*/
public void incre(){
// i.incrementAndGet();
log.info("☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆☆当前线程:{},i = {}",Thread.currentThread().getName(),i--);
}
}
- 运行结果如下:
由于日志中掺杂着zk的日志所有此处并未截全,但是也能看到i是在按规律递减的,不会出现通过线程拿到相同值的情况
四、zk实现分布式锁的优缺点
- 集群部署不存在单点故障问题
- 统一视图
zk集群每个节点对外提供的数据是一致的,数据一致性有所报障 - 临时有序节点
zk提供临时有序节点,这样当客户端失去连接时会自动释放锁,不用像其他方案一样当拿到锁的实例服务不可用时,需要定时任务去删除锁;临时节点的特性就是当客户端失去连接会自动删除 - watch能力加持
当获取不到锁时,无需客户端定期轮询争抢,只需watch前一节点即可,当有变化时会及时通知,比普通方案即及时又高效;注意这里最好只watch前一节点,如果watch整个父目录的话,当客户端并发较大时会不断有请求进出zk,给zk性能带来压力
- 与单机版redis比较的话性能肯定较差,但是当客户端集群足够庞大且业务量足够多时肯定还是集群更加稳定
好了,zk实现分布式锁的编码实现就到这了,后续有时间再写偏redis的,其实思路缕清了,编码实现还是很简单的
Recommend
-
52
本文涉及到几个zookeeper简单的知识点,永久节点、有序节点、watch机制。比较基础,熟悉的就别看了跳过这篇吧 每个线程在/lo...
-
51
背景ConnectionLoss链接丢失SessionExpired会话过期绕开zookeeperbroker进行状态通知leader选举与zkNode断开做好幂等静态扩容、动态扩容背景分布式锁现在用的越来越多,通常用来协调多个并发任务。在一般的应用场景中存在一定的不安全用法,不安全用法会带来多个ma...
-
45
-
39
在许多场景中,数据一致性是一个比较重要的话题,在单机环境中,我们可以通过Java提供的并发API来解决;而在分布式环境(会遇到网络故障、消息重复、消息丢失等各种问题)下要复杂得多,常见的解决方案是分布式事务、分布式锁等。 ...
-
15
前言 前面已经讲解了Zookeeper可重入锁的实现原理,自己对分布式锁也有了更深的认知。 我在公众号中发了一个疑问,相比于Redis来说,Zookeeper的实现方式要更好一些,即便Redis作者实现了RedLock算法来解决Redis集群...
-
30
前言 这里是zookeeper响应式编程的第二篇——自定义分布式锁,第一篇zookeeper分布式注册配置中心见如下链接: https://segmentfault.com/a/11......
-
8
redis是一种基于内存的非关系型数据库,内存虽然快但是数据也更易丢失,所以redis提供了两种持久化方式,分别是RDB和AOF,今天就介绍下这两种持久化方式以及原理 一、RDB rdb是一种快照式的存储也是redis默认的持久化策略,它将内存中的...
-
5
上篇ES系列一之java端API操作结束后本以为就相安无事了,但生产的问题是层出不穷的;下面我就再记录下近几周遇到的问题以及解决方案; 一 更新E...
-
10
聊聊JUC包下的底层支撑类-AbstractQueuedSynchronizer(AQS) juc包下的一堆并发工具类是我们日常开发特别是面试中常被拿来问的八股文之一,为了工作也好,为了面试也罢,今天开始想尝试着把这些...
-
4
ReentrantLock介绍及源码解析 一、ReentrantLock介绍 ReentrantLock是JUC包下的一个并发工具类,可以通过他显示的加锁(lock)和释放锁(unlock)来实现线程的安全访问,ReentrantLo...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK