

java | 自定义阻塞队列 - 超时阻塞
source link: https://benpaodewoniu.github.io/2023/01/01/java154/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

java | 线程池 自定义阻塞队列 - 基本阻塞队列 完成了基本的阻塞队列,但是,其还有不足。
这一章节,将加上超时逻辑。
超时逻辑,分别是
主要针对的是阻塞队列。
超时阻塞队列
// 阻塞队列
@Slf4j(topic = "c.BlockQuene")
class BlockQuene<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 满条件变量
private Condition fullWaitSet = lock.newCondition();
// 不满条件变量
private Condition noFullWaitSet = lock.newCondition();
// 容量
private int capcity;
public BlockQuene(int capcity) {
this.capcity = capcity;
}
// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
log.debug("队列为空");
try {
noFullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
log.debug("唤醒队列 ~ 进行添加");
fullWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}
// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("队列已满,等待...");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("添加任务 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
} finally {
lock.unlock();
}
}
// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回的是剩余时间
if (nanos <= 0) {
return null;
}
nanos = noFullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}
// 带超时时间的阻塞添加
// 防止队列满了,一直死等
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
if (nanos <= 0) {
log.debug("任务超时 {}", task);
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
return true;
} finally {
lock.unlock();
}
}
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
里面主要增加了逻辑
poll(long timeout, TimeUnit unit)
offer(T task, long timeout, TimeUnit unit)
日志输出如下
14:40:12.691 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-0,5,main] com.redisc.Run$$Lambda$1/836514715@2286778
14:40:12.702 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-1,5,main] com.redisc.Run$$Lambda$1/836514715@5a8e6209
14:40:12.702 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@2286778
14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c
14:40:12.702 [main] DEBUG c.BlockQuene - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c
14:40:12.702 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@5a8e6209
14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:12.702 [main] DEBUG c.BlockQuene - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152
14:40:12.702 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 ...
14:40:13.208 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 ...
14:40:13.208 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@1f28c152
14:40:13.208 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895
14:40:13.208 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 ...
14:40:13.709 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 ...
14:40:13.709 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@7791a895
14:40:13.709 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
14:40:13.709 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 ...
14:40:14.213 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 ...
14:40:14.213 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
14:40:14.213 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee
14:40:14.213 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee ...
14:40:14.719 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee ...
14:40:14.719 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@6325a3ee
14:40:14.719 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d
14:40:14.719 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d ...
14:40:15.221 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d ...
14:40:15.221 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@1d16f93d
14:40:15.221 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a
14:40:15.221 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a ...
14:40:15.723 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a ...
14:40:15.723 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@67b92f0a
14:40:22.702 [Thread-0] DEBUG c.Test - 0
14:40:22.702 [Thread-1] DEBUG c.Test - 1
14:40:22.703 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@5a8e6209
14:40:22.703 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@2286778
14:40:22.703 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@731a74c
14:40:22.703 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:32.706 [Thread-1] DEBUG c.Test - 2
14:40:32.706 [Thread-0] DEBUG c.Test - 3
14:40:32.706 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:32.706 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@731a74c
可以发现很明显的发现就执行了 4
个任务。
虽然增加了超时逻辑,但是也丧失了灵活性。
比如,有的任务比较不重要,我希望超时放弃,有的任务很重要,我希望一直等待,显然,这套框架不满足灵活性。
Recommend
-
144
导入 在文章Java阻塞队列详解中分析了java中提供的一些阻塞队列,阻塞队列在并发编程中是一种非常重要的高级数据结构,无论是在实际项目中还是在jdk源码的其他组件实现上都有高频的使用,比如java中的线程池实现就使用了多种阻塞队列。阻塞队列作为一种数据容器,...
-
80
JUC简介 在 Java 5.0 提供了java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步IO和轻量级任务框架;还提供了设计用于多线程上下文中的Collection实...
-
27
一.说说Java创建多线程的方法 1. 通过继承Thread类实现run方法 2. 通过实现Runnable接口 3. 通过实现Callable接口 4. 通过线程池获取 二. 可以写一个Callable的案例吗?如何调用Callable接口...
-
24
Java阻塞队列四组API介绍 通过前面几篇文章的学习,我们已经知道了Java中的队列分为阻塞队列和非阻塞队列以及常用的七个阻塞队列。如下图:
-
9
几种主要的阻塞队列 自从Java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个: ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小。并且可以指定公平性与非...
-
26
...
-
5
转载自并发编程网 – ifeve.com本文链接地址: 聊聊并发(七)——Java中的阻塞队列* 1. 什么是阻塞队列?阻塞队列(BlockingQueue)是一个支持两个附加操作...
-
9
Java并发编程学习7-阻塞队列 推荐 原创 Huazie 2022-10-18 15:21:21...
-
4
线程池 自定义阻塞队列 犀牛的博客 姑苏城外一茅屋...
-
6
java | 自定义阻塞队列 - 拒绝策略阻塞队列 java | 自定义阻塞队...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK