1 一道算法题引发的思考及其实现
1.1 算法题
- 问:如何充分利用多核 CPU 的性能,快速对一个2千万大小的数组进行排序?
- 这道题可以通过归并排序来解决;
1.2 什么是归并排序?
-
归并排序(Merge Sort)是基于分治思想的排序算法,基本思想是把一个大数组分成两个等大的子数组,对每个子数组分别排序,再将排好序的两个子数组合并成一个有序大数组,常通过递归实现,步骤为分(拆分数组为子数组)、治(子数组排序)、合(合并有序子数组)。它的时间复杂度是 (O(n\log n)),空间复杂度是 (O(n))((n) 为数组长度)。
-
分治思想是把规模为 (N) 的问题分解成 (K) 个规模更小、相互独立且与原问题性质相同的子问题,解决子问题后,合并子问题的解得到原问题的解。步骤分为分解(拆分原问题为子问题)、求解(子问题足够小时用简单方法解决)、合并(合并子问题的解得到原问题解)。像归并排序、快速排序、二分查找这些计算机经典算法都基于分治思想。分治任务模型图:
-
动图演示:Comparison Sorting Visualization。
1.3 使用归并排序解决算法题
1.3.1 单线程实现归并排序
-
单线程归并算法的实现,它的基本思路是将序列分成两个部分,分别进行递归排序,然后将排序好的子序列合并起来;
public class MergeSort {private final int[] arrayToSort; // 待排序的原始数组private final int threshold; // 递归拆分的最小阈值,当子数组长度小于此值时将直接使用Arrays.sort()排序/*** 构造函数,初始化待排序数组和阈值* @param arrayToSort 待排序数组* @param threshold 拆分阈值*/public MergeSort(final int[] arrayToSort, final int threshold) {this.arrayToSort = arrayToSort;this.threshold = threshold;}/*** 对类内数组进行顺序归并排序的入口方法* @return 排序后的数组*/public int[] sequentialSort() {return sequentialSort(arrayToSort, threshold);}/*** 静态方法:递归实现归并排序* @param arrayToSort 待排序数组* @param threshold 拆分阈值* @return 排序后的数组*/public static int[] sequentialSort(final int[] arrayToSort, int threshold) {// 如果当前数组长度小于阈值,则直接使用JDK内置排序(通常是快速排序优化)if (arrayToSort.length < threshold) {Arrays.sort(arrayToSort);return arrayToSort;}// 计算中点,拆分数组为左右两部分int midpoint = arrayToSort.length / 2;int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint); // 左半部分 [0, midpoint)int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length); // 右半部分 [midpoint, end)// 递归排序左右子数组leftArray = sequentialSort(leftArray, threshold);rightArray = sequentialSort(rightArray, threshold);// 合并两个已排序的子数组return merge(leftArray, rightArray);}/*** 合并两个已排序的数组* @param leftArray 已排序的左数组* @param rightArray 已排序的右数组* @return 合并后的有序数组*/public static int[] merge(final int[] leftArray, final int[] rightArray) {// 创建合并后的数组,长度为两数组之和int[] mergedArray = new int[leftArray.length + rightArray.length];int mergedArrayPos = 0; // 合并数组的当前位置int leftArrayPos = 0; // 左数组的当前指针int rightArrayPos = 0; // 右数组的当前指针// 同时遍历左右数组,按升序选择较小的元素放入合并数组while (leftArrayPos < leftArray.length && rightArrayPos < rightArray.length) {if (leftArray[leftArrayPos] <= rightArray[rightArrayPos]) {mergedArray[mergedArrayPos] = leftArray[leftArrayPos];leftArrayPos++;} else {mergedArray[mergedArrayPos] = rightArray[rightArrayPos];rightArrayPos++;}mergedArrayPos++;}// 如果左数组还有剩余元素,全部追加到合并数组while (leftArrayPos < leftArray.length) {mergedArray[mergedArrayPos] = leftArray[leftArrayPos];leftArrayPos++;mergedArrayPos++;}// 如果右数组还有剩余元素,全部追加到合并数组while (rightArrayPos < rightArray.length) {mergedArray[mergedArrayPos] = rightArray[rightArrayPos];rightArrayPos++;mergedArrayPos++;}return mergedArray;} }
1.3.2 Fork/Join实现并行归并排序
-
并行归并排序是一种利用多线程实现的归并排序算法。它的基本思路是将数据分成若干部分,然后在不同线程上对这些部分进行归并排序,最后将排好序的部分合并成有序数组。在多核 CPU 上,这种算法也能够有效提高排序速度;
-
可以使用 Java 的 Fork/Join 框架来实现归并排序的并行化:
public class MergeSortTask extends RecursiveAction {private final int threshold; // 任务拆分的最小阈值,当子数组长度小于等于此值时将直接进行排序private int[] arrayToSort; // 当前任务需要排序的数组/*** 构造函数,初始化待排序数组和阈值* @param arrayToSort 待排序数组* @param threshold 拆分阈值*/public MergeSortTask(final int[] arrayToSort, final int threshold) {this.arrayToSort = arrayToSort;this.threshold = threshold;}/*** Fork/Join框架的核心方法,实现任务的分解与执行*/@Overrideprotected void compute() {// 如果数组长度小于等于阈值,直接进行排序(基准情况)if (arrayToSort.length <= threshold) {Arrays.sort(arrayToSort); // 使用JDK优化排序算法return;}// 计算中点,将数组拆分为左右两个子数组int midpoint = arrayToSort.length / 2;int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint); // 左半部分 [0, midpoint)int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length); // 右半部分 [midpoint, end)// 创建左右子任务MergeSortTask leftTask = new MergeSortTask(leftArray, threshold);MergeSortTask rightTask = new MergeSortTask(rightArray, threshold);// 将左右任务提交到Fork/Join线程池异步执行(fork操作)leftTask.fork(); // 将左任务提交到工作队列,由其他工作线程执行rightTask.fork(); // 将右任务提交到工作队列,由其他工作线程执行// 等待左右子任务执行完成(join操作)leftTask.join(); // 阻塞当前线程直到左任务完成rightTask.join(); // 阻塞当前线程直到右任务完成// 合并两个已排序的子数组结果arrayToSort = MergeSort.merge(leftTask.getSortedArray(), rightTask.getSortedArray());}/*** 获取排序后的数组* @return 排序完成的数组*/public int[] getSortedArray() {return arrayToSort;} }
-
在这个示例中,使用 Fork/Join 框架实现了归并排序算法,并通过递归调用实现了并行化;
-
使用 Fork/Join 框架实现归并排序算法的关键在于将排序任务分解成小的任务,使用 Fork/Join 框架将这些小任务提交给线程池中的不同线程并行执行,并在最后将排序后的结果进行合并。这样可以充分利用多核CPU的并行处理能力,提高程序的执行效率。
1.3.3 测试结果对比
-
测试代码:
public class Utils {/*** 生成指定大小的随机整数数组* @param size 数组的大小* @return 包含随机整数的数组*/public static int[] buildRandomIntArray(final int size) {int[] arrayToCalculateSumOf = new int[size];Random generator = new Random();// 生成0-100,000,000范围内的随机整数填充数组for (int i = 0; i < arrayToCalculateSumOf.length; i++) {arrayToCalculateSumOf[i] = generator.nextInt(100000000);}return arrayToCalculateSumOf;} }public class ArrayToSortMain {public static void main(String[] args) {// 生成包含2000万个随机整数的测试数组int[] arrayToSortByMergeSort = Utils.buildRandomIntArray(20000000);// 创建数组副本,确保两种排序方法使用相同的数据进行公平比较int[] arrayToSortByForkJoin = Arrays.copyOf(arrayToSortByMergeSort, arrayToSortByMergeSort.length);// 获取当前系统的处理器核心数,用于优化并行计算int processors = Runtime.getRuntime().availableProcessors();// 创建单线程归并排序实例,使用处理器数量作为拆分阈值MergeSort mergeSort = new MergeSort(arrayToSortByMergeSort, processors);long startTime = System.nanoTime(); // 记录开始时间(纳秒级精度)// 执行单线程归并排序mergeSort.mergeSort();long duration = System.nanoTime()-startTime; // 计算耗时// 将纳秒转换为毫秒并输出结果System.out.println("单线程归并排序时间: "+(duration/(1000f*1000f))+"毫秒");// 创建Fork/Join并行排序任务,同样使用处理器数量作为阈值MergeSortTask mergeSortTask = new MergeSortTask(arrayToSortByForkJoin, processors);// 构建Fork/Join线程池,线程数设置为处理器核心数ForkJoinPool forkJoinPool = new ForkJoinPool(processors);startTime = System.nanoTime(); // 重新记录开始时间// 在线程池中执行并行排序任务(invoke会阻塞直到任务完成)forkJoinPool.invoke(mergeSortTask);duration = System.nanoTime()-startTime; // 计算并行排序耗时// 输出并行排序时间System.out.println("fork/join排序时间: "+(duration/(1000f*1000f))+"毫秒");} }
-
测试结果:数组越大,利用 Fork/Join 框架实现的并行化归并排序比单线程归并排序的效率更高
1.4 并行实现归并排序的优化和注意事项
-
在实际应用里,为充分利用多核CPU,同时保障算法的正确性与效率,得考量数据分布均匀性、内存使用、线程切换开销等因素;
-
具体而言:
-
任务大小:它的选择会影响并行算法的效率与负载均衡。任务太小,任务划分和合并的开销就会过大;任务太大,又难以充分利用多核 CPU 的并行处理能力。所以要结合数据量、CPU 核心数等,选合适的任务大小;
-
负载均衡:并行算法得保证各线程执行的任务大小和时间尽可能相等,不然有的线程会负载过重,有的则过轻。归并排序可通过递归调用来实现负载均衡,但递归层数不能太深,否则会增加线程创建和合并的开销;
-
数据分布:数据分布均匀与否会影响并行算法效率和负载均衡。若数据分布不均,会出现有的线程处理数据量过大,有的过小的情况。实际应用要考虑数据分布,尽量把数据分成大小相等的子数组;
-
内存使用:处理大规模数据时,内存使用情况对算法执行效率至关重要。归并排序可通过原地归并节约内存,但要注意归并的实现方式,避免数据覆盖和不稳定排序等问题;
-
线程切换:线程切换是并行算法的一大开销,要尽量减少切换次数以提升效率。归并排序可通过设置线程池大小和调整任务大小来控制线程数量和切换开销,实现最优性能。
-
2 Fork/Join框架
2.1 简介
-
Fork/Join 是一个支持分治任务模型的并行计算框架,其中:
- Fork:对应分治任务模型里的任务分解,就是把一个大任务拆分成许多小任务;
- Join:对应结果合并,即等小任务并行执行完后,将它们的结果合并成一个大结果;
-
它适用于可采用分治策略的计算密集型任务,像大规模数组排序、图形渲染、复杂算法求解等场景;
-
下图清晰展示了其工作流程:大任务(Big Task)先通过 Fork 不断分解为更小的子任务(SubTask),子任务按顺序计算得到各自结果(Result),之后通过 Join 逐步合并这些结果,最终得到大任务的最终结果(Final Result);
2.2 应用场景
- 并行计算:能便捷执行大规模计算任务,把大任务分解为小任务,借助工作窃取算法并行执行,充分利用多核处理器优势,提升计算效率;
- 递归任务处理:适合递归式的任务分解与执行,可递归地将大任务拆成诸多小任务,再通过工作窃取算法动态分配给工作线程执行;
- 并行流操作:Java 8 引入的 Stream API 用于集合的函数式编程操作,
ForkJoinPool
常用来执行并行流操作里的并行计算部分,比如对元素进行过滤、映射、聚合等; - 高性能任务执行:提供高性能的任务执行机制,通过动态调度任务和管理线程池,有效利用系统资源,在多核处理器上实现任务并行执行。
2.3 使用
- Fork/Join 框架的主要组成部分是 ForkJoinPool、ForkJoinTask;
- ForkJoinPool 是一个线程池,它用于管理 Fork/Join 中任务的执行;
- ForkJoinTask 是一个抽象类,用于表示可以被分割成更小部分的任务。
2.3.1 ForkJoinPool
- ForkJoinPool 是 Fork/Join 框架中的线程池类,它用于管理 Fork/Join 任务的线程;
- ForkJoinPool 类包括一些重要的方法,例如
submit()
、invoke()
、shutdown()
、awaitTermination()
等,用于提交任务、执行任务、关闭线程池和等待任务的执行结果; - ForkJoinPool 类中还包括一些参数,例如线程池的大小、工作线程的优先级、任务队列的容量等,可以根据具体的应用场景进行设置。
2.3.2 构造器
-
ForkJoinPool
有四个核心参数,用于控制线程池的并行数、工作线程创建、异常处理和模式指定:-
int parallelism
:指定并行级别(parallelism level),ForkJoinPool
会据此决定工作线程数量。若未设置,会用Runtime.getRuntime().availableProcessors()
获取可用处理器数量来设置并行级别; -
ForkJoinWorkerThreadFactory factory
:ForkJoinPool
创建线程时,通过该工厂创建。需实现ForkJoinWorkerThreadFactory
接口,若不指定,由默认的DefaultForkJoinWorkerThreadFactory
负责线程创建; -
UncaughtExceptionHandler handler
:指定异常处理器,当任务运行出错时,由该处理器处理异常; -
boolean asyncMode
:设置队列工作模式。asyncMode
为true
时,使用先进先出队列;为false
时,使用后进先出模式;
-
-
代码示例:
// 获取处理器数量 int processors = Runtime.getRuntime().availableProcessors(); // 构建forkjoin线程池,用获取到的处理器数量作为并行级别参数 ForkJoinPool forkJoinPool = new ForkJoinPool(processors);
2.3.3 任务提交方式
返回值 | 方法 | |
---|---|---|
提交异步执行 | void | execute(ForkJoinTask)<?> task):提交一个ForkJoinTask 类型的任务进行异步执行execute(Runnable task):提交一个 Runnable 类型的任务进行异步执行 |
等待并获取结果 | T | invoke(ForkJoinTask task):提交ForkJoinTask 类型的任务,等待任务执行完毕后获取结果 |
提交执行获取Future结果 | ForkJoinTask | submit(ForkJoinTask task):提交ForkJoinTask 类型的任务,返回对应的 ForkJoinTask submit(Callable task):提交 Callable 类型的任务(Callable 有返回值),返回对应的ForkJoinTask submit(Runnable task):提交 Runnable 类型的任务(Runnable 无返回值),返回对应的ForkJoinTask submit(Runnable task, T result):提交 Runnable 类型的任务,同时指定一个结果,返回对应的ForkJoinTask |
2.3.4 调用方法
-
ForkJoinTask 最核心的是
fork()
方法和join()
方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取; -
fork()
:用于向当前任务所运行的线程池中提交任务。如果当前线程是 ForkJoinWorkerThread 类型,将会放入该线程的工作队列,否则放入 common 线程池的工作队列中;在 Fork/Join 框架中,“该线程的工作队列”指的是当前执行任务的
ForkJoinWorkerThread
所拥有的工作队列:ForkJoinWorkerThread
是ForkJoinPool
中用于执行任务的工作线程。每个ForkJoinWorkerThread
内部都维护了一个双端队列(Deque),这个队列就是“该线程的工作队列”;- 当通过
fork()
方法提交任务时,如果当前线程是ForkJoinWorkerThread
类型,任务会被放入当前ForkJoinWorkerThread
自己的双端队列中,后续该线程会从自己的队列里取出任务执行; - 这样的设计是为了配合“工作窃取(Work - Stealing)”算法:当一个线程的队列中没有任务时,它可以去其他线程的队列中“窃取”任务来执行,从而提高线程的利用率和整体并行效率;
-
join()
:用于获取任务的执行结果。调用join()
时,将阻塞当前线程直到对应的子任务完成运行并返回结果。
2.3.5 处理递归任务
-
比如计算斐波那契数列:
public class Fibonacci extends RecursiveTask<Integer> {final int n; // 要计算斐波那契数列的第n项Fibonacci(int n) {this.n = n;}/*** 重写RecursiveTask的compute()方法,实现斐波那契数列的递归计算* @return 斐波那契数列的第n项值*/protected Integer compute() {// 基准情况:斐波那契数列的前两项直接返回if (n <= 1)return n;// 创建计算F(n-1)的子任务Fibonacci f1 = new Fibonacci(n - 1);// 将子任务提交到ForkJoinPool异步执行(fork操作)f1.fork();// 创建计算F(n-2)的子任务,并在当前线程同步执行Fibonacci f2 = new Fibonacci(n - 2);// 合并结果:F(n) = F(n-1) + F(n-2)return f2.compute() + f1.join(); // f2.compute()同步计算,f1.join()等待异步任务完成}public static void main(String[] args) {// 构建forkjoin线程池(使用默认并行级别,通常是处理器核心数)ForkJoinPool pool = new ForkJoinPool();// 创建计算斐波那契数列第10项的任务Fibonacci task = new Fibonacci(10);// 提交任务到线程池并阻塞等待执行完成,返回最终结果int result = pool.invoke(task);System.out.println(result); // 输出结果:55} }
继承RecursiveTask:表示这是一个有返回值的递归任务,返回Integer类型结果
斐波那契数列定义:F(0)=0,F(1)=1,F(n)=F(n−1)+F(n−2)F(0)=0, F(1)=1, F(n)=F(n-1)+F(n-2)F(0)=0,F(1)=1,F(n)=F(n−1)+F(n−2)
任务分解策略:
- 将F(n)F(n)F(n)分解为F(n−1)F(n-1)F(n−1)和F(n−2)F(n-2)F(n−2)两个子问题
- F(n−1)F(n-1)F(n−1)通过
fork()
异步执行 - F(n−2)F(n-2)F(n−2)通过
compute()
在当前线程同步执行
结果合并:使用
f1.join()
等待异步任务完成,然后与同步计算结果相加Fork/Join工作流程:
fork()
:将任务推入工作队列,供其他工作线程窃取执行join()
:阻塞当前线程直到任务完成并返回结果
性能问题:这种实现方式存在大量重复计算,时间复杂度为O(2n)O(2^n)O(2n),实际应用中应使用动态规划等优化方法
线程池使用:
pool.invoke(task)
同步提交任务并等待结果,适合计算密集型递归任务 -
如果 n 为100000,执行上面的代码会发生什么问题?在上面的例子中,由于递归计算 Fibonacci 数列的任务数量呈指数级增长,当 n 较大时,就容易出现 StackOverflowError 错误。这个错误通常发生在递归过程中,由于递归过程中每次调用函数都会在栈中创建一个新的栈帧,当递归深度过大时,栈空间就会被耗尽,导致 StackOverflowError 错误;
-
那么如何解决栈溢出呢?可以使用迭代的方式计算 Fibonacci 数列,以避免递归过程中占用大量的栈空间。示例代码:
public class Fibonacci {public static void main(String[] args) {int n = 100000; // 要计算斐波那契数列的第100000项// 创建数组用于存储斐波那契数列值,长度为n+1(包含0到n项)long[] fib = new long[n + 1];// 初始化斐波那契数列的前两项fib[0] = 0; // 第0项为0fib[1] = 1; // 第1项为1// 使用动态规划(自底向上)方法计算斐波那契数列for (int i = 2; i <= n; i++) {// 递推公式:F(n) = F(n-1) + F(n-2)fib[i] = fib[i - 1] + fib[i - 2];}// 输出斐波那契数列的第100000项System.out.println(fib[n]);} }
-
对于一些递归深度较大的任务,使用 Fork/Join 框架可能会出现任务调度和内存消耗的问题;
- 当递归深度较大时,会产生大量的子任务,这些子任务可能被调度到不同的线程中执行,而线程的创建和销毁以及任务调度的开销都会占用大量的资源,从而导致性能下降;
- 此外,对于递归深度较大的任务,由于每个子任务所占用的栈空间较大,可能会导致内存消耗过大,从而引起内存溢出的问题;
- 因此,在使用 Fork/Join 框架处理递归任务时,需要根据实际情况来评估递归深度和任务粒度,以避免任务调度和内存消耗的问题。如果递归深度较大,可以尝试采用其他方法来优化算法,如使用迭代方式替代递归,或者限制递归深度来减少任务数量,以避免 Fork/Join 框架的缺点。
2.3.6 处理阻塞任务
-
在 ForkJoinPool 中使用阻塞型任务时需要注意以下几点:
- 防止线程饥饿:
- 当一个线程在执行一个阻塞型任务时,它将会一直等待任务完成,这时如果没有其他线程可以窃取任务,那么该线程将一直被阻塞,直到任务完成为止;
- 为了避免这种情况,应该避免在 ForkJoinPool 中提交大量的阻塞型任务;
- 使用特定的线程池:
- 为了最大程度地利用 ForkJoinPool 的性能,可以使用专门的线程池来处理阻塞型任务,这些线程不会被 ForkJoinPool 的窃取机制所影响;
- 例如,可以使用 ThreadPoolExecutor 来创建一个线程池,然后将这个线程池作为 ForkJoinPool 的执行器,这样就可以使用 ThreadPoolExecutor 来处理阻塞型任务,而使用 ForkJoinPool 来处理非阻塞型任务;
- 不要阻塞工作线程:
- 如果在 ForkJoinPool 中使用阻塞型任务,那么需要确保这些任务不会阻塞工作线程,否则会导致整个线程池的性能下降;
- 为了避免这种情况,可以将阻塞型任务提交到一个专门的线程池中,或者使用 CompletableFuture 等异步编程工具来处理阻塞型任务;
- 防止线程饥饿:
-
下面是一个使用阻塞型任务的例子,这个例子展示了如何使用 CompletableFuture 来处理阻塞型任务:
public class BlockingTaskDemo {public static void main(String[] args) {// 构建一个forkjoin线程池(使用默认配置,通常为核心数)ForkJoinPool pool = new ForkJoinPool();// 创建一个异步任务,使用CompletableFuture并将其提交到ForkJoinPool中执行CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {// 模拟一个耗时的任务(阻塞5秒钟)TimeUnit.SECONDS.sleep(5);return "Hello, world!"; // 任务完成后的返回值} catch (InterruptedException e) {e.printStackTrace();return null; // 发生中断异常时返回null}}, pool); // 指定使用ForkJoinPool作为执行器try {// 阻塞等待任务完成,并获取结果(get()方法会阻塞当前线程直到任务完成)String result = future.get();System.out.println(result); // 输出结果:"Hello, world!"} catch (InterruptedException e) {// 处理线程中断异常e.printStackTrace();} catch (ExecutionException e) {// 处理任务执行过程中抛出的异常e.printStackTrace();} finally {// 关闭ForkJoinPool,释放线程资源(优雅关闭,等待已提交任务完成)pool.shutdown();}} }
- 在这个例子中,使用了 CompletableFuture 来处理阻塞型任务,因为它可以避免阻塞 ForkJoinPool 中的工作线程;
- 另外,我们也可以使用专门的线程池来处理阻塞型任务,例如 ThreadPoolExecutor 等。不管是哪种方式,都需要避免在 ForkJoinPool 中提交大量的阻塞型任务,以免影响整个线程池的性能。
2.4 ForkJoinPool 工作原理
2.4.1 原理概述
-
当通过 ForkJoinPool 的
invoke()
或submit()
方法提交任务时,ForkJoinPool 会依据路由规则将任务提交到某个任务队列。若任务执行过程中创建子任务,子任务会被提交到工作线程对应的任务队列; -
若工作线程对应的任务队列空了,ForkJoinPool 支持任务窃取机制,空闲的工作线程可“窃取”其他工作任务队列里的任务,确保所有工作线程都能有效利用,不闲置;
-
核心设计
-
ForkJoinPool 内部存储有 WorkQueue 数组,提交给它的任务会被分配到指定的 WorkQueue 上执行;
-
每个 WorkQueue 内部维护一个 ForkJoinTask 数组来存储待执行任务,还配有一个独立的 ForkJoinWorkerThread 来真正执行任务;
-
2.4.2 工作线程 ForkJoinWorkerThread
-
ForkJoinWorkerThread 是 ForkJoinPool 里专门执行任务的线程。创建它时,会自动向 ForkJoinPool 注册一个 WorkQueue,这个 WorkQueue 是该线程专属的任务存储队列,且只能出现在
workqueues[]
的奇数位; -
ForkJoinWorkerThread 工作线程启动后,会扫描并 “窃取” 任务执行。另外,当它在
ForkJoinTask#join()
等待返回结果时,若被 ForkJoinPool 线程池发现其任务队列为空,或者当前任务已执行完毕,也会通过工作窃取算法从其他任务队列获取任务,分配到自己的任务队列中执行;
2.4.3 工作队列 WorkQueue
-
WorkQueue 一个是双端队列,用于存储工作线程自己的任务。每个工作线程都维护一个本地 WorkQueue,优先执行本地队列中的任务。当本地队列任务执行完,工作线程会尝试从其他线程的 WorkQueue 中窃取任务;
-
WorkQueue 任务队列分两种类型:一种是外部提交任务所用队列,在任务队列数组中数组下标为偶数;另一种是工作线程私有的任务队列,存储大任务
fork
分解出的任务,在任务队列数组中数组下标为奇数; -
下图展示了 WorkQueue 的操作,如
externalSubmit(task)
外部提交任务、subtask(fork)
子任务提交,还有push
、pop
等操作,以及根据async
是true
还是false
,分别对应FIFO_QUEUE
(先进先出队列)和LIFO_QUEUE
(后进先出队列)的工作模式;
2.4.4 工作窃取机制
-
ForkJoinPool 和 ThreadPoolExecutor 有很大不同,ForkJoinPool 引入了工作窃取设计,这是其性能保障的关键之一;
-
工作窃取指的是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从自己双端队列的头部获取任务;当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部获取任务,这种方式能最大限度减少线程竞争任务的可能性;
-
下图展示了主线程提交任务到 ForkJoinPool 后,任务被推入工作队列(workQueue),
ForkJoinWorkerThread - 1
和ForkJoinWorkerThread - 2
不仅会处理自身队列中的任务(如通过fork
分解任务),当自身任务不足时,还会通过“steal 工作窃取”从其他工作队列获取任务; -
通过工作窃取,Fork/Join 框架能实现任务的自动负载均衡,充分利用多核 CPU 的计算能力,同时避免线程饥饿和延迟问题;
-
如果想了解更多关于 ForkJoinPool 的内容,可以参考Doug Lea 的论文。
2.5 ForkJoinPool 执行流程
-
应用程序外部向Fork/Join框架提交任务(通过
execute
/submit
/invoke
)。此时有两种处理路径:-
路径1:直接提交到workQueues。调用
externalPush(task)
,将任务直接放入工作队列(workQueues)。若提交成功,会触发signalWork(w, q)
,尝试用工作线程执行任务; -
路径2:初始化workQueues并提交任务。调用
externalSubmit(task)
,先初始化工作队列,再将任务提交到队列,后续逻辑与“路径1”类似,最终也会触发signalWork(w, q)
;
-
-
signalWork(w, q)
执行后,会判断是否有空闲线程:-
有空闲线程(Y分支):直接从空闲线程中选一个,执行任务(进入下面的任务执行阶段);
-
无空闲线程(N分支):需要创建新的工作线程,流程如下:
tryAddWorker(c)
:尝试添加新工作线程createWorker()
:创建工作线程实例new ForkJoinWorkerThread(pool)
:实例化ForkJoinWorkerThread
;forkJoinWorkerThread.start()
:启动工作线程,触发线程的run()
方法forkJoinWorkerThread.run()
:线程启动后,执行run()
方法,此时会先执行registerWorker(this)
,将新线程注册到workQueues,完成线程与队列的绑定;
-
-
工作线程就绪后,进入任务执行的核心逻辑:
-
任务入队等待执行。若任务是通过
ForkJoinTask.fork()
提交的,会调用workQueue.push(this)
,将任务放入队列等待执行; -
工作线程执行任务。调用
workQueue.runTask()
,从队列中取出任务执行。执行时调用forkJoinTask.doExec()
,而doExec()
最终会触发任务的核心逻辑:- 若任务是
RecursiveTask
(有返回值的分治任务),会调用RecursiveTask.compute()
(用户自定义的“任务拆分与计算逻辑”就在这里实现); - 执行的入口也可通过
exec()
方法触发,最终导向compute()
;
- 若任务是
-
任务扫描与等待
-
若当前队列“有任务执行”,则持续执行;
-
若“未扫描到任务”,则调用
scan(w, r)
扫描其他队列的任务(Fork/Join的工作窃取(Work-Stealing)机制:空闲线程会从其他线程的队列“偷”任务执行,提升资源利用率); -
若扫描后仍无任务,调用
awaitWork(w, r)
让线程进入“等待任务”状态;
-
-
持续运行工作线程。整个执行过程由
runWorker(workQueue)
驱动,持续从队列中获取并执行任务。
-
2.6总结
-
Fork/Join是基于分治思想(把大任务拆成小任务,并行执行后合并结果)的编程模型,适合计算密集型任务(大量CPU运算的场景),效率提升源于两点:
-
任务切分:将大任务拆成更小的子任务,让更多线程同时参与计算,充分利用多核CPU的并行能力;
-
任务窃取:空闲的线程会“窃取”其他线程队列中未执行的任务,减少线程闲置,同时降低线程间的竞争(因为每个线程优先处理自己队列的任务,窃取是“补充性”的);
-
-
使用
ForkJoinPool
(Fork/Join框架的线程池)时,要关注任务类型:-
最适合的是纯函数计算型任务:任务不依赖外部状态、不修改共享变量(无副作用),这样并行执行时更安全,不会因“状态不一致”出问题;
-
若要处理阻塞型任务(如I/O操作、等待锁),需谨慎评估:虽然
ForkJoinPool
能处理,但会导致线程被长时间占用(阻塞),破坏“任务窃取”的高效性,增加线程管理成本;
-
-
普通线程池(如
ThreadPoolExecutor
)更适合IO密集型(如网络请求、文件读写)或“短小任务”,而Fork/Join针对大规模并行计算,核心区别有四点:-
工作窃取算法:
ForkJoinPool
:线程空闲时,会从其他线程的任务队列中“偷取”任务执行,最大化线程利用率;- 普通线程池:任务存在公共队列,线程直接从公共队列取任务,无“窃取”逻辑,空闲线程只能等新任务入队;
-
任务的分解与合并:
ForkJoinPool
:支持分治——大任务拆成小任务并行执行,最终合并小任务结果得到大任务结果;- 普通线程池:任务是“独立的”,不支持“自动拆分-合并”,需手动处理任务依赖和结果聚合;
-
工作线程的数量:
ForkJoinPool
:会自动根据CPU核心数设置工作线程数量,避免线程过多(减少上下文切换开销),最大化CPU性能;- 普通线程池:需手动指定线程数,若设置不合理(如线程过多导致切换频繁,或过少导致CPU闲置),会影响性能;
-
任务类型适配:
ForkJoinPool
:适合大规模并行计算任务(如大数据排序、矩阵运算);- 普通线程池:适合短小的、IO密集型任务(如Web服务器处理请求、数据库操作)。
-