InheritableThreadLocal在线程池中父子线程间消息传递时,为何会出现消息丢失的现象?

2026-04-18 00:021阅读0评论SEO资讯
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1402个文字,预计阅读时间需要6分钟。

InheritableThreadLocal在线程池中父子线程间消息传递时,为何会出现消息丢失的现象?

在日常研发过程中,我们常面临在线程内部,线程间进行消息传递的需求。例如,在修改一些开源组件源代码的过程中,需要将外部参数传递至内部,若涉及方法参数的重新加载,还需涉及及的改动。

在日常研发过程中,我们经常面临着需要在线程内,线程间进行消息传递,比如在修改一些开源组件源码的过程中,需要将外部参数透传到内部,如果进行方法参数重载,则涉及到的改动量过大,这样,我们可以依赖ThreadLocal 来进行消息传递。

ThreadLocal 是 存储在线程栈帧中的一块数据存储区域,其可以做到线程与线程之间的读写隔离。

但是在我们的日常场景中,经常会出现 父线程 需要向子线程中传递消息,而ThreadLocal 仅能在当前线程上进行数据缓存,因此 我们需要使用InheritableThreadLocal 来实现 父子线程间的消息传递

// 定义消息
public class ThreadLocalMessage { private final InheritableThreadLocal<Msg> msg; private ThreadLocalMessage() { msg = new InheritableThreadLocal<>(); } public Msg getMsg() { return this.msg.get(); } public void setMsg(Msg msg) { this.msg.set(msg); } public void clear() { msg.remove(); } private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage(); public static ThreadLocalMessage getInstance() { return threadLocalMessage; } /** * 获取线程中的消息 * * @return */ public static Msg getOrCreateMsg() { Msg msg = ThreadLocalMessage.getInstance().getMsg(); if (msg == null) { msg = new Msg(); } return msg; } public static class Msg { /** * taskId */ private String taskId; private Map<String, Object> others; private int retCode; public Msg() { } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } @Override public String toString() { return "Msg{" + "taskId='" + taskId + '\'' + ", others=" + others + ", retCode=" + retCode + '}'; } } }

  

// 定义线程池
@EnableAsync @Configuration public class ExecutorConfig { private final Logger log = LoggerFactory.getLogger(getClass()); @Value("${executor.corePool:2}") private Integer corePool; @Value("${executor.maxPool:10}") private Integer maxPool; @Value("${executor.queue:2}") private Integer queue; @Bean("cdl-executor") public Executor executor() { log.info("start async Executor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePool); //配置最大线程数 executor.setMaxPoolSize(maxPool); //配置队列大小 executor.setQueueCapacity(queue); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-executor-"); // 设置拒绝策略 executor.setRejectedExecutionHandler((r, e) -> { // ..... }); // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor;
// 使用TTL 初始化 executor //return TtlExecutors.getTtlExecutor(executor); } }

// 创建子线程进行消息传递并打印
public String test() throws Exception{ for (int i = 0 ; i < 20; i++){ ThreadLocalMessage.Msg msg = ThreadLocalMessage.getOrCreateMsg(); msg.setTaskId("task_id_"+i); ThreadLocalMessage.getInstance().setMsg(msg); myService.testThread(i); ThreadLocalMessage.getInstance().clear(); } return "ok"; }

  

经过代码测试,我们创建了一个池子大小为10 的线程,并发启动了20个线程去进行父子线程消息传递,结果如下:

经过测试,我们发现 只有10个线程 的消息传递成功了,其余10个线程的消息均丢失了,这是什么原因呢。。。

遇到这个问题,我们首先得弄清楚 InheritableThreadLocal 是如何在父子线程间进行消息传递的

InheritableThreadLocal 在父线程创建子线程的时候,会将父线程中InheritableThreadLocal 中存储的数据 拷贝一份 存储到子线程的InheritableThreadLocal 中

而我们使用的 线程池,线程池是会反复利用线程的,当线程池没有被创建满,每次都是新创建线程,直到线程池创建满了,再需要使用线程就会从线程池中拿已经创建好的线程。

问题就出在这里,由于后面的线程 是从线程池中去捞已经创建好的线程,不会走创建逻辑,也就无法触发InheritableThreadLocal 中向子线程 拷贝,这也就是为什么InheritableThreadLocal 合并线程池 使用时,出现了 消息丢失的原因

如何解决????

阿里巴巴开源的TTL ,用于解决线程池中的父子线程复用,线程数据传递,可以完美解决这个问题

<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.0.0</version> </dependency>

  

@EnableAsync @Configuration public class ExecutorConfig { private final Logger log = LoggerFactory.getLogger(getClass()); @Value("${executor.corePool:2}") private Integer corePool; @Value("${executor.maxPool:10}") private Integer maxPool; @Value("${executor.queue:2}") private Integer queue; @Bean("cdl-executor") public Executor executor() { log.info("start async Executor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePool); //配置最大线程数 executor.setMaxPoolSize(maxPool); //配置队列大小 executor.setQueueCapacity(queue); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-executor-"); // 设置拒绝策略 executor.setRejectedExecutionHandler((r, e) -> { // ..... }); // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); // 使用TTL 的 executor return TtlExecutors.getTtlExecutor(executor); //return executor; } }

  

public class ThreadLocalMessage { private final TransmittableThreadLocal<Msg> msg; private ThreadLocalMessage() { msg = new TransmittableThreadLocal<>(); } public Msg getMsg() { return this.msg.get(); } public void setMsg(Msg msg) { this.msg.set(msg); } public void clear() { msg.remove(); } private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage(); public static ThreadLocalMessage getInstance() { return threadLocalMessage; } /** * 获取线程中的消息 * * @return */ public static Msg getOrCreateMsg() { Msg msg = ThreadLocalMessage.getInstance().getMsg(); if (msg == null) { msg = new Msg(); } return msg; } public static class Msg { /** * taskId */ private String taskId; public Msg() { } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } @Override public String toString() { return "Msg{" + "taskId='" + taskId + '\'' + '}'; } } }

  

按照之前的调用方法再试一次,结果如下:

InheritableThreadLocal在线程池中父子线程间消息传递时,为何会出现消息丢失的现象?

可以发现未出现数据丢失的情况

本文共计1402个文字,预计阅读时间需要6分钟。

InheritableThreadLocal在线程池中父子线程间消息传递时,为何会出现消息丢失的现象?

在日常研发过程中,我们常面临在线程内部,线程间进行消息传递的需求。例如,在修改一些开源组件源代码的过程中,需要将外部参数传递至内部,若涉及方法参数的重新加载,还需涉及及的改动。

在日常研发过程中,我们经常面临着需要在线程内,线程间进行消息传递,比如在修改一些开源组件源码的过程中,需要将外部参数透传到内部,如果进行方法参数重载,则涉及到的改动量过大,这样,我们可以依赖ThreadLocal 来进行消息传递。

ThreadLocal 是 存储在线程栈帧中的一块数据存储区域,其可以做到线程与线程之间的读写隔离。

但是在我们的日常场景中,经常会出现 父线程 需要向子线程中传递消息,而ThreadLocal 仅能在当前线程上进行数据缓存,因此 我们需要使用InheritableThreadLocal 来实现 父子线程间的消息传递

// 定义消息
public class ThreadLocalMessage { private final InheritableThreadLocal<Msg> msg; private ThreadLocalMessage() { msg = new InheritableThreadLocal<>(); } public Msg getMsg() { return this.msg.get(); } public void setMsg(Msg msg) { this.msg.set(msg); } public void clear() { msg.remove(); } private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage(); public static ThreadLocalMessage getInstance() { return threadLocalMessage; } /** * 获取线程中的消息 * * @return */ public static Msg getOrCreateMsg() { Msg msg = ThreadLocalMessage.getInstance().getMsg(); if (msg == null) { msg = new Msg(); } return msg; } public static class Msg { /** * taskId */ private String taskId; private Map<String, Object> others; private int retCode; public Msg() { } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } @Override public String toString() { return "Msg{" + "taskId='" + taskId + '\'' + ", others=" + others + ", retCode=" + retCode + '}'; } } }

  

// 定义线程池
@EnableAsync @Configuration public class ExecutorConfig { private final Logger log = LoggerFactory.getLogger(getClass()); @Value("${executor.corePool:2}") private Integer corePool; @Value("${executor.maxPool:10}") private Integer maxPool; @Value("${executor.queue:2}") private Integer queue; @Bean("cdl-executor") public Executor executor() { log.info("start async Executor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePool); //配置最大线程数 executor.setMaxPoolSize(maxPool); //配置队列大小 executor.setQueueCapacity(queue); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-executor-"); // 设置拒绝策略 executor.setRejectedExecutionHandler((r, e) -> { // ..... }); // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); return executor;
// 使用TTL 初始化 executor //return TtlExecutors.getTtlExecutor(executor); } }

// 创建子线程进行消息传递并打印
public String test() throws Exception{ for (int i = 0 ; i < 20; i++){ ThreadLocalMessage.Msg msg = ThreadLocalMessage.getOrCreateMsg(); msg.setTaskId("task_id_"+i); ThreadLocalMessage.getInstance().setMsg(msg); myService.testThread(i); ThreadLocalMessage.getInstance().clear(); } return "ok"; }

  

经过代码测试,我们创建了一个池子大小为10 的线程,并发启动了20个线程去进行父子线程消息传递,结果如下:

经过测试,我们发现 只有10个线程 的消息传递成功了,其余10个线程的消息均丢失了,这是什么原因呢。。。

遇到这个问题,我们首先得弄清楚 InheritableThreadLocal 是如何在父子线程间进行消息传递的

InheritableThreadLocal 在父线程创建子线程的时候,会将父线程中InheritableThreadLocal 中存储的数据 拷贝一份 存储到子线程的InheritableThreadLocal 中

而我们使用的 线程池,线程池是会反复利用线程的,当线程池没有被创建满,每次都是新创建线程,直到线程池创建满了,再需要使用线程就会从线程池中拿已经创建好的线程。

问题就出在这里,由于后面的线程 是从线程池中去捞已经创建好的线程,不会走创建逻辑,也就无法触发InheritableThreadLocal 中向子线程 拷贝,这也就是为什么InheritableThreadLocal 合并线程池 使用时,出现了 消息丢失的原因

如何解决????

阿里巴巴开源的TTL ,用于解决线程池中的父子线程复用,线程数据传递,可以完美解决这个问题

<dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-thread-local</artifactId> <version>2.0.0</version> </dependency>

  

@EnableAsync @Configuration public class ExecutorConfig { private final Logger log = LoggerFactory.getLogger(getClass()); @Value("${executor.corePool:2}") private Integer corePool; @Value("${executor.maxPool:10}") private Integer maxPool; @Value("${executor.queue:2}") private Integer queue; @Bean("cdl-executor") public Executor executor() { log.info("start async Executor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心线程数 executor.setCorePoolSize(corePool); //配置最大线程数 executor.setMaxPoolSize(maxPool); //配置队列大小 executor.setQueueCapacity(queue); //配置线程池中的线程的名称前缀 executor.setThreadNamePrefix("async-executor-"); // 设置拒绝策略 executor.setRejectedExecutionHandler((r, e) -> { // ..... }); // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //执行初始化 executor.initialize(); // 使用TTL 的 executor return TtlExecutors.getTtlExecutor(executor); //return executor; } }

  

public class ThreadLocalMessage { private final TransmittableThreadLocal<Msg> msg; private ThreadLocalMessage() { msg = new TransmittableThreadLocal<>(); } public Msg getMsg() { return this.msg.get(); } public void setMsg(Msg msg) { this.msg.set(msg); } public void clear() { msg.remove(); } private static final ThreadLocalMessage threadLocalMessage = new ThreadLocalMessage(); public static ThreadLocalMessage getInstance() { return threadLocalMessage; } /** * 获取线程中的消息 * * @return */ public static Msg getOrCreateMsg() { Msg msg = ThreadLocalMessage.getInstance().getMsg(); if (msg == null) { msg = new Msg(); } return msg; } public static class Msg { /** * taskId */ private String taskId; public Msg() { } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } @Override public String toString() { return "Msg{" + "taskId='" + taskId + '\'' + '}'; } } }

  

按照之前的调用方法再试一次,结果如下:

InheritableThreadLocal在线程池中父子线程间消息传递时,为何会出现消息丢失的现象?

可以发现未出现数据丢失的情况