2

Java 线程池系列-实战篇

 2 months ago
source link: https://nicksxs.me/2024/03/17/Java-%E7%BA%BF%E7%A8%8B%E6%B1%A0%E7%B3%BB%E5%88%97-%E5%AE%9E%E6%88%98%E7%AF%87/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Java 线程池系列-实战篇

2024-03-17Java 16 11 0 Comments

线程池在实际使用过程中,有时候在理解比较偏理论的时候会出现一些判断错误,这里我们就来看一个实际的案例

private static final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1, 1,
0, TimeUnit.MINUTES,
new MyArrayBlockingQueue<>(2));
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100; i++) {
Thread.sleep(100);
for (int j = 0; j < 3; j++) {
threadPoolExecutor.execute(() -> {
int a = 0;
});
}
System.out.println("===============> 详情任务 - 任务处理完成");
}
System.out.println("都执行完成了");
}

MyArrayBlockingQueue 是我复制的 ArrayBlockingQueue 加了点日志,可以认为就是一样的,这种情况下
执行过程是怎么样的呢, 队列长度是 2,核心线程数和最大线程数都是 1,提交任务是采用了两层循环,内层是循环三次,往线程池里提交任务,然后内层循环完了以后会重新睡眠 100 毫秒
在进入下一次外层循环,如果能一眼看出来问题的说明对线程池了解得很深入了,如果没有的话我们就一起来看下
先说下结论,这个代码会出现拒绝异常

考虑下是什么原因呢,是不是我线程数太少了,放大一些,感觉符合直觉一点
修改成

private static final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(100, 100,
0, TimeUnit.MINUTES,
new MyArrayBlockingQueue<>(2));

然而还是一样

只不过晚了点出现,那么问题出在哪呢
为什么我要去重写这个 MyArrayBlockingQueue,就是为了找到原因,其实很多讲解线程池的都是讲了线程池的参数,什么队列是链表的,数组的
但是没有讲到我是怎么往队列塞任务,怎么从队列取任务的呢

public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) {
return false;
}
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

这里是往队列里塞任务,注意这里需要获得锁,
而对于获取任务呢

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

注意这里也需要获得锁,当我一个线程池的线程数进入稳定状态,也就是保持一定数量的线程不变的情况下
上面是一种比较可能的情况,即核心线程数等于最大线程数,那么我在提交任务的时候是非常快的

if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

再来看下这段代码,第一步只需要判断是否为空,第二步就是判断核心线程数量,明显我说的情况,前面两步就直接过去了
然后就是判断线程池运行状态和往队列里塞任务了,但是线程运行完一个任务主动从队列里获取则需要更多的逻辑
这样就造成了我往队列里塞任务会比获取任务快很多,队列一满,就会抛出拒绝异常
即使我把线程数量放大到 100 还是一样,只不过会出现的慢一点,那么口说无凭,我们来验证下,提交任务过快,那么我在提交
方法里做个延迟

public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) {
return false;
}
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

这样就没啥问题了

除了最后这个加延时,其他的直接用 ArrayBlockingQueue 就可以实验,实操一下会对这个逻辑有更深的理解


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK