11

java线程池源码解析

 3 years ago
source link: http://cbaj.gitee.io/blog/2020/08/22/java%E7%BA%BF%E7%A8%8B%E6%B1%A0%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

java线程池源码解析

2020-08-22

| 并发多线程

| 11次阅读

2.8k

|

13

主要介绍线程池相关知识,关于线程池,首先我们思考下为什么要用线程池。如果单纯的使用线程,线程的创建和销毁都是自己来完成,如果并发请求过多,可能造成资源耗尽。线程池可以对线程进行统一分配,调优和监控。本篇文章为《图灵学院》课程笔记

  • 降低资源消耗(线程无限制地创建,然后使用完毕后销毁)
  • 提高响应速度(无须创建线程)
  • 提高线程的可管理性

java是如何实现和管理线程池的,jdk5开始把工作单元和任务执行分离,工作单元包括callable、runnable,而执行机制由Executor提供,Executor的实现还提供了对线程生命周期的管理

Executor.png
  • java.util.concurrent.Executor (执行器,执行方法)

  • java.util.concurrent.ExecutorService (执行服务) 包含服务的生命周期

  • java.util.concurrent.ScheduledExecutorService (调度相关的服务)

核心接口实现

  • java.util.concurrent.ThreadPoolExecutor (普通的的线程池实现类)
  • java.util.concurrent.ScheduledThreadPoolExecutor (调度的核心实现类)
名称 方法 说明 类型 java.util.concurrent.
Executor execute 执行接口 接口 java.util.concurrent.
ExecutorService submit(java.util.concurrent.Callable) 提交接口 接口 java.util.concurrent.
AbstractExecutorService submit(Callable task) 把执行和提交接口
进行合并区别:有
返回值和无返回值 抽象类 java.util.concurrent.
ThreadPoolExecutor execute(Runnable command) 调 用
runwork 方 法
getTask(从队列
拿数据) 实现类 java.util.concurrent.
ScheduledExecutorService scheduleAtFixedRate、scheduleWithFixedDelay 定义方法 接口 java.util.concurrent.
ScheduledThreadPoolExecutor delayedExecute 具体实现
add>task>addWo
rk 实现类
20200822140706.png

内部类分为两种

  • policy 策略
  • worker 工作

内部工作原理(构造方法赋值)

  • corePool:核心线程池大小
  • maximumPool:最大线程池大小
  • BlockingQueue:任务工作队列
  • keepAliveTime:线程活跃时间,如果线程数量大于核心线程数量,多余线程空闲时间超时候被销毁
  • RejectedExecutionHandler:当ThreadPoolExecutor关闭或最大线程池已经满了,executor将调用的handler
  • ThreadFactory:使用ThreadFactory创建线程,默认使用defaultThreadFactory

线程池的运行思路

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止
%E7%BA%BF%E7%A8%8B%E6%B1%A0.png
1
2
3
4
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认)
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

线程池的执行原理

初始化构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

ThreadPoolExecutor#execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//判断是否小于核心数量,是直接新增work成功后直接退出
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 增加失败后继续获取标记
c = ctl.get();
}
//判断是运行状态并且扔到workQueue里成功后
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次check判断运行状态如果是非运行状态就移除出去&reject掉
if (! isRunning(recheck) && remove(command))
reject(command);
//否则发现可能运行线程数是0那么增加一个null的worker
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//直接增加worker如果不成功直接reject
else if (!addWorker(command, false))
reject(command);
}

ThreadPoolExecutor#addWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 两种情况
//1.如果非运行状态
//2.不是这种情况(停止状态并且是null对象并且workQueue不等于null)
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;// 判断是否饱和容量了
if (compareAndIncrementWorkerCount(c)) //增加一个work数量 然后跳出去
break retry;
c = ctl.get(); // Re-read ctl 增加work失败后继续递归
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//增加一个worker
w = new Worker(firstTask);
final Thread t = w.thread;
//判断是否 为null
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired. 锁定后并重新检查下 是否存在线程工厂的失败或者锁定前的关闭
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); //增加work
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) { //本次要是新增加work成功就调用start运行
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

ThreadPoolExecutor#runWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();//1.取到当前线程
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { //获取任务 看看是否能拿到
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();// 确保线程是能中断的
try {
beforeExecute(wt, task); //开始任务前的钩子
Throwable thrown = null;
try {
task.run();//执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //任务后的钩子
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

ThreadPoolExecutor#processWorkerExit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
 private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); //移除work
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) { //判断是否还有任务
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
20200822141910.png

线程池调度原理

调度核心构造器

1
2
3
4
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

ScheduledThreadPoolExecutor#delayedExecute

1
2
3
4
5
6
7
8
9
10
11
12
13
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);//增加任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}

通过DelayedWorkQueue 延迟队列实现 offer获取对象的延迟

ScheduledThreadPoolExecutor.DelayedWorkQueue#offer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; //当前对象
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length) //扩容
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0); //第一个直接设置索引和下标0
} else {
siftUp(i, e); //筛选到上边
}
if (queue[0] == e) {
leader = null;
available.signal(); //唤醒所有的被挤压的wait线程
}
} finally {
lock.unlock();
}
return true;
}

ScheduledThreadPoolExecutor.DelayedWorkQueue#siftUp

1
2
3
4
5
6
7
8
9
10
11
12
13
 private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}

ScheduledThreadPoolExecutor.ScheduledFutureTask#compareTo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time; //判断time
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

ThreadPoolExecutor#ensurePrestart

确保有work执行

1
2
3
4
5
6
7
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}

ScheduledThreadPoolExecutor.DelayedWorkQueue#take

work运行的时候调用queue的take方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];//获取第一个对象
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);//延迟时间
if (delay <= 0)//到时间了
return finishPoll(first);
first = null; // don't retain ref while waiting
if (leader != null)
available.await();//因为没有执行线程初始化,所以等等什么时候有了自己被他人唤醒
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay); //各种condition的awaitNanos
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}

}

ScheduledThreadPoolExecutor.DelayedWorkQueue#finishPoll

1
2
3
4
5
6
7
8
9
 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s]; //重排序队列
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f
}

ScheduledThreadPoolExecutor.ScheduledFutureTask#run

1
2
3
4
5
6
7
8
9
10
11
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {//有period的要执行成功设置下次执行时间和增加额外任务
setNextRunTime();
reExecutePeriodic(outerTask);
}
}

异步结果源码分析

怎么拿到的异步任务结果?

FutureTask#awaitDone

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
  private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) { //check线程中断
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) { //判断是否完成
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode(); //生成一个waint对象
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);//链表的对象下一个置成当前的waitNode
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos); //等待时间阻塞
}
else
LockSupport.park(this); //一直阻塞
}
}

什么时候回填的结果

FutureTask#run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread())) //如果状态不是new 或者 runner状态置不成功直接退出
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();//运行ok 的时候返回result
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)//正常成功set result对象
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}

}

FutureTask#cancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) //CAS 置成stateOffset 的中断或者取消
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) { //如果线程运行中,可能中断
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;

}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK