3

InheritableThreadLocal 在线程池中进行父子线程间消息传递出现消息丢失的解析 - appl...

 1 year ago
source link: https://www.cnblogs.com/hujunhui530/p/16423851.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

InheritableThreadLocal 在线程池中进行父子线程间消息传递出现消息丢失的解析

在日常研发过程中,我们经常面临着需要在线程内,线程间进行消息传递,比如在修改一些开源组件源码的过程中,需要将外部参数透传到内部,如果进行方法参数重载,则涉及到的改动量过大,这样,我们可以依赖ThreadLocal 来进行消息传递。

ThreadLocal 是 存储在线程栈帧中的一块数据存储区域,其可以做到线程与线程之间的读写隔离。

但是在我们的日常场景中,经常会出现 父线程 需要向子线程中传递消息,而 ThreadLocal  仅能在当前线程上进行数据缓存,因此 我们需要使用 InheritableThreadLocal  来实现 父子线程间的消息传递

// 定义消息<br>public class ThreadLocalMessage {
private final InheritableThreadLocal<Msg> msg;
private ThreadLocalMessage() {
msg = new InheritableThreadLocal<>();
}
public Msg getMsg() {
return this.msg.get();
}
public void setMsg(Msg msg) {
this.msg.set(msg);
}
public void clear() {
msg.remove();
}
private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage();
public static ThreadLocalMessage getInstance() {
return threadLocalMessage;
}
/**
* 获取线程中的消息
*
* @return
*/
public static Msg getOrCreateMsg() {
Msg msg = ThreadLocalMessage.getInstance().getMsg();
if (msg == null) {
msg = new Msg();
}
return msg;
}
public static class Msg {
/**
* taskId
*/
private String taskId;
private Map<String, Object> others;
private int retCode;
public Msg() {
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@Override
public String toString() {
return "Msg{" +
"taskId='" + taskId + '\'' +
", others=" + others +
", retCode=" + retCode +
'}';
}
}
}
// 定义线程池<br>@EnableAsync
@Configuration
public class ExecutorConfig {
private final Logger log = LoggerFactory.getLogger(getClass());
@Value("${executor.corePool:2}")
private Integer corePool;
@Value("${executor.maxPool:10}")
private Integer maxPool;
@Value("${executor.queue:2}")
private Integer queue;
@Bean("cdl-executor")
public Executor executor() {
log.info("start async Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePool);
//配置最大线程数
executor.setMaxPoolSize(maxPool);
//配置队列大小
executor.setQueueCapacity(queue);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-executor-");
// 设置拒绝策略
executor.setRejectedExecutionHandler((r, e) -> {
// .....
});
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;<br>        // 使用TTL 初始化 executor
//return TtlExecutors.getTtlExecutor(executor);
}
}
// 创建子线程进行消息传递并打印<br>public String test() throws Exception{
for (int i = 0 ; i < 20; i++){
ThreadLocalMessage.Msg msg = ThreadLocalMessage.getOrCreateMsg();
msg.setTaskId("task_id_"+i);
ThreadLocalMessage.getInstance().setMsg(msg);
myService.testThread(i);
ThreadLocalMessage.getInstance().clear();
}
return "ok";
}

经过代码测试,我们创建了一个池子大小为10 的线程,并发启动了20个线程去进行父子线程消息传递,结果如下:

2888789-20220629150811790-1500648858.png

经过测试,我们发现 只有10个线程 的消息传递成功了,其余10个线程的消息均丢失了,这是什么原因呢。。。

遇到这个问题,我们首先得弄清楚 InheritableThreadLocal 是如何在父子线程间进行消息传递的

InheritableThreadLocal 在父线程创建子线程的时候,会将父线程中InheritableThreadLocal  中存储的数据 拷贝一份 存储到子线程的 InheritableThreadLocal  中

而我们使用的 线程池,线程池是会反复利用线程的,当线程池没有被创建满,每次都是新创建线程,直到线程池创建满了,再需要使用线程就会从线程池中拿已经创建好的线程。

问题就出在这里,由于后面的线程 是从线程池中去捞已经创建好的线程,不会走创建逻辑,也就无法触发 InheritableThreadLocal 中向子线程 拷贝,这也就是为什么  InheritableThreadLocal  合并线程池 使用时,出现了 消息丢失的原因

如何解决????

阿里巴巴开源的TTL ,用于解决线程池中的父子线程复用,线程数据传递,可以完美解决这个问题

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.0.0</version>
</dependency>
@EnableAsync
@Configuration
public class ExecutorConfig {
private final Logger log = LoggerFactory.getLogger(getClass());
@Value("${executor.corePool:2}")
private Integer corePool;
@Value("${executor.maxPool:10}")
private Integer maxPool;
@Value("${executor.queue:2}")
private Integer queue;
@Bean("cdl-executor")
public Executor executor() {
log.info("start async Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(corePool);
//配置最大线程数
executor.setMaxPoolSize(maxPool);
//配置队列大小
executor.setQueueCapacity(queue);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-executor-");
// 设置拒绝策略
executor.setRejectedExecutionHandler((r, e) -> {
// .....
});
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
// 使用TTL 的 executor
return TtlExecutors.getTtlExecutor(executor);
//return executor;
}
}
public class ThreadLocalMessage {
private final TransmittableThreadLocal<Msg> msg;
private ThreadLocalMessage() {
msg = new TransmittableThreadLocal<>();
}
public Msg getMsg() {
return this.msg.get();
}
public void setMsg(Msg msg) {
this.msg.set(msg);
}
public void clear() {
msg.remove();
}
private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage();
public static ThreadLocalMessage getInstance() {
return threadLocalMessage;
}
/**
* 获取线程中的消息
*
* @return
*/
public static Msg getOrCreateMsg() {
Msg msg = ThreadLocalMessage.getInstance().getMsg();
if (msg == null) {
msg = new Msg();
}
return msg;
}
public static class Msg {
/**
* taskId
*/
private String taskId;
public Msg() {
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
@Override
public String toString() {
return "Msg{" +
"taskId='" + taskId + '\'' +
'}';
}
}
}

按照之前的调用方法再试一次,结果如下:

2888789-20220629160635627-1203133896.png

 可以发现未出现数据丢失的情况


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK