20

[Java并发] AQS抽象队列同步器源码解析--锁获取过程

 4 years ago
source link: http://www.cnblogs.com/zyg-coding/p/12045194.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

要深入了解java并发知识,AbstractQueuedSynchronizer(AQS)是必须要拿出来深入学习的,AQS可以说是贯穿了整个JUC并发包,例如ReentrantLock,CountDownLatch,CyclicBarrier等并发类都涉及到了AQS。接下来就对AQS的实现原理进行分析。

在开始分析之前,势必先将CLH同步队列了解一下

CLH同步队列

CLH自旋锁: CLH(Craig, Landin, and Hagersten locks): 是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。CLH自旋锁是一种基于隐式链表(节点里面没有next指针)的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。

AQS中的CLH同步队列:AQS中CLH同步队列是对CLH自旋锁进行了优化,其主要从两方面进行了改造:节点的结构与节点等待机制。

1.在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关,并且每个节点都引入前驱节点和后后续节点的引用;

2.在等待机制上由原来的自旋改成阻塞唤醒。

源码中CLH的简单表示

*      +------+  prev +-----+       +-----+
* head |      | <---- |     | <---- |     |  tail
*      +------+       +-----+       +-----+

Node就是实现CLH同步队列的数据结构,计算下就了解下该类的相关字段属性

AQS中重要的内部类Node

static final class Node {
    // 共享模式
    static final Node SHARED = new Node();
    // 独占模式
    static final Node EXCLUSIVE = null;

    // 如果属性waitStatus == Node.CANCELLED,则表明该节点已经被取消
    static final int CANCELLED =  1;
    // 如果属性waitStatus == Node.SIGNAL,则表明后继节点等待被唤醒
    static final int SIGNAL    = -1;
    // 如果属性waitStatus == Node.CONDITION,则表明是Condition模式中的节点等待条件唤醒
    static final int CONDITION = -2; 
    // 如果属性waitStatus == Node.PROPAGATE,在共享模式下,传播式唤醒后继节点
    static final int PROPAGATE = -3; 
    // 用于标记当前节点的状态,取值为1,-1,-2,-3,分别对应以上4个取值
    volatile int waitStatus;
    // 前驱节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 当前节点对应的线程,阻塞与唤醒的线程
    volatile Thread thread;
    // 使用Condtion时(共享模式下)的后继节点,在独占模式中不会使用
    Node nextWaiter;
    final boolean isShared() {
            return nextWaiter == SHARED;
    }
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

下面就开始着重对AQS中的重要方法进行分析说明

获取锁

1.acquire 开始获取锁

public final void acquire(int arg) {
    //如果tryAcquire返回true,即获取到锁就停止执行,否则继续向下执行向同步队列尾部添加节点
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire是用于获取锁的方法,在AQS中默认没有实现具体逻辑,由子类自定义实现。

如果返回true则说明获取到锁,否则需要将当前线程封装为Node节点添加到同步队列尾部。

2.当前节点入队列

将当前执行的线程封装为Node节点并加入到队尾

private Node addWaiter(Node mode) {// 首先尝试快速添加到队尾,失败再正常执行添加到队尾
    Node node = new Node(Thread.currentThread(), mode);
    // 快速方式尝试直接添加到队尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果快速添加到队尾失败则执行enq(node)添加到队尾
    enq(node);
    return node;
}

enq方法循环遍历添加到队尾

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 添加到队列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

addWaiter(Node mode)方法执行完后,接下来执行acquireQueued方法, 返回的是该线程是否需要中断,该方法也是不停地循环获取锁,如果前节点是头节点,则尝试获取锁,获取锁成功则返回是否需要中断标志,如果获取锁失败,则判断是否需要阻塞并阻塞线程

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 标记是否需要被中断
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 如果前驱节点是头节点,并且获取锁成功,则返回
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断获取锁失败后是否需要阻塞当前线程,如果阻塞线程后再判断是否需要被中断线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire(p, node)方法判断是否需要阻塞当前线程

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 如果ws == Node.SIGNAL,则说明当前线程已经准备好被唤醒,因此现在可以被阻塞,之后等待被唤醒
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        // 如果ws > 0,说明当前节点已经被取消,因此循环剔除ws>0的前驱节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //如果ws<=0,则将标志位设置为Node.SIGNAL,当还不可被阻塞,需要的等待下次执行shouldParkAfterFailedAcquire判断是否需要阻塞
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

如果shouldParkAfterFailedAcquire(p, node)方法返回true,说明需要阻塞当前线程,则执行parkAndCheckInterrupt方法阻塞线程,并返回阻塞过程中线程是否被中断

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 阻塞线程,等待unpark()或interrupt()唤醒自己
    // 线程被唤醒后查看是否被中断过。
    return Thread.interrupted();
}

那么重新回到获取锁的方法acquire方法,如果acquireQueued(final Node node, int arg)返回true,也即是阻塞过程中线程被中断,则执行中断线程操作selfInterrupt()

public final void acquire(int arg) {
    //如果tryAcquire返回true,即获取到锁就停止执行,否则继续向下执行向同步队列尾部添加节点,然后判断是否被中断过,是则执行中断
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

中断当前线程

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

小结

AQS获取锁的过程:

1.执行tryAcquire方法获取锁,如果获取锁成功则直接返回,否则执行步骤2

2.执行addWaiter方法将当前线程封装位Node节点并添加到同步队列尾部,执行步骤3

3.执行acquireQueued循环尝试获取锁,,如果获取锁成功,则判断返回中断标志位,如果获取锁失败则调用shouldParkAfterFailedAcquire方法判断是否需要阻塞当前线程,如果需要阻塞线程则调用parkAndCheckInterrupt阻塞线程,并在被唤醒后再判断再阻塞过程中是否被中断过。

4.如果acquireQueued返回true,说明在阻塞过程中线程被中断过,则执行selfInterrupt中断线程

好了,以上就是AQS的锁获取过程,关于锁释放的分析会在后续继续输出。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK