9

阅读 JDK 源码:异步任务 FutureTask

 3 years ago
source link: https://segmentfault.com/a/1190000039943509
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

在 Java 中,Runnable 接口表示一个没有返回结果的任务,而 Callable 接口表示具有返回结果的任务。
在并发编程中,异步执行任务,再获取任务结果,可以提高系统的吞吐量。Future 接口应运而生,它表示异步任务的执行结果,并提供了检查任务是否执行完、取消任务、获取任务执行结果等功能。FutureTask 是 Future 接口的基本实现,常与线程池实现类 ThreadPoolExecutor 配合使用。

本文基于 jdk1.8.0_91

1. 继承体系

继承体系
RunnableFuture 接口同时实现了 Runnable 接口和 Future 接口,是一种冗余设计。

java.util.concurrent.RunnableFuture

/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 * 
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

FutureTask 是一个可取消的异步任务,是对 Future 接口的基本实现,具有以下功能:

  • 启动或中断的任务的执行;
  • 判断任务是否执行完成;
  • 获取任务执行完成后的结果。

同时,FutureTask 可以用于包装 Callable 或 Runnable 对象。
由于它实现了 Runnable 接口,可以提交给 Executor 执行。

/**
 * A cancellable asynchronous computation. 
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V>

java.util.concurrent.Executor

/**
 * An object that executes submitted {@link Runnable} tasks.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface Executor {

    void execute(Runnable command);
}

java.util.concurrent.FutureTask

// The run state of this task, initially NEW.
// 任务的执行状态,初始为 NEW。
private volatile int state;

/** The underlying callable; nulled out after running */
// 需要执行的任务,任务执行完后为空
private Callable<V> callable;

/** The result to return or exception to throw from get() */
// 任务的执行结果,或者任务抛出的异常
private Object outcome; // non-volatile, protected by state reads/writes

/** The thread running the callable; CASed during run() */
// 执行任务的线程
private volatile Thread runner;

/** Treiber stack of waiting threads */
// 指向栈顶的指针,栈结构用于存储等待任务执行结果的线程
private volatile WaitNode waiters;

其中 state、runner、waiters 三个属性在并发时存在争用,采用 CAS 维护其准确性。

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

2.1 状态定义

/**
 * The run state of this task, initially NEW.  The run state
 * transitions to a terminal state only in methods set,
 * setException, and cancel.  During completion, state may take on
 * transient values of COMPLETING (while outcome is being set) or
 * INTERRUPTING (only while interrupting the runner to satisfy a
 * cancel(true)). Transitions from these intermediate to final
 * states use cheaper ordered/lazy writes because values are unique
 * and cannot be further modified.
 *
 * Possible state transitions:
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

FutureTask 中使用 state 代表任务在运行过程中的状态。随着任务的执行,状态将不断地进行转变。

状态的说明

  • NEW: 新建状态,任务都从该状态开始。
  • COMPLETING: 任务结果正在设置中(正常结果或异常信息)。
  • NORMAL: 任务正常执行完成。
  • EXCEPTIONAL: 任务执行过程中抛出了异常。
  • CANCELLED: 任务被取消(不响应中断)。
  • INTERRUPTING:任务正在被中断。
  • INTERRUPTED: 任务已经中断。

状态转移过程

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

状态的分类

  • 任务的初始状态:NEW
  • 任务的中间状态:COMPLETING、INTERRUPTING
  • 任务的终止状态:NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTED

2.2 状态使用

FutureTask 中判断任务是否已取消、是否已完成,是根据 state 来判断的。

public boolean isCancelled() {
    return state >= CANCELLED; // CANCELLED、INTERRUPTING、INTERRUPTED
}

public boolean isDone() {
    return state != NEW;
}

可以看到:

  • 被取消或被中断的任务(CANCELLED、INTERRUPTING、INTERRUPTED),都视为已取消。
  • 当任务离开了初始状态 NEW,就视为任务已结束。任务的中间态很短暂,并不代表任务正在执行,而是任务已经执行完了,正在设置最终的返回结果。

根据状态值,FutureTask 可以保证已经完成的任务不会被再次运行或者被取消。

中间状态虽然是一个瞬时状态,在 FutureTask 中用于线程间的通讯。例如:

  • 在 FutureTask#run 中检测到状态 >= INTERRUPTING,说明其他线程发起了取消操作,当前线程需等待对方完成中断。
  • 在 FutureTask#get 中检测到状态 <= COMPLETING,说明执行任务的线程尚未处理完,当前线程需等待对方完成任务。

2.2 栈(Treiber stack)

/** Treiber stack of waiting threads */
private volatile WaitNode waiters; // 栈顶指针

/**
 * Simple linked list nodes to record waiting threads in a Treiber
 * stack.  See other classes such as Phaser and SynchronousQueue
 * for more detailed explanation.
 */
static final class WaitNode {
    volatile Thread thread; // 等待任务执行结果的线程
    volatile WaitNode next; // 栈的下一个节点
    WaitNode() { thread = Thread.currentThread(); }
}

FutureTask 使用链表来构造栈(Treiber stack,使用 CAS 保证栈操作的线程安全,如 SynchronousQueue 中的 TransferStack,可参考我对 SynchronousQueue 的源码阅读笔记)。
其中 waiters 是链表的头节点,代表栈顶的指针。

栈的作用
FutureTask 实现了 Future 接口,如果获取结果时,任务还没有执行完毕,那么获取结果的线程就在栈中挂起,直到任务执行完毕被唤醒。

3. 构造函数

赋值任务,设置任务的初始状态。

/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Callable}.
 *
 * @param  callable the callable task
 * @throws NullPointerException if the callable is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

/**
 * Creates a {@code FutureTask} that will, upon running, execute the
 * given {@code Runnable}, and arrange that {@code get} will return the
 * given result on successful completion.
 *
 * @param runnable the runnable task
 * @param result the result to return on successful completion. If
 * you don't need a particular result, consider using
 * constructions of the form:
 * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
 * @throws NullPointerException if the runnable is null
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

值得注意的两个地方:

  • FutureTask 创建的时候,状态为 NEW。
  • 由于 FutureTask 使用 Callable 表示任务,需用 Executors#callable 方法将 Runnable 转换为 Callable。
@Test
public void executors() throws Exception {
    Callable<String> callable = Executors.callable(new Runnable() {
        @Override
        public void run() {
            System.out.println("run!");
        }
    }, "haha");
    String call = callable.call();
    System.out.println("call = " + call);
}

执行结果:

run!
call = haha

4. Runnable 实现

4.1 FutureTask#run

代码流程:

  1. 校验任务是否可执行:任务已执行或其他线程已获取执行权,则无法执行。
  2. 调用 Callable#call 执行任务。
  3. 若任务执行失败,使用 setException 方法设置异常。
  4. 若任务执行成功,使用 set 方法设置返回结果。
  5. 最后,清除对当前线程的记录,判断是否等待中断。

注意,在任务执行结束后,属性 runner、callable 都会被清空。

java.util.concurrent.FutureTask#run

public void run() {
    // state != NEW 说明任务已经执行完毕,不再重复执行
    // 将 runner 属性设置为当前线程,若设置失败说明其他线程已获取执行权
    if (state != NEW || 
        !UNSAFE.compareAndSwapObject(this, runnerOffset,  
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call(); // 执行 Callable#call
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex); // 执行失败,设置异常
            }
            if (ran)
                set(result); // 执行成功,设置结果
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING) // INTERRUPTING、INTERRUPTED
            handlePossibleCancellationInterrupt(s);
    }
}

4.1.1 FutureTask#set

任务执行成功之后,调用该方法。
用于设置任务状态、设置任务执行结果、唤醒栈中等待任务执行结果的线程。

java.util.concurrent.FutureTask#set

/**
 * Sets the result of this future to the given value unless
 * this future has already been set or has been cancelled.
 *
 * <p>This method is invoked internally by the {@link #run} method
 * upon successful completion of the computation.
 *
 * @param v the value
 */
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // state: NEW -> COMPLETING
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state: COMPLETING -> NORMAL
        finishCompletion();
    }
}

状态变化:NEW -> COMPLETING -> NORMAL

由于 state 属性是 volatile,这里 putOrderedInt 和 putIntVolatile 是等价的,保证可见性。

为什么这里使用 lazySet 而没有用 CAS :

  • 在并发情况下,只有一个线程执行 CAS 将 state 从 NEW 修改为 COMPLETING 会成功,其他线程均失败。
  • 因此随后只有一个线程继续修改 state 为 NORMAL,不存在争用,无需使用 CAS。

4.1.2 FutureTask#setException

任务执行发生异常,调用该方法。
除了设置任务状态不同,其他与 FutureTask#set 相同。

状态变化:NEW -> COMPLETING -> EXCEPTIONAL

java.util.concurrent.FutureTask#setException

/**
 * Causes this future to report an {@link ExecutionException}
 * with the given throwable as its cause, unless this future has
 * already been set or has been cancelled.
 *
 * <p>This method is invoked internally by the {@link #run} method
 * upon failure of the computation.
 *
 * @param t the cause of failure
 */
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // state: NEW -> COMPLETING
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state: COMPLETING -> EXCEPTIONAL 
        finishCompletion();
    }
}

4.1.3 FutureTask#finishCompletion

执行完毕,唤醒等待线程。

java.util.concurrent.FutureTask#finishCompletion

/**
 * Removes and signals all waiting threads, invokes done(), and
 * nulls out callable.
 */
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 
        // CAS 将 waiters 属性置空:1. CAS 成功,遍历链表唤醒所有节点;2. CAS 失败,重新读取 waiters
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t); // 唤醒节点上的线程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc // 出栈
                q = next;
            }
            break;
        }
    }

    done(); // 预留方法

    callable = null;        // to reduce footprint
}

4.1.4 FutureTask#handlePossibleCancellationInterrupt

在 FutureTask#cancel 方法中,会先将 state 设为 INTERRUPTING,再中断 runner 线程,最后将 state 设为 INTERRUPTED。

所以在 FutureTask#run 的 finally 块中如果检查到 state == INTERRUPTING,说明其他线程发起了 cancel(true) 操作,这里需要等待其他线程中断当前线程。直到检测到 state != INTERRUPTING,说明其他线程已完成中断当前线程操作。

java.util.concurrent.FutureTask#handlePossibleCancellationInterrupt

/**
 * Ensures that any interrupt from a possible cancel(true) is only
 * delivered to a task while in run or runAndReset.
 */
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING) // 其他线程中断当前线程之后,会设置 state 为 INTERRUPTED,使这里结束循环
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}

4.2 FutureTask#runAndReset

支持周期性执行任务:

  • 执行任务成功,不用返回任务结果,也不用改变任务状态(保持为 NEW),下次可以再次执行任务。
  • 执行任务失败,则设置异常结果,并修改任务状态(不为 NEW),下次无法再次执行任务。
  • 取消执行任务,则等待其他线程中断当前线程,并修改任务状态(不为 NEW),下次无法再次执行任务。
/**
 * designed for use with tasks that intrinsically execute more    // 设计用来支持定时任务
 * than once.
 *
 * @return {@code true} if successfully run and reset
 */
protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex); // 修改 state: NEW -> COMPLETING -> EXCEPTIONAL
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW; // 返回 true 则允许下次再执行 runAndReset
}

5. Future 实现

5.1 Future#get

获取任务执行的结果:

  • 如果任务未完成(NEW、COMPLETING),取结果的线程会阻塞(或自旋)。
  • 如果任务执行出错(EXCEPTIONAL),抛出 ExecutionException
  • 如果任务被取消了(CANCELLED、INTERRUPTING、INTERRUPTED),抛出 CancellationException
  • 如果线程等待被中断,抛出 InterruptedException

java.util.concurrent.FutureTask#get()

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L); // 自旋或阻塞等待任务完成
    return report(s);             // 获取任务执行结果或抛出异常
}

5.1.1 FutureTask#awaitDone

等待任务完成(任务执行完成、任务执行出现异常、任务取消执行),若当前线程发生中断、超时则停止等待。

在自旋中进行判断:

  • 若当前线程已中断,则将节点出栈,抛出 InterruptedException。
  • 若 state > COMPLETING,说明任务已经完成,返回当前 state。
  • 若 state == COMPLETING,说明任务即将完成,当前线程继续自旋。
  • 若 state < COMPLETING,需要将当前线程入栈等待:

    • 无超时时间,一直等待直到被其他线程唤醒(FutureTask#run 或 FutureTask#cancel)或发生中断(Thread#interrupt);
    • 有超时时间,阻塞直到超时、被唤醒、发生中断。若已超时,将节点出栈,返回 state。

java.util.concurrent.FutureTask#awaitDone

/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param timed true if use timed waits
 * @param nanos time to wait, if timed
 * @return state upon completion
 */
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) { // 检查并清除中断状态
            removeWaiter(q);        // 已中断,将节点出栈
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) { // 其他线程已完成任务,结束等待
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();   // 创建节点,设置 q.thread
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); // 节点 q 入栈,作为新的头节点 waiters
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);  // 已超时,将节点出栈
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this); // 进入阻塞,由 FutureTask#run 或 FutureTask#cancel 来唤醒(内部均调用 FutureTask#finishCompletion)
    }
}

5.1.2 FutureTask#report

当前线程等待完毕,获取任务的执行结果,或者抛出异常。

java.util.concurrent.FutureTask#report

/**
 * Returns result or throws exception for completed task.
 *
 * @param s completed state value
 */
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED) // CANCELLED、INTERRUPTING、INTERRUPTED
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

5.2 Future#get(timeout, unit)

在一定的时间之内,等待获取任务执行的结果。

/**
 * @throws CancellationException {@inheritDoc}
 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException(); // 等待超时了,任务还没有执行完,则抛出 TimeoutException
    return report(s);
}

5.3 Future#cancel

尝试取消任务的执行:

  • 如果任务已完成或已取消,则取消操作会失败,返回 false。
  • 如果任务还未执行,则取消操作会成功,返回 true。
  • 如果任务正在执行,方法的参数就会指示线程是否需要中断:

    • mayInterruptIfRunning 为 true,则当前正在执行的任务会被中断;
    • mayInterruptIfRunning 为 false,则允许正在执行的任务继续运行,直到它执行完。

状态变化:
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

public boolean cancel(boolean mayInterruptIfRunning) {
    // 如果任务还没有启动(NEW),则修改任务状态(INTERRUPTING or CANCELLED),修改成功则进入下一步
    // 如果任务状态不是 NEW,则直接返回。说明任务已完结(已完成、已取消、出现异常),无法取消,返回 false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception 
        // 进入这里,说明任务状态为 INTERRUPTING or CANCELLED
        // mayInterruptIfRunning 为 true 说明需要中断执行任务的线程,为 false 允许任务继续执行完
        if (mayInterruptIfRunning) { 
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                // 只有一个线程会执行到这里,无需使用 CAS
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // INTERRUPTING -> INTERRUPTED 
            }
        }
    } finally {
        finishCompletion(); // 唤醒等待线程
    }
    return true;
}

使用三个线程依次执行:提交任务、等待任务、取消任务。
观察执行结果,理解并发情况下多个线程之间如何使用 Future 进行交互。

/**
 * 三个线程依次执行:提交任务、等待任务、取消任务
 * 在任务未执行完的时候,取消任务。
 * 
 * @author Sumkor
 * @since 2021/4/28
 */
@Test
public void cancel() throws InterruptedException {
    // 定义任务
    FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() {
        @Override
        public String call() throws Exception {
            Thread.sleep(10000);
            return "哦豁";
        }
    });

    CountDownLatch submitGate = new CountDownLatch(1); // 等待任务提交
    CountDownLatch endGate = new CountDownLatch(3);    // 等待线程执行完

    // 提交任务
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                submitGate.countDown();

                System.out.println(Thread.currentThread().getName() + " 执行任务开始");
                futureTask.run();
                System.out.println(Thread.currentThread().getName() + " 执行任务结束");
            } finally {
                endGate.countDown();
            }
        }
    }).start();

    // 等待任务
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                submitGate.await();
                Thread.sleep(1000);// 等待 futureTask.run() 执行一段时间后再获取结果

                System.out.println(Thread.currentThread().getName() + " 获取任务结果开始");
                String result = futureTask.get();
                System.out.println(Thread.currentThread().getName() + " 获取任务结果结束 " + result);
            } catch (Exception e) {
                System.out.println(Thread.currentThread().getName() + " 获取任务结果失败 " + e.getMessage());
                e.printStackTrace();
            } finally {
                endGate.countDown();
            }
        }
    }).start();

    // 取消任务
    new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                submitGate.await();
                Thread.sleep(2000);// 等待 futureTask.get() 执行一段时间后再取消任务

                System.out.println(Thread.currentThread().getName() + " 取消任务开始");
                boolean cancel = futureTask.cancel(true);
                System.out.println(Thread.currentThread().getName() + " 取消任务结束 " + cancel);
            } catch (Exception e) {
                System.out.println(Thread.currentThread().getName() + " 取消任务失败 " + e.getMessage());
                e.printStackTrace();
            } finally {
                endGate.countDown();
            }
        }
    }).start();

    endGate.await();
}

执行结果:

Thread-0 执行任务开始
Thread-1 获取任务结果开始
Thread-2 取消任务开始
Thread-2 取消任务结束 true
Thread-0 执行任务结束
Thread-1 获取任务结果失败 null
java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at com.sumkor.pool.FutureTest$6.run(FutureTest.java:129)
    at java.lang.Thread.run(Thread.java:745)
  • 线程 A 启动任务一段时间后,线程 B 来获取任务结果,进入等待。
  • 随后线程 C 取消任务,将线程 A 中断(线程 A 不会抛异常,因为 FutureTask#cancel 先一步修改了 state 导致 FutureTask#setException 中 CAS 失败)。
  • 此时线程 B 在等待中被唤醒(由线程 C 唤醒,检查到 state 为 INTERRUPTED)并抛出异常 CancellationException。
  • FutureTask 实现了 Runnable 和 Future 接口,是一个可取消的异步任务。
  • FutureTask 中的任务具有 7 种状态,多个线程之间通过该状态来操作任务,如判断任务是否已完成、取消任务、获取任务结果。
  • FutureTask 中只要任务不是 NEW 状态,就表示任务已经执行完毕或者不再执行了,并没有表示“任务正在执行中”的状态。
  • FutureTask 中使用链表和 CAS 机制构建一个并发安全的栈,用于存储等待获取任务结果的线程。
  • FutureTask 在等待获取任务结果时,依旧会阻塞主线程,违背了异步的初衷。JDK 8 引入了 CompletableFuture,利用回调机制来做到异步获取任务结果。

作者:Sumkor
链接:https://segmentfault.com/a/11...


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK