5

Future源码一观-JUC系列 - 每当变幻时

 1 year ago
source link: https://www.cnblogs.com/killbug/p/16441226.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Future源码一观-JUC系列

在程序中,主线程启动一个子线程进行异步计算,主线程是不阻塞继续执行的,这点看起来是非常自然的,都已经选择启动子线程去异步执行了,主线程如果是阻塞的话,那还不如主线程自己去执行不就好了。那会不会有一种场景,异步线程执行的结果主线程是需要使用的,或者说主线程先做一些工作,然后需要确认子线程执行情况来进行后续的操作。那么这里就需要子线程异步执行完任务能把结果告诉主线程,并且主线程还能访问到子线程执行任务的状态,比如是否执行完成或正在执行中。

Future就是上面概念的抽象,按照源码中的注释,它代表着一个异步计算的结果,提供的方法中可以通过get方法获取异步线程计算的结果,如果计算还没结束,就会阻塞等待返回成功;也可以通过cancel方法取消异步计算任务;还可以通过isCancelledisDone获得异步执行的状态;如果一个异步执行的内容并没有返回值,但是希望可以使用Future来获得取消异步计算任务的能力,可以返回null。

FutureTask

FutureTask提供了对Future的基础实现,在进入FutureTask源码之前,我们先考虑下如果要实现Future的功能可以怎么设计呢?

1,异步线程进入执行任务的时候,主线程先阻塞住,等到一步线程任务完成有返回结果了,唤醒主线程,把返回结果给它。

2,需要有个标记,记录异步线程有没有执行结束,异步线程任务执行一结束,这个标记就要变更,通过这个标记就可以知道执行状态。

3,能获取异步线程,在执行还没完成先,对异步线程可以中断,这样就可以取消异步线程执行的任务了。

4,异步线程执行和取消操作是有并发竞争的,所以应该对标记的更新做锁保护处理。

对照Future的API,大致能想到这些,实际还有大量关键细节组合才能实现。可以带这个实现思路进入源码的学习。

FutureTask本身就是继承Runnable,Runnable的run方法是没有返回参数的。那么既然FutureTask需要把异步线程执行结果返回,就意味着需要把结果拿到记录。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

当构造函数传的是Runnable的时候,会适配成Callable,所以对于自己的run方法需要返回结果的事那么就好办了,就是调用callable的run方法就行。我们再衍生开去看下Executors.callable(runnable, result);的实现。

public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
}	
static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}

这个适配没什么特殊,把一个result引用作为参数传入,然后作为结果返回。所以其实很少用这种方式来获取result,更多的是传一个null进来,因为更多的时候是想知道异步线程是否执行结束了,而不是要结果。

run方法

FutureTask既然本身就是Runnable,把它作为task提交给线程池执行,就是调用它的run方法。根据前面的分析,这个run方法需要调用内部属性callable的run获得result,然后保存result,以备get方法来获取的时候能直接返回,另外肯定也是要处理异常的场景。

以下是run方法的源码,再加上仔细关注一下状态的流转,就可以比较好的理解这个核心代码了。

public void run() {
  	// 【1】
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
              	// 【2】
                setException(ex);
            }
            if (ran)
              	// 【3】
                set(result);
        }
    } finally {
      	//【4】
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
          	//【5】
            handlePossibleCancellationInterrupt(s);
    }
}

protected void set(V v) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = v;
    UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
    finishCompletion();
  }
}
protected void setException(Throwable t) {
  if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    outcome = t;
    UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
    finishCompletion();
  }
}
private void handlePossibleCancellationInterrupt(int s) {
  if (s == INTERRUPTING)
    while (state == INTERRUPTING)
      Thread.yield(); // wait out pending interrupt
}
  • 【1】执行的起始状态必须是NEW,初始化FutureTask的时候设置的NEW状态,如果不是NEW状态,就退出run方法;并且CAS设置runner字段为当前执行线程,设置失败表示已经设置过,就退出run方法。根据状态和CAS设置runner字段判断,确保了FutureTask实例同时只能有一个一个线程在执行。
  • 【2】执行callable的run方法异常,进行setException操作,先把状态从NEW设置成COMPLETING,设置成功后把outcome字段设置成异常结果,然后将状态设置成EXCEPTIONALfinishCompletion方法在状态进入终态(final state)的时候都会被调用,他会唤醒等待的线程节点,是流程中的关键一环,在后续中详细分析。
  • 【3】正常执行callable的run方法会获得结果,进行set操作,老规矩,先把状态从NEW设置成COMPLIETING,设置成功后把outcome字段设置成返回结果result,以备等待线程来获取,然后把状态设置成NORMALNORMAL作为终态,也会调用finishCompletion方法。
  • 【4】finally代码块,前面有通过判断runner是否为空来避免并发执行,所以最后把runner设置成null,这个注释好理解,在状态确定之前,Runner必须是非空的,以防止对run()的并发调用,这一点结合【1】就可以解释。第二步的注释说,状态重新读取必须在将runner设置为null之后,以防止泄漏中断,这里需要结合cancel方法分析,cancel方法中执行的顺序是先将state修改成INTERRUPTING成功后再使用runner,这里就保证了先设置runner为null后再获取state的最新值。
  • 【5】handlePossibleCancellationInterrupt方法中用一个while循环加Thread.yield()来等待state在INTERRUPTING下变成INTERRUPTED。也就是说当cancel方法把state改成INTERRUPTING后,run方法就会等待cancel方法执行结束后自己才执行结束。

直到网上找到了这篇文章why outcome object in FutureTask is non-volatile?

这里有个很巧妙的设计,就是利用java的happends before中的传递原则,使得在不使用锁的情况下,保证其他线程读到state=NORMAL时,该线程一定能读到outcome的最新值

Task State

前面提到需要一个标记来记录任务的执行状态,源码实现中有一个volatile修饰的int类型state字段(和AQS一样的配方的感觉来了)。

    /**
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
    **/
		private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

注释提供了全部状态流转路径,核心逻辑就是一步步变更状态来进行的。

Treiber Stack

我们需要了解清楚这个Treiber Stack的概念,因为这在JUC源码很多地方有使用,有助于我们理解JUC其他组件代码的实现。

对于一个栈,我们并发往栈里放节点的时候如何处理竞争呢?比较简单的方式就是使用锁,放的时候锁,取的时候锁。

有个大佬他不想用锁,而是利用CAS并发原语设计了一个无锁堆栈,并发表了论文,他名字就叫Treiber,这个就是Treiber Stack的由来。在FutureTask的源码注释中专门提到,很多JDK源码中都用到了类似这种引用,表达这个实现是有坚实理论依据的,有一种做学术的专业氛围。

直接看《Java Concurrency in Practice》中提供的实现代码:

public class TreiberStack <E> {
    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead));
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }

    private static class Node <E> {
        public final E item;
        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}

这个队列在入队和出队的时候都没有进行锁操作,而是CAS设置头节点是否成功,如果设置成功表示头节点没有被修改过,就没有竞争发生,直接设置头节点,如果CAS设置失败表示有竞争发生,则字段继续,知道设置头节点成功。

其实只要记住一点,操作这个数据结构的入口集中在头节点上,原子操作头节点保证不会发生并发引起的读写数据异常的问题。

下面看一下FutureTask是如何定义这个链表节点的:

WaitNode

使用WaitNode来表示链表节点,内部有记录阻塞等待的线程和下一个节点的引用。

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

以下是FutureTask中实现的Treiber Stack结构图:

image

get方法

前面已经提过,get方法是阻塞线程等待的,怎么阻塞的?多个线程都调用get方法阻塞的时候如何维护这些线程?带着这两问题继续阅读源码。

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
  • 【1】状态不是终态情况下调用awaitDone方法,是终态时调用report方法。对于有超时时间需求的情况,在到达超时时间时awaitDone方法就会返回state结果,如果还不是终态就抛出TimeoutException。
awaitDone

这个方法里实现了如果异步线程还未执行结束的时候,来调用get方法阻塞等待的能力。

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
      	// 【1】
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
      	// 【2】
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
      	// 【3】
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}
  • 【1】首先,判断状态,如果状态大于COMPLETING,执行全部结束,是可以拿到结果了的,就直接返回状态,如果自己线程的节点已经产生,需要把节点中的线程设置为null,注意这里并没有执行删除节点的操作。如果刚好处于COMPLETING状态,说明计算已经结束,正在进行结果或执行异常的设置,这个操作非常快,那就再等等(Thread.yield())。另外,这里可以想象COMPLETING状态是一个非常短暂的状态,所以是放在后面判断的,一般代码都以主意这种细节。
  • 【2】通过前面两个判断表示还未执行结束,那么就需要进入等待了。进入等待前,先要往链表里放节点,如果链表还没节点,就new WaitNode()初始化一个节点,然后再下次循环的时候放入链表,放入的方式就是CAS比对头节点(waiters)是否变化设置。
  • 【3】阻塞线程就是调用LockSupport.park方法阻塞线程,有阻塞就会有唤醒,正常唤醒线程的时候就是计算结束的时候,那么就会执行【1】的逻辑,退出循环;异常的唤醒有可能是线程发生中断,前面代码中对线程中断标记的处理,会移除节点(removeWaiter)并抛出异常。另外,超时情况发生的时候,也会移除节点。

finishCompletion

这个方法在任务执行结束或取消的时候执行,前面提到过的其中执行结束的两种情况是正常执行结束和异常结束。它需要把等待的节点中的线程全部唤醒,在了解了链表结构后,我们看一下这个唤醒操作的代码:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

遍历节点前会先用CAS的方式将头设置成null,成功设置才能继续,所以这里有两个for循环,第二个for循环是遍历链表,找出Thread不为空的节点,用LockSupport.unpark唤醒,被唤醒的线程会从awaitDonepark处醒来继续执行。

其中留了一个done()方法提供给子类扩展,很多字类实现了这个扩展,比如说guava的ListenableFutureTask

removeWaiter

awaitDone方法中的循环中,判断出线程有中断标记的时候会执行removeWaiter,还有就是get超时也会触发。

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s))
                    continue retry;
            }
            break;
        }
    }
}

一个链表中并发删除随机节点自然会有冲突问题,比如同时删除的节点为相邻节点,前面的节点的next可能只想null导致链表断裂。那么这里是如何避免这种问题的呢?

首先,这个方法进入的时候第一步就会把节点的thread设置为null,实际这个操作是可以作为当前线程正在删除这个节点的标记,其他线程只要判断节点是否为null就可以推算出可能有线程正在删除这个节点。

其次,每个节点都会先判断thread是否为空,不为空则会设置给pred,也就是说pred只要有节点这个节点在从节点移除前thread都是不为空的,如果判断出节点的thread为空,那么就跳过判断进入下一个节点的判断,那么这个节点就自然链表中移除了,因为上一个节点的next会指向到thread不为空的下一个节点(pred.next = s)。当next指向后,会再判断pred的thread是否为空,如果是为空就表示可能有线程并发操作,这里就直接从头循环链表。

最后,前两个判断都不成立的情况只有一种那就是头节点的thread为空的情况,此时就要用cas的方式来处理如果设置失败,和前面操作链表一样自旋即可。

cancel

public boolean cancel(boolean mayInterruptIfRunning) {
  	// 【1】
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
  	// 【2】
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
      	//【3】
        finishCompletion();
    }
    return true;
}
  • 【1】第一个判断就是要求状态必须是NEW,如果任务已经开始执行,那么直接就返回false。如果调用cancel方法时状态是NEW,那么直接对这个状态进行CAS修改,如果传参值mayInterruptIfRunning未true,那么状态先改成INTERRUPTING,然后改成INTERRUPTED;如果传参值是true,状态修改为CANCELLED,直接进入终态。这一步修改动作也可能失败,失败意味着装已经从刚刚的NEW发生了变化,那么就不能在进行cancel操作了,直接返回false。

  • 【2】上面的代码执行成功,意味这状态成功从NEW改成了INTERRUPTING或CANCELLED。

    • 如果传参为true,此时状态必然已经是INTERRUPTING,然后就开始进行线程中断操作,并最终将状态变更为INTERRUPTED。
    • 如果传参为false,此时状态为CANCELLED,已是终态,返回true即可
  • 【3】无论是INTERRUPTED还是CANCELLED的结果,都会执行finishCompletion方法,该方法前面已详细解析。

《Netty实战》中有写到JDK中Future所提供的实现只允许手动检查对应的操作是否完成,或者一直阻塞知道它完成。这是非常烦琐的,所以Netty提供了自己的实现,所以下一站,ChannelFuture。

https://yangsanity.me/2021/07/27/FutureTask/

https://en.wikipedia.org/wiki/Treiber_stack

https://www.cnblogs.com/iwehdio/p/14285282.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK