13

锁和分布式锁

 5 years ago
source link: https://segmentfault.com/a/1190000017053399?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

锁的由来

多线程环境中,经常遇到多个线程访问同一个 共享资源 ,这时候作为开发者必须考虑如何维护数据一致性,这就需要某种机制来保证只有满足某个条件(获取锁成功)的线程才能访问资源,而不满足条件(获取锁失败)的线程只能等待,在下一轮竞争中来获取锁才能访问资源。

两个知识点:

1.高级缓存Cache

AJjMNb3.jpg!web

CPU为了提高处理速度,不和内存直接进行交互,而是使用Cache。

可能引发的问题:

MBjEJjf.png!web

如果多个处理器同时对共享变量进行读改写操作 (i++就是经典的读改写操作),那么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子的了,操作完之后共享变量的值会和期望的不一致。

造成此结果的原因:

多个处理器同时从各自的缓存中读取变量i,分别进行加1操作,然后分别写入 系统内存中。

处理器层面的解决方案:

处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个 LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。

2.CAS(Compare And Swap)+volatile

CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。执行CAS操作的时候,将内存位置的值与预期原值比较,如果相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。

java的Atomic以及一些它自带的类中的cas操作都是通过借助cmpxchg指令完成的。他保证同一时刻只能有一个线程cas成功。

举个例子

以AtomicIneger的源码为例来看看CAS操作:

mumE3aJ.png!web

for(;;)表示循环,只有当if判断为true才退出。而if判断的内容就是是否CAS成功。

fuEjA3V.png!web

my63qeB.png!web

volatile的作用:

1)将当前处理器缓存行的数据写回到系统内存。

2)这个写回内存的操作会使在其他CPU里缓存了该内存地址的数据无效。

循环CAS+volatile是实现锁的关键。

Lock锁的部分细节

qaYreqF.png!web

2UJJjeB.png!web

不同场景锁的表现不同:独占?共享?读写?

QVVN3iM.png!web

分布式锁(redis的简单实现)

分布式锁实现的三个核心要素:

  • 1.加锁

最简单的方法是使用setnx命令。key是锁的唯一标识,按业务来决定命名。比如想要给一种商品的秒杀活动加锁,可以给key命名为 “lock_sale_商品ID” 。而value设置成什么呢?我们可以姑且设置成1。加锁的伪代码如下:

setnx(key,1)

SETNX key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。
时间复杂度:
O(1)
返回值:
设置成功,返回 1 。
设置失败,返回 0 。

当一个线程执行setnx返回1,说明key原本不存在,该线程成功得到了锁;当一个线程执行setnx返回0,说明key已经存在,该线程抢锁失败。

  • 2.解锁

有加锁就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行del指令,伪代码如下:

del(key)

释放锁之后,其他线程就可以继续执行setnx命令来获得锁。

  • 3.设置超时时间

如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。

所以,setnx的key必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx不支持超时参数,所以需要额外的指令,伪代码如下:

expire(key, 30)

综合起来,我们分布式锁实现的第一版伪代码如下:

if(setnx(key,1) == 1){
    expire(key,30)
    do something ......
    del(key)
    }

上述代码的问题:

  • 1 setnx和expire的非原子性

iqEnimZ.jpg!web

setnx刚执行成功,还未来得及执行expire指令,节点1 Duang的一声挂掉了。

nAFv6nV.jpg!web

这样一来,这个锁就长生不死了。

解决方案:

Redis 2.6.12以上版本为set指令增加了可选参数,伪代码如下:

set(key,1,30,NX)
  • 2 del 导致误删

ZZzYrif.jpg!web

vAzqeqQ.jpg!web

BB3eqmj.jpg!web

可以在del释放锁之前做一个判断,验证当前的锁是不是自己加的锁

至于具体的实现,可以在加锁的时候把当前的线程ID当做value,并在删除之前验证key对应的value是不是自己线程的ID。

加锁:

String threadId = Thread.currentThread().getId()
set(key,threadId ,30,NX)

解锁:

if(threadId .equals(redisClient.get(key))){
    del(key)
}

这样做又隐含了一个新的问题,判断和释放锁是两个独立操作,不是原子性。

这一块要用Lua脚本来实现:

String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";

redisClient.eval(luaScript , Collections.singletonList(key), Collections.singletonList(threadId));

redis官方说:eval命令在执行lua脚本时会当作一个命令去执行,并且直到命令执行完成redis才会去执行其他命令,所以就变成了一个原子操作。

  • 3出现并发的可能性

进程1在超时时间内未执行完代码,此时进程2是可以获取锁的,会出现两个进程同时访问一个资源的情况。

解决方案:可以在进程1所在的jvm环境中开一个线程专门用来“续命”,当需要解锁的时候,通知这个续命线程结束执行。

private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 线程Id
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }
private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 释放分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @return 是否释放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;

    }

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK