7

万锁之母AbstractQueuedSynchronizer

 3 years ago
source link: https://pingao777.github.io/2019/03/12/%E4%B8%87%E9%94%81%E4%B9%8B%E6%AF%8DAbstractQueuedSynchronizer/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

万锁之母AbstractQueuedSynchronizer

2019-03-12

翻看Java“锁”记中提到的各种“锁”,其内部同步实现大多数都和一个类AbstractQueuedSynchronizer相关,这个类称得上“万锁之母”,所以今天就来扒一扒这个类。

为了避免一头扎进去纠缠于各种细节出不来,可以先从宏观上来看一下这个类。首先大家思考一个问题:什么是同步器?假如把线程比作车辆,同步器的角色和警察叔叔差不多,警察的做的事无非是在合适的时机指挥车辆走和停,同步器呢,也是在选择合适的时间调度线程阻塞和执行。

对于车辆来说,什么时候走什么时候停呢,警察叔叔给你招手的时候啊,来来来小伙子,否则就老老实实排队,等警察叔叔给你招手;对于线程来讲也可以采用这种策略,获得许可可以执行,否则排队阻塞,等同步器给与你许可。如果前方交通比较疏松,警察可能一次会叫好几辆车一起走,如果比较拥堵,则会一辆一辆的来;同步器呢同样如此,它有两种模式:共享和独占,前者允许多个线程一起运行,后者只允许单一线程运行。

如果用伪代码表示上面的逻辑可能是这样子的:

// 获得许可
while (不允许获得许可) {
    线程排队
    停止执行
}
从队伍里出来继续执行

// 释放许可
if (允许释放许可) {
    释放许可
    叫醒排队的线程
}

经过上面的分析大致可以提炼出同步器要解决这么几点:

  1. 许可怎么获取和释放
  2. 线程采用什么方法停止和继续执行
  3. 对于不能立马获得许可的线程得有排队机制

说是源码分析其实是自己在学习AbstractQueuedSynchronizer源码的一些学习笔记,并不是完整的源码分析。相信想了解AbstractQueuedSynchronizer运行机制的人多多少少都看过它的代码了,甚至看了一遍都不止,其实大部分代码一般人都能看懂,就是有那么几处难懂的代码,犹如芒刺在背不拔不快。本文就是为了这个目的而写的,并不是要面面俱到而是重点突破,给有心人一点启发。为了符合上下文的语义,下面描述的时候可能节点和线程交替使用,也会把阻塞停止,唤醒叫醒混用,大家留意就是了。

AbstractQueuedSynchronizer整体是利用模板模式,通过维护一个state变量状态配合tryAcquiretryRelease以及tryAcquireSharedtryReleaseShared间接的影响许可获取和释放。

同步器使用CLH队列来维护排队的线程,CLH队列说白了就是一个单向链表,特性是后一个节点的状态是由前一个节点的状态决定的,每个节点都有一个pred指针指向前一个节点,AbstractQueuedSynchronizer在原生的CLH队列基础上进行了优化,加入了一个next指针,指向后继节点,用于提高寻找后继节点的性能,这就形成了一个双向链表。由于没有更新两个volatile的变量的CAS方法,所以next变量为null的时候并不表明没有后继节点,因为有可能一个节点入列的时候更新完pred指针,还没来得及更新next指针。具体结构如下:

CLH队列结构

headtail分别指向队列的头和尾,next我这里画成了虚线,表明其不可靠性。

AbstractQueuedSynchronizer的核心就是如何维护CLH队列的状态,所以我们把重点放在这一块。它提供了两套获取许可和释放许可的方法:acquirereleaseacquireSharedreleaseShared,分别对应独占和共享模式。下面分别看看这两套方法的签名:

// 独占模式模板方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 共享模式模板方法
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

可以看到这两套方法是非常类似的,我们一个一个的看看,首先看看acquire方法:

// acquire方法的逻辑粗看起来可能是先尝试获取下许可
// 如果成功,直接跳出,不用排队了;
// 如果不成功就添加一个独占节点到队列中排队,如果
// 有中断响应中断,细节一个方法一个方法的进入看看
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 尝试获取许可,成功返回true,这个方法没有实现
// 而是留给子类去实现,因为不同的锁获取和释放
// 许可的语义是不同的无法一概而论,所以交由
// 具体的子类去实现,这是典型的模板模式
// 之所以没有用抽象方法,是因为同步器
// 允许只实现独占和共享的一种,如果是
// 抽象方法,则需要实现两套模式的方法
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// 从名字上也可以看出这个方法就是往队伍里添加节点进行排队
private Node addWaiter(Node mode) {
    // 以当前线程建立一个新的节点,准备插到队伍里
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 这句是原生英文注释,大意是先尝试快速的路径入队,如果失败
    // 再用完整的入队方法,为什么这里是快呢?先别急,先往下看
    Node pred = tail;
    // pred != null 说明队伍里已有排队者
    if (pred != null) {
        node.prev = pred;
        // 使用CAS操作将当前节点插到队伍里,注意这个时候可能
        // 会有多个线程在同时往里插队,但是CAS操作能确保同一
        // 时间只有一个线程会成功
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

// 这就是所谓的完整的入队方法
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 也就是队列还没有初始化呢,将head和
        // tail都初始化为一个哑节点
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 看这段代码是不是很眼熟了呢,对
            // 这块和快速入队方法基本一样,对比快速和完整
            // 两种入队方法,快速的没有初始化判断,
            // 少了循环,不会重试,相对来说会“快”点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

// 这个方法的大意是如果获得了许可,赶紧出队执行,否则告诉你的
// 前继节点轮到你时叫你,然后老老实实排队等待
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获得node的前继节点
            final Node p = node.predecessor();
            // 如果前继节点是头而且尝试获取许可成功
            // 也就是轮到node出列执行了,即警察叔叔
            // 给你招手了
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 这里是判断是否需要阻塞,需要的话就要调用
            // park方法将线程歇一会,等unpark叫醒线程
            // 的时候会检查中断状态,如果有中断就响应
            // 中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果tryAcquire抛出异常
        if (failed)
            cancelAcquire(node);
    }
}

// 这段代码的大意是节点node给前面的节点pred说哥们我先睡会,到站叫我,pred
// 说好(设置为SIGNAL状态),到站之后叫你,你放心睡吧
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果前继节点的状态是SIGNAL,表明node已经告诉pred到站叫他,
    // 而且pred也已经答应了,所以node可以放心的去睡了
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // 这里是删除取消的节点,因为只有CANCEL的节点是大于0
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // node告诉pred到站叫它
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

// 这个方法很简单,没啥好说的,就是去睡觉,醒来
// 之后看看手机有没有人找你(中断)
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

acquire分析完了,再看看release

// 如果释放许可成功,并且后面还有节点,叫醒它
// 返回
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 如果后面还有节点
        // waitStatus会被设置成SIGNAL,忘记的话可以再
        // 看看前面的shouldParkAfterFailedAcquire方法
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 叫醒后面的哥们一次
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果ws小于0,也就是没有取消,将ws置位0
    // 也就是打算叫醒后面的节点,同时把提醒
    // 状态复位,免得叫醒多次
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    // 如果后面可能没节点了或者节点是取消的
    // 就从后往前找,如果能找到紧随node之后
    // 并且没有取消的节点就叫醒它。这里就是
    // 利用next的优化了,即如果next不为空
    // 且没有取消那么直接叫醒next,如果
    // next为空,不能认定后面就没有节点了
    // 因为next是不可靠的,要利用可靠的pred从后
    // 往前找
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

独占模式的获取和释放代码就分析完了,再来看看共享模式的。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// 返回值小于0表示获取失败
// 等于0表示获取许可成功但是后面节点无法再获取了
// 大于0表示获取许可成功并且后面节点还可以再获取
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// 粗看doAcquireShared和acquireQueued非常相似,
// 主要是两点不同,一是一个添加的是独占节点,
// 一个添加的是共享节点,另一点不同是
// 一个是setHead,一个是setHeadAndPropagate
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 可以看到setHeadAndPropagate不光像独占模式那样修改了队列的头,
// 还会在某些条件下调用一个doReleaseShared方法
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);

    // 后续节点还有获取许可的机会或者节点没有取消
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 不知道s是什么类型或者s是共享节点
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

// 在并发的条件下叫醒队列头部的线程
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            // 这部分代码和独占模式的release方法几乎一样
            // 也是把队列头的线程叫醒继续执行,但要注意
            // 一个重要区别是这里使用的是CAS操作,上面
            // 独占不是,这是为什么呢?还记得独占和共享
            // 的定义吗?对于共享模式多个线程同时执行
            // 同时也有可能多个线程同时释放,所以必须
            // 使用CAS操作保证线程安全
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // 这个分支啥时候会满足呢?根据上面的分析在入队的时候
            // 会调用shouldParkAfterFailedAcquire将前继节点的状态
            // 修改为SIGNAL,这里为0应该发生在头节点没有后继节点
            // 或者后继节点调用shouldParkAfterFailedAcquire
            // 还没返回的时候,再加上这个条件:
            // !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)
            // 那么就剩下了一种情况:头结点的后继节点调用
            // shouldParkAfterFailedAcquire还没把头节点
            // 的状态修改成SIGNAL的时候。如果没有这个分支
            // 只能等待下一次的doReleaseShared的调用才能
            // 将头部的线程叫醒了
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 这句意思是头没变就跳出,那头啥时候变呢,就是出队的时候
        // 也就是有线程已经出队,有责任叫醒新的头节点线程
        if (h == head)                   // loop if head changed
            break;
    }
}

// 可以看到释放许可的主逻辑就是doReleaseShared
// 上文已经分析过在此不再赘述
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

经过上面的源码分析,估计大部分人心里有点数了,可能还形不成清晰的运行图景或者说直觉性的认识,那么接下来说下我自己的一点理解。

整个的图景是这样子的:对于独占模式,因为只有一个线程能获取许可,进而也只有一个线程释放许可,只会叫醒队伍头部的一个线程,这样整个队列是串行出列,并行入列,有点像排队坐公交,虽然队伍后面挤作一团,队伍前面还是有序的,一个一个的上车;对于共享模式而言,由于允许多个线程一起运行,也就是多个线程获得许可,同样也会有多个线程释放许可,这就需要叫醒队伍里多个线程,整个队列的样子是并行出列,并行入列

忽略不必要的细节,来看看独占和共享模式下主逻辑的函数调用栈:

函数主逻辑调用栈

上图左边是独占模式的调用栈,右边是共享模式的调用栈。可以清晰看到为啥共享模式的可以唤醒多个节点,是因为它的调用栈形成了一个环,这样它就不会不停地叫醒后面的共享节点,就像一个连锁反应,并且获取许可和释放许可都会启动这个连锁反应;而独占模式没有形成环,叫醒一个节点就返回了,并且由于共享模式下获取和释放许可都会调用doReleaseShared,二者会形成竞争,这也是doReleaseShared内部使用CAS操作的一个原因。

参考资料:

  • JAVA并发编程实战
  • Doug Lea, The java.util.concurrent Synchronizer Framework

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK