ThreadPoolTaskExecutor 的使用案例
1. 依赖说明
< dependency> < groupId> org.springframework.retry</ groupId> < artifactId> spring-retry</ artifactId> < version> 1.3.1</ version>
</ dependency>
< dependency> < groupId> org.springframework</ groupId> < artifactId> spring-aspects</ artifactId> < version> 5.3.22</ version>
</ dependency>
< dependency> < groupId> org.slf4j</ groupId> < artifactId> slf4j-api</ artifactId> < version> 1.7.36</ version>
</ dependency>
2. 完整配置代码
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. scheduling. concurrent. ThreadPoolTaskExecutor ;
import org. springframework. util. concurrent. ListenableFuture ;
import org. springframework. util. backoff. FixedBackOff ;
import org. springframework. retry. RetryCallback ;
import org. springframework. retry. RetryContext ;
import org. springframework. retry. support. RetryTemplate ;
import org. springframework. retry. policy. SimpleRetryPolicy ;
import org. springframework. retry. backoff. FixedBackOffPolicy ;
import org. springframework. core. task. TaskDecorator ; import java. util. concurrent. * ;
import java. util. Map ;
import java. util. concurrent. atomic. AtomicInteger ; import org. slf4j. MDC; @Configuration
public class TaskExecutorConfig { @Bean ( name = "taskExecutor" ) public ThreadPoolTaskExecutor taskExecutor ( ) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ( ) ; executor. setCorePoolSize ( 5 ) ; executor. setMaxPoolSize ( 10 ) ; executor. setKeepAliveSeconds ( 60 ) ; BlockingQueue < Runnable > queue = new ArrayBlockingQueue < > ( 25 ) ; executor. setQueue ( queue) ; executor. setRejectedExecutionHandler ( new ThreadPoolExecutor. CallerRunsPolicy ( ) ) ; executor. setThreadNamePrefix ( "TaskExecutor-" ) ; executor. setTaskDecorator ( new MdcTaskDecorator ( ) ) ; executor. initialize ( ) ; return executor; } public static RetryTemplate createRetryTemplate ( ) { RetryTemplate retryTemplate = new RetryTemplate ( ) ; SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy ( ) ; retryPolicy. setMaxAttempts ( 3 ) ; retryTemplate. setRetryPolicy ( retryPolicy) ; FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy ( ) ; backOffPolicy. setBackOffPeriod ( 1000 ) ; retryTemplate. setBackOffPolicy ( backOffPolicy) ; return retryTemplate; } public static class RetryCallableWrapper implements Runnable { private final Runnable targetTask; private final RetryTemplate retryTemplate; public RetryCallableWrapper ( Runnable targetTask, RetryTemplate retryTemplate) { this . targetTask = targetTask; this . retryTemplate = retryTemplate; } @Override public void run ( ) { retryTemplate. execute ( context -> { targetTask. run ( ) ; return null ; } ) ; } } public static class MdcTaskDecorator implements TaskDecorator { @Override public Runnable decorate ( Runnable runnable) { Map < String , String > contextMap = MDC. getCopyOfContextMap ( ) ; return ( ) -> { try { MDC. setContextMap ( contextMap) ; runnable. run ( ) ; } finally { MDC. clear ( ) ; } } ; } } public void useTaskExecutor ( ) { ThreadPoolTaskExecutor executor = taskExecutor ( ) ; Runnable retryableTask = new RetryCallableWrapper ( ( ) -> { System . out. println ( Thread . currentThread ( ) . getName ( ) + " 执行任务" ) ; throw new RuntimeException ( "模拟任务失败" ) ; } , createRetryTemplate ( ) ) ; executor. execute ( retryableTask) ; MDC. put ( "traceId" , "123456" ) ; executor. execute ( ( ) -> { System . out. println ( "当前 traceId: " + MDC. get ( "traceId" ) ) ; System . out. println ( Thread . currentThread ( ) . getName ( ) + " 执行任务" ) ; } ) ; }
}
3. 配置项详解
3.1 线程池核心配置 配置项 说明 推荐值/示例 corePoolSize
核心线程数 CPU密集型:Runtime.getRuntime().availableProcessors()
;IO密集型:2 * CPU核心数
maxPoolSize
最大线程数 根据业务负载调整(如 2 * corePoolSize
) keepAliveSeconds
空闲线程存活时间 与任务平均执行时间匹配(如 60
秒) queue
任务队列 ArrayBlockingQueue(25)
(有界队列)或 LinkedBlockingQueue
(无界队列)rejectedExecutionHandler
拒绝策略 CallerRunsPolicy
(调用线程执行)、AbortPolicy
(抛异常)、DiscardPolicy
(丢弃任务)threadNamePrefix
线程名称前缀 便于日志追踪(如 "TaskExecutor-"
)
3.2 上下文传递机制
MDC(Mapped Diagnostic Context) :SLF4J 提供的日志上下文工具,用于记录日志追踪 ID。TaskDecorator :Spring 提供的装饰器接口,用于在任务执行前后传递上下文。实现方式 :
在主线程中设置 MDC 上下文(如 MDC.put("traceId", "123456")
)。 通过 TaskDecorator
将上下文传递给异步任务。 任务执行时恢复上下文,确保日志可追踪。
3.3 重试机制
RetryTemplate :Spring Retry 提供的重试模板,封装了重试逻辑。SimpleRetryPolicy :定义最大重试次数(如 3 次)。FixedBackOffPolicy :定义重试间隔(如 1 秒)。使用方式 :
将任务包装为 RetryCallableWrapper
。 通过 RetryTemplate
执行任务,自动处理重试逻辑。
4. 使用场景建议
日志追踪 :通过 MDC 传递 traceId
,确保异步任务日志与主线程关联。任务重试 :适用于网络请求、数据库操作等需要自动重试的场景。资源控制 :通过队列和线程数限制,防止系统过载。
5. 注意事项
重试逻辑与异常处理 :
默认对所有异常进行重试,可通过 SimpleRetryPolicy
自定义异常类型。 重试后仍失败的任务需通过日志或监控告警处理。
上下文传递的开销 :
如果上下文较大(如包含复杂对象),需评估性能影响。 使用 InheritableThreadLocal
替代 MDC
可以更灵活地传递上下文。
线程池生命周期 :
确保在应用关闭时正确关闭线程池(调用 executor.shutdown()
)。 Spring 容器会自动关闭线程池,但手动创建时需显式调用。
队列容量与拒绝策略 :
有界队列需合理设置容量,防止任务堆积。 拒绝策略应根据业务需求选择(如 CallerRunsPolicy
适合轻量任务)。
6. 扩展功能(可选)
动态调整线程池参数 :通过 ThreadPoolTaskExecutor
提供的方法(如 setCorePoolSize()
)动态修改配置。监控与调优 :通过 ThreadPoolTaskExecutor
的统计方法(如 getPoolSize()
、getQueueSize()
)监控线程池状态。自定义重试策略 :结合 Spring Retry
实现更复杂的重试逻辑(如指数退避、重试条件判断)。
7. 代码结构图
TaskExecutorConfig
├── taskExecutor() // 配置线程池
│ ├── corePoolSize // 核心线程数
│ ├── maxPoolSize // 最大线程数
│ ├── keepAliveSeconds // 空闲线程存活时间
│ ├── queue // 任务队列
│ ├── rejectedHandler // 拒绝策略
│ └── setTaskDecorator // 上下文传递机制
├── createRetryTemplate() // 创建重试模板
├── RetryCallableWrapper // 重试任务包装器
└── useTaskExecutor() // 使用示例├── execute() // 提交普通任务└── submitListenable()// 提交带返回值任务
8. 总结
上下文传递 :确保异步任务中能访问到主线程的上下文(如日志追踪 ID)。任务重试 :对失败任务自动重试,提升系统鲁棒性。灵活的线程池管理 :根据业务需求调整线程池参数,避免资源浪费或过载。