

【JUC】可回调任务FutureTask原理
source link: https://segmentfault.com/a/1190000041364381
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

【JUC】可回调任务FutureTask原理
上一篇观察ThreadPoolExecutor的submit方法的时候,发现了它是靠FutureTask实现结果回调的:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // ## 声明一个可回调任务,本质是一个FutureTask RunnableFuture<T> ftask = newTaskFor(task); // 线程池篇分析过 execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
一、FutureTask使用样例
// 1.声明一个可回调任务 FutureTask<String> task = new FutureTask(()->"hello world"); Thread threadA = new Thread(task); threadA.start(); // 2.阻塞方式获取任务执行结果:threadA未执行完,当前线程TreadB会阻塞于此 System.out.println(task.get());
FutureTask实现了RunnableFuture接口,而RunnableFuture=Runnable接口+Future接口
- 线程A执行start方法时,会调用FutureTask的run()方法(Runnable接口)
run()方法会触发FutureTask内部的state状态变更,并调用Callable的call()方法 - 此时线程B以及其它线程调用FutureTask的get()方法(Future接口),这些线程会阻塞等待run()方法完成
源码层面会构建一个名为waiters的单项链表,以LockSupport.part的形式将线程阻塞在节点上 - call()方法执行完成,state状态最终变更为NORMAL,同时释放阻塞线程
// ## 状态 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; // 结果返回接口 private Callable<V> callable; // 线程执行方法的返回结果 private Object outcome; // non-volatile, protected by state reads/writes // 正在执行callable接口发的线程 private volatile Thread runner; // 等待节点 private volatile WaitNode waiters;
二、run()
public void run() { if (state != NEW // runner变量赋值 || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // NEW状态下执行 if (c != null && state == NEW) { V result; boolean ran; try { // 调用Callable的call方法,获取返回值 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) // == call方法执行成功,设置结果 set(result); } } }
protected void set(V v) { // 状态变更NEW->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 执行结果赋值给outcome outcome = v; // 状态变更COMPLETING->NORMAL,表示执行完成 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // == 释放等待队列中的阻塞线程 finishCompletion(); } }
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { // cas方式将waiters变量设置为null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // ## 遍历队列(单向链表)中的WaitNode节点,释放全部的等待线程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 提供的监听方法,需用户自定义实现 done(); callable = null; // to reduce footprint }
三、get()
public V get() throws InterruptedException, ExecutionException { int s = state; // NEW和COMPLETING状态触发等待 if (s <= COMPLETING) // == 等待完成 s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // -- 检查状态,如果此时已变成NORMAL则无需等待 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // -- 检查状态,如果此时是COMPLETING,切换其它线程执行 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // -- 新建等待节点 else if (q == null) q = new WaitNode(); // -- waiters变量赋值 else if (!queued) // ## 头插 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // -- 有超时设置情况 else if (timed) { nanos = deadline - System.nanoTime(); // ## 如果已超时,移除节点 if (nanos <= 0L) { removeWaiter(q); return state; } // ## 如果未超时,阻塞指定时间 LockSupport.parkNanos(this, nanos); } // -- 线程阻塞 else LockSupport.park(this); } }
Recommend
-
67
FutureTask类 重点是那个股票交易处理程序的例子,认真看三遍。本文花了三个小时。 GitHub代码欢迎star。 小白认为学习语言最好的方式就是模仿、思考别人为什么这么写。 FutureTask类同时实现类Runnable接口和Future接口。因此,
-
78
所谓异步任务,就是不在当前线程中进行执行,而是另外起一个线程让其执行。那么当前线程如果想拿到其执行结果,该怎么办呢? 如果我们使用一个公共变量作为结果容器,两个线程共用这个值,那么应该是可以拿到结果的,但是这样一来...
-
9
在 Java 中,Runnable 接口表示一个没有返回结果的任务,而 Callable 接口表示具有返回结果的任务。在并发编程中,异步执行任务,再获取任务结果,可以提高系统的吞吐量。Future 接口应运而生,它表示异步任务的执行结果,并提供了检查任务是否执行完、取...
-
6
构建可回滚的应用及上线checklist实践 木小丰 2021年11月20日 16次浏览 构建可回滚的应用及上线checklist实践 在互联网分布式应用...
-
6
编写可回滚的代码 木小丰 2021年11月16日 72次浏览 可回滚是软件发布的基本规范,尤其是在互联网分布式应用中,如果上线的新版本有bug又不...
-
4
JUC 并发编程 + 底层原理 注意,一定要是JDK1.8、IDE 一定要设置 1、什么是JUC(重要)
-
1
图片来源:视觉中国多家媒体近日登文:...
-
3
摘要: J.U.C是Java并发编程中非常重要的工具包,今天,我们就来着重讲讲J.U.C里面的FutureTask、Fork/Join框架和BlockingQueue。本文分享自华为云社区《
-
2
大家好,我是楼仔!Thread、Runnable、Callable、Future、FutureTask,你能详细讲出他们的内部关系么?这也是面试经常问到的问题。这篇文章主要告诉大家各种对象内部的关系,能达到灵活运用的境界,下面是文章目录:
-
6
联想小新Pro超能本2023升级140W快充:半小时就可回血66% 2023-01-16 14:48 出处/作者:快科技 整合编辑:佚名 今天,联想...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK