一、线程池
线程池(ThreadPool)是一种线程复用的机制。它维护着若干个线程,任务来了就复用这些线程去执行,任务做完线程不会销毁,而是回到池中等待下一个任务。
为什么要用线程池?
降低资源消耗:避免频繁创建和销毁线程带来的系统开销。
提高响应速度:任务到来时可以直接复用已有线程,无需等待新线程创建。
便于统一管理:可以统一分配、调优和监控线程,控制最大并发数,防止系统资源耗尽。
支持任务排队和拒绝策略:可以灵活处理任务高峰和异常情况。
二、线程池的核心组成及工作流程
Java线程池的核心实现类是 ThreadPoolExecutor,其主要组成如下:
- 核心线程数(corePoolSize):池中始终存活的线程数,即使它们处于空闲状态也不会被销毁。
- 最大线程数(maximumPoolSize):池中允许的最大线程数。
- 线程空闲时间(keepAliveTime):非核心线程空闲多久会被销毁。
- 时间单位(unit):keepAliveTime的时间单位。
- 任务队列(workQueue):用于保存等待执行任务的队列。
- 线程工厂(threadFactory):用于创建新线程的工厂。
- 拒绝策略(handler):当线程池和队列都满了时,如何处理新任务。
线程池的工作流程
提交任务:调用execute()或submit()方法提交任务。
判断核心线程数:如果当前线程数小于corePoolSize,创建新线程执行任务。
任务入队:如果核心线程已满,尝试将任务放入队列。
创建非核心线程:如果队列也满了,且线程数小于maximumPoolSize,创建非核心线程执行任务。
拒绝策略:如果线程数已达最大且队列也满,执行拒绝策略(如抛异常、丢弃任务等)。
线程复用:线程执行完任务后不会销毁,而是回到池中等待下一个任务。
三、创建线程池的两种方式:
方式一:通过 Executors 工具类(不推荐在生产环境使用)
1. newFixedThreadPool(int nThreads)
- 特点:创建一个固定大小的线程池。
- 核心线程数 = 最大线程数。
- 使用 LinkedBlockingQueue(无界队列),可能会导致任务堆积,有OOM(内存溢出)风险。
- 适用场景:需要控制并发线程数量的场景。
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
2. newSingleThreadExecutor()
- 特点:创建一个只有一个线程的线程池。
- 核心线程数 = 最大线程数 = 1。
- 使用 LinkedBlockingQueue(无界队列),同样有OOM风险。
- 适用场景:需要保证所有任务按顺序执行的场景。
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
3 newCachedThreadPool()
- 特点:创建一个可缓存的线程池,线程数会根据任务量动态调整。
- 核心线程数为0,最大线程数为 Integer.MAX_VALUE。
- 使用 SynchronousQueue,任务来了如果没有空闲线程,就直接创建新线程。
- 风险:如果任务量巨大,会无限制地创建线程,可能导致OOM或系统资源耗尽。
- 适用场景:执行大量短期、异步任务的场景。
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
4 newScheduledThreadPool(int corePoolSize)
- 特点:创建一个支持定时及周期性任务执行的线程池。
- 适用场景:需要执行定时任务或周期性任务的场景。
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
方式二:通过 ThreadPoolExecutor 构造函数(推荐)
ThreadPoolExecutor 是 Java 并发包(java.util.concurrent)中线程池的核心实现类。它功能强大、高度可配置,是理解和使用 Java 线程池的基础。这是最原始、最灵活,也是生产环境中推荐使用的方式。你可以完全控制线程池的所有参数。
下面是一段ThreadPoolExecutor实现的线程池构建:
import java.util.concurrent.*;public class CreateThreadPoolDemo {public static void main(String[] args) {// 定义线程池的核心参数int corePoolSize = 2;int maximumPoolSize = 5;long keepAliveTime = 60L;TimeUnit unit = TimeUnit.SECONDS;BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(10); // 使用有界队列ThreadFactory threadFactory = Executors.defaultThreadFactory();RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略// 创建线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,threadFactory,handler);// 使用线程池for (int i = 0; i < 15; i++) {threadPoolExecutor.execute(() -> {System.out.println(Thread.currentThread().getName() + " is running...");});}// 关闭线程池threadPoolExecutor.shutdown();}
}
而且我们查看方式一的四种类型的线程池创建:
public static ExecutorService newFixedThreadPool(int nThreads) {// LinkedBlockingQueue 的默认长度为 Integer.MAX_VALUE,可以看作是无界的return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public static ExecutorService newSingleThreadExecutor() {// LinkedBlockingQueue 的默认长度为 Integer.MAX_VALUE,可以看作是无界的return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}// 同步队列 SynchronousQueue,没有容量,最大线程数是 Integer.MAX_VALUE`
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
可以看到前三种类型都是对ThreadPoolExecutor的包装,可以大胆猜测一下,第四种应该也是对ThreadPoolExecutor的包装;但是在他的实现中没有看到实例化的ThreadPoolExecutor,那就有些疑惑了。那它是如何实现的封装ThreadPoolExecutor呢?看下面的类的关系:
可以看到ScheduledThreadPoolExecutor类中继承了ThreadPoolExecutor,所以它通过 super 关键字调用了父类的构造函数。这说明定时任务线程池本质上也是一个 ThreadPoolExecutor,只是配置了特殊的参数。由此可见ThreadPoolExecutor类十分的重要,它是Executors工具类的基础组成,而且阿里巴巴的《Java开发手册》中强制要求不要使用 Executors 的这几种方法来创建线程池,因为它们都存在资源耗尽的风险;因此ThreadPoolExecutor类进一步解析十分重要;
四、ThreadPoolExecutor源码解析
1 构造方法:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
这是其中的全参构造构造方法,其他的构造方法都是通过this对这个构造方法的调用,类似之前Executors工具类是对ThreadPoolExecutor的封装。它首先对非合理参数进行了判断,如果参数不合理直接抛出对应错误,否则初始化参数构建ThreadPoolExecutor实例;
2 execute方法:
execute方法是Java线程池(如ThreadPoolExecutor)中最核心的任务提交方法,其作用可以总结为:向线程池提交一个需要执行的任务(Runnable),由线程池中的线程来执行该任务。
具体流程如下:
当你调用executor.execute(task)时,线程池会按照如下流程处理:
- 判断当前线程数是否小于核心线程数(corePoolSize)
- 如果是,直接创建新线程来执行任务。
- 如果核心线程已满,尝试将任务放入任务队列(workQueue)
- 如果队列未满,任务会被缓存,等待线程池中的线程来取出并执行。
- 如果队列也满了,且线程数未达到最大线程数(maximumPoolSize)
- 会创建新的非核心线程来执行任务。
- 如果线程池已满且队列也满
- 执行拒绝策略(如抛出异常、丢弃任务等)。
那我们看看上述流程在execute方法中是如何实现的:
public void execute(Runnable command) {// 1. 判空,防止提交null任务if (command == null)throw new NullPointerException();// 2. 获取线程池当前状态和线程数int c = ctl.get();// 3. 如果当前线程数小于核心线程数,优先创建核心线程执行任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // 尝试创建核心线程return; // 成功则直接返回c = ctl.get(); // 失败则重新获取ctl,继续后续流程}// 4. 如果线程池处于RUNNING状态且队列未满,任务入队if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get(); // 入队后再次获取ctl,防止状态变化// 4.1 如果线程池已关闭且任务还在队列,移除任务并执行拒绝策略if (!isRunning(recheck) && remove(command))reject(command);// 4.2 如果线程池里没有线程了,创建一个非核心线程来保证队列任务能被执行else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 5. 如果队列也满了,尝试创建非核心线程执行任务else if (!addWorker(command, false))// 6. 如果创建失败(线程池已满或已关闭),执行拒绝策略reject(command);
}
3 状态控制
如上,你可能会疑惑ctl是啥 : 它是一种原子整型成员变量,主要用来状态控制:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
先看其参数RUNNING,它是-1向左位移29位
在 Java 中,int 类型是 32 位,-1 的补码是:
1111 1111 1111 1111 1111 1111 1111 1111
左移 29 位后,低 29 位变成 0,高 3 位还是 1:
1110 0000 0000 0000 0000 0000 0000 0000
在看看ctlOf方法:
private static int ctlOf(int rs, int wc) { return rs | wc;
}
它是实现了一个按位或运算,为啥要这样设计呢?这是因为可以用一个int变量(ctl)同时存储线程池的状态和线程数,高三位存储状态,低29位存储线程数,这样可以通过一个int变量同时管理线程池状态和线程数;
那如何获取状态信息和线程数呢?
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; }
如上两种方法,runStateOf是获取状态信息,wokerCountOf是获取线程数信息:
CAPACITY是1左移29位-1得到的,其二进制信息为:0001 1111 1111 1111 1111 1111 1111 1111
其中高三位全为0,低29为全为1;可以对CAPACITY适时取反后进行按位与操作获取高三位或者低29位信息,这样就可以单独获取到该线程池的状态和线程数量信息;
更多详细的信息如下表:
表达式 | 二进制运算示例 | 作用 |
---|---|---|
c | 1110 0000 ... 0000 0101 | 存储复合信息(状态+线程数) |
CAPACITY | 0001 1111 ... 1111 1111 | 线程数掩码(高3位为0,低29位为1) |
c & CAPACITY | 0000 0000 ... 0000 0101 | 提取线程数(workerCountOf(c)的实现) |
~CAPACITY | 1110 0000 ... 0000 0000 | 运行状态掩码(高3位为1,低29位为0) |
c & ~CAPACITY | 1110 0000 ... 0000 0000 | 提取运行状态(runStateOf(c)的实现) |
4 addWorker方法
好了,了解了状态信息和线程信息获取过程,还需要看一下addWorker方法,这个是用来创建线程的方法。它的两个参数,一个是Runnable变量,一个是布尔类型变量;
/*** 尝试创建一个新的Worker线程,并执行其第一个任务。* @param firstTask Worker线程的第一个任务,可以为null。* @param core 如果为true,则使用corePoolSize作为线程数上限;否则使用maximumPoolSize。* @return 如果成功创建并启动了Worker,则返回true;否则返回false。*/
private boolean addWorker(Runnable firstTask, boolean core) {// 外层循环,用于在CAS失败或线程池状态改变时重试。retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// --- 状态检查 ---// 检查是否可以添加新的Worker线程。// 如果线程池已关闭 (>= SHUTDOWN),则通常不允许添加新线程。// 但有一个例外:如果状态是SHUTDOWN,且任务队列不为空,允许添加一个没有初始任务的Worker来处理队列中的任务。if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 内层循环,用于通过CAS原子地增加工作线程数。for (;;) {int wc = workerCountOf(c);// --- 容量检查 ---// 检查工作线程数是否已达到上限 (CAPACITY或core/maximumPoolSize)。if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// --- CAS操作 ---// 尝试原子地将工作线程数+1。if (compareAndIncrementWorkerCount(c))break retry; // CAS成功,跳出所有循环,继续执行后续的创建逻辑。// --- CAS失败处理 ---c = ctl.get(); // CAS失败,重新读取ctl的值。if (runStateOf(c) != rs)continue retry; // 如果线程池状态已改变,回到外层循环重试。// 如果状态未变,说明是其他线程也增加了线程数导致的CAS失败,仅重试内层循环。}}// --- 创建并启动Worker线程 ---boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个新的Worker对象,它包装了任务和要执行的线程。w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 获取全局锁,保证线程安全地添加Worker和启动线程。try {// 在持有锁的情况下,再次检查线程池状态,防止在获取锁的过程中线程池被关闭。int rs = runStateOf(ctl.get());// 如果线程池正在运行,或者处于SHUTDOWN状态且允许添加空任务的Worker。if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 预检,防止启动一个已经存活的线程。throw new IllegalThreadStateException();// 将新的Worker添加到workers集合中。workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock(); // 确保锁被释放。}if (workerAdded) {t.start(); // 启动线程。workerStarted = true;}}} finally {// --- 失败回滚 ---// 如果线程启动失败(如ThreadFactory创建失败或启动过程中出错)。if (!workerStarted) {addWorkerFailed(w); // 调用失败处理方法,将之前增加的线程数-1,并从集合中移除Worker。}}return workerStarted;
}
先看for循环中的第一个判断
if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;
rs先判断线程池是否是正常状态并且任务和队列不能为null,为啥判断队列不为null呢,因为,当核心线程数没到最大值时,再来的任务会创建核心线程,当核心线程达到最大值时会向队列中暂存任务,因此队列不为null时,核心线程数达到了最大值,不会创建核心线程数;
在状态检查之后,进行线程容量检查,如果小于核心线程数则进行增加线程数,在增加过程中使用的是cas操作,如果不成功则重新获取rs判断状态;如果线程池状态已改变,回到外层循环重试。如果状态未变,说明是其他线程也增加了线程数导致的CAS失败,仅重试内层循环。
当线程核心数增加成功后,开始增加worker线程并且启动线程,在添加工作线程时使用ReentrantLock 对workers对象上锁;在此过程中会再次检查线程池状态,在添加成功后同时更新largestPoolSize参数,这个参数记录了线程池中最大的线程数量。别把核心线程数:corePoolSize和最大线程数:maximumPoolSize混淆了。
当添加线程成功则运行线程
如果线程启动失败则调用失败处理方法,将之前增加的线程数-1,并从集合中移除Worker。
先写这些吧......