1 一道算法题引发的思考及其实现

1.1 算法题

  • 问:如何充分利用多核 CPU 的性能,快速对一个2千万大小的数组进行排序?
    • 这道题可以通过归并排序来解决;

1.2 什么是归并排序?

  • 归并排序(Merge Sort)是基于分治思想的排序算法,基本思想是把一个大数组分成两个等大的子数组,对每个子数组分别排序,再将排好序的两个子数组合并成一个有序大数组,常通过递归实现,步骤为分(拆分数组为子数组)、治(子数组排序)、合(合并有序子数组)。它的时间复杂度是 (O(n\log n)),空间复杂度是 (O(n))((n) 为数组长度)。

  • 分治思想是把规模为 (N) 的问题分解成 (K) 个规模更小、相互独立且与原问题性质相同的子问题,解决子问题后,合并子问题的解得到原问题的解。步骤分为分解(拆分原问题为子问题)、求解(子问题足够小时用简单方法解决)、合并(合并子问题的解得到原问题解)。像归并排序、快速排序、二分查找这些计算机经典算法都基于分治思想。分治任务模型图:

    在这里插入图片描述

  • 动图演示:Comparison Sorting Visualization。

1.3 使用归并排序解决算法题

1.3.1 单线程实现归并排序

  • 单线程归并算法的实现,它的基本思路是将序列分成两个部分,分别进行递归排序,然后将排序好的子序列合并起来;

    public class MergeSort {private final int[] arrayToSort;  // 待排序的原始数组private final int threshold;      // 递归拆分的最小阈值,当子数组长度小于此值时将直接使用Arrays.sort()排序/*** 构造函数,初始化待排序数组和阈值* @param arrayToSort 待排序数组* @param threshold 拆分阈值*/public MergeSort(final int[] arrayToSort, final int threshold) {this.arrayToSort = arrayToSort;this.threshold = threshold;}/*** 对类内数组进行顺序归并排序的入口方法* @return 排序后的数组*/public int[] sequentialSort() {return sequentialSort(arrayToSort, threshold);}/*** 静态方法:递归实现归并排序* @param arrayToSort 待排序数组* @param threshold 拆分阈值* @return 排序后的数组*/public static int[] sequentialSort(final int[] arrayToSort, int threshold) {// 如果当前数组长度小于阈值,则直接使用JDK内置排序(通常是快速排序优化)if (arrayToSort.length < threshold) {Arrays.sort(arrayToSort);return arrayToSort;}// 计算中点,拆分数组为左右两部分int midpoint = arrayToSort.length / 2;int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);   // 左半部分 [0, midpoint)int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length); // 右半部分 [midpoint, end)// 递归排序左右子数组leftArray = sequentialSort(leftArray, threshold);rightArray = sequentialSort(rightArray, threshold);// 合并两个已排序的子数组return merge(leftArray, rightArray);}/*** 合并两个已排序的数组* @param leftArray 已排序的左数组* @param rightArray 已排序的右数组* @return 合并后的有序数组*/public static int[] merge(final int[] leftArray, final int[] rightArray) {// 创建合并后的数组,长度为两数组之和int[] mergedArray = new int[leftArray.length + rightArray.length];int mergedArrayPos = 0;  // 合并数组的当前位置int leftArrayPos = 0;    // 左数组的当前指针int rightArrayPos = 0;   // 右数组的当前指针// 同时遍历左右数组,按升序选择较小的元素放入合并数组while (leftArrayPos < leftArray.length && rightArrayPos < rightArray.length) {if (leftArray[leftArrayPos] <= rightArray[rightArrayPos]) {mergedArray[mergedArrayPos] = leftArray[leftArrayPos];leftArrayPos++;} else {mergedArray[mergedArrayPos] = rightArray[rightArrayPos];rightArrayPos++;}mergedArrayPos++;}// 如果左数组还有剩余元素,全部追加到合并数组while (leftArrayPos < leftArray.length) {mergedArray[mergedArrayPos] = leftArray[leftArrayPos];leftArrayPos++;mergedArrayPos++;}// 如果右数组还有剩余元素,全部追加到合并数组while (rightArrayPos < rightArray.length) {mergedArray[mergedArrayPos] = rightArray[rightArrayPos];rightArrayPos++;mergedArrayPos++;}return mergedArray;}
    }  
    

1.3.2 Fork/Join实现并行归并排序

  • 并行归并排序是一种利用多线程实现的归并排序算法。它的基本思路是将数据分成若干部分,然后在不同线程上对这些部分进行归并排序,最后将排好序的部分合并成有序数组。在多核 CPU 上,这种算法也能够有效提高排序速度;

  • 可以使用 Java 的 Fork/Join 框架来实现归并排序的并行化:

    public class MergeSortTask extends RecursiveAction {private final int threshold; // 任务拆分的最小阈值,当子数组长度小于等于此值时将直接进行排序private int[] arrayToSort;   // 当前任务需要排序的数组/*** 构造函数,初始化待排序数组和阈值* @param arrayToSort 待排序数组* @param threshold 拆分阈值*/public MergeSortTask(final int[] arrayToSort, final int threshold) {this.arrayToSort = arrayToSort;this.threshold = threshold;}/*** Fork/Join框架的核心方法,实现任务的分解与执行*/@Overrideprotected void compute() {// 如果数组长度小于等于阈值,直接进行排序(基准情况)if (arrayToSort.length <= threshold) {Arrays.sort(arrayToSort); // 使用JDK优化排序算法return;}// 计算中点,将数组拆分为左右两个子数组int midpoint = arrayToSort.length / 2;int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);    // 左半部分 [0, midpoint)int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length); // 右半部分 [midpoint, end)// 创建左右子任务MergeSortTask leftTask = new MergeSortTask(leftArray, threshold);MergeSortTask rightTask = new MergeSortTask(rightArray, threshold);// 将左右任务提交到Fork/Join线程池异步执行(fork操作)leftTask.fork();  // 将左任务提交到工作队列,由其他工作线程执行rightTask.fork(); // 将右任务提交到工作队列,由其他工作线程执行// 等待左右子任务执行完成(join操作)leftTask.join();  // 阻塞当前线程直到左任务完成rightTask.join(); // 阻塞当前线程直到右任务完成// 合并两个已排序的子数组结果arrayToSort = MergeSort.merge(leftTask.getSortedArray(), rightTask.getSortedArray());}/*** 获取排序后的数组* @return 排序完成的数组*/public int[] getSortedArray() {return arrayToSort;}
    }
    
  • 在这个示例中,使用 Fork/Join 框架实现了归并排序算法,并通过递归调用实现了并行化;

  • 使用 Fork/Join 框架实现归并排序算法的关键在于将排序任务分解成小的任务,使用 Fork/Join 框架将这些小任务提交给线程池中的不同线程并行执行,并在最后将排序后的结果进行合并。这样可以充分利用多核CPU的并行处理能力,提高程序的执行效率。

1.3.3 测试结果对比

  • 测试代码:

    public class Utils {/*** 生成指定大小的随机整数数组* @param size 数组的大小* @return 包含随机整数的数组*/public static int[] buildRandomIntArray(final int size) {int[] arrayToCalculateSumOf = new int[size];Random generator = new Random();// 生成0-100,000,000范围内的随机整数填充数组for (int i = 0; i < arrayToCalculateSumOf.length; i++) {arrayToCalculateSumOf[i] = generator.nextInt(100000000);}return arrayToCalculateSumOf;}
    }public class ArrayToSortMain {public static void main(String[] args) {// 生成包含2000万个随机整数的测试数组int[] arrayToSortByMergeSort = Utils.buildRandomIntArray(20000000);// 创建数组副本,确保两种排序方法使用相同的数据进行公平比较int[] arrayToSortByForkJoin = Arrays.copyOf(arrayToSortByMergeSort, arrayToSortByMergeSort.length);// 获取当前系统的处理器核心数,用于优化并行计算int processors = Runtime.getRuntime().availableProcessors();// 创建单线程归并排序实例,使用处理器数量作为拆分阈值MergeSort mergeSort = new MergeSort(arrayToSortByMergeSort, processors);long startTime = System.nanoTime(); // 记录开始时间(纳秒级精度)// 执行单线程归并排序mergeSort.mergeSort();long duration = System.nanoTime()-startTime; // 计算耗时// 将纳秒转换为毫秒并输出结果System.out.println("单线程归并排序时间: "+(duration/(1000f*1000f))+"毫秒");// 创建Fork/Join并行排序任务,同样使用处理器数量作为阈值MergeSortTask mergeSortTask = new MergeSortTask(arrayToSortByForkJoin, processors);// 构建Fork/Join线程池,线程数设置为处理器核心数ForkJoinPool forkJoinPool = new ForkJoinPool(processors);startTime = System.nanoTime(); // 重新记录开始时间// 在线程池中执行并行排序任务(invoke会阻塞直到任务完成)forkJoinPool.invoke(mergeSortTask);duration = System.nanoTime()-startTime; // 计算并行排序耗时// 输出并行排序时间System.out.println("fork/join排序时间: "+(duration/(1000f*1000f))+"毫秒");}
    }
    
  • 测试结果:数组越大,利用 Fork/Join 框架实现的并行化归并排序比单线程归并排序的效率更高

    在这里插入图片描述

1.4 并行实现归并排序的优化和注意事项

  • 在实际应用里,为充分利用多核CPU,同时保障算法的正确性与效率,得考量数据分布均匀性、内存使用、线程切换开销等因素;

  • 具体而言:

    • 任务大小:它的选择会影响并行算法的效率与负载均衡。任务太小,任务划分和合并的开销就会过大;任务太大,又难以充分利用多核 CPU 的并行处理能力。所以要结合数据量、CPU 核心数等,选合适的任务大小;

    • 负载均衡:并行算法得保证各线程执行的任务大小和时间尽可能相等,不然有的线程会负载过重,有的则过轻。归并排序可通过递归调用来实现负载均衡,但递归层数不能太深,否则会增加线程创建和合并的开销;

    • 数据分布:数据分布均匀与否会影响并行算法效率和负载均衡。若数据分布不均,会出现有的线程处理数据量过大,有的过小的情况。实际应用要考虑数据分布,尽量把数据分成大小相等的子数组;

    • 内存使用:处理大规模数据时,内存使用情况对算法执行效率至关重要。归并排序可通过原地归并节约内存,但要注意归并的实现方式,避免数据覆盖和不稳定排序等问题;

    • 线程切换:线程切换是并行算法的一大开销,要尽量减少切换次数以提升效率。归并排序可通过设置线程池大小和调整任务大小来控制线程数量和切换开销,实现最优性能。

2 Fork/Join框架

2.1 简介

  • Fork/Join 是一个支持分治任务模型的并行计算框架,其中:

    • Fork:对应分治任务模型里的任务分解,就是把一个大任务拆分成许多小任务;
    • Join:对应结果合并,即等小任务并行执行完后,将它们的结果合并成一个大结果;
  • 它适用于可采用分治策略的计算密集型任务,像大规模数组排序、图形渲染、复杂算法求解等场景;

  • 下图清晰展示了其工作流程:大任务(Big Task)先通过 Fork 不断分解为更小的子任务(SubTask),子任务按顺序计算得到各自结果(Result),之后通过 Join 逐步合并这些结果,最终得到大任务的最终结果(Final Result);

    在这里插入图片描述

2.2 应用场景

  • 并行计算:能便捷执行大规模计算任务,把大任务分解为小任务,借助工作窃取算法并行执行,充分利用多核处理器优势,提升计算效率;
  • 递归任务处理:适合递归式的任务分解与执行,可递归地将大任务拆成诸多小任务,再通过工作窃取算法动态分配给工作线程执行;
  • 并行流操作:Java 8 引入的 Stream API 用于集合的函数式编程操作,ForkJoinPool 常用来执行并行流操作里的并行计算部分,比如对元素进行过滤、映射、聚合等;
  • 高性能任务执行:提供高性能的任务执行机制,通过动态调度任务和管理线程池,有效利用系统资源,在多核处理器上实现任务并行执行。

2.3 使用

  • Fork/Join 框架的主要组成部分是 ForkJoinPool、ForkJoinTask;
    • ForkJoinPool 是一个线程池,它用于管理 Fork/Join 中任务的执行;
    • ForkJoinTask 是一个抽象类,用于表示可以被分割成更小部分的任务。

2.3.1 ForkJoinPool

  • ForkJoinPool 是 Fork/Join 框架中的线程池类,它用于管理 Fork/Join 任务的线程;
  • ForkJoinPool 类包括一些重要的方法,例如submit()invoke()shutdown()awaitTermination()等,用于提交任务、执行任务、关闭线程池和等待任务的执行结果;
  • ForkJoinPool 类中还包括一些参数,例如线程池的大小、工作线程的优先级、任务队列的容量等,可以根据具体的应用场景进行设置。

2.3.2 构造器

  • ForkJoinPool 有四个核心参数,用于控制线程池的并行数、工作线程创建、异常处理和模式指定:

    • int parallelism:指定并行级别(parallelism level),ForkJoinPool 会据此决定工作线程数量。若未设置,会用 Runtime.getRuntime().availableProcessors() 获取可用处理器数量来设置并行级别;

    • ForkJoinWorkerThreadFactory factoryForkJoinPool 创建线程时,通过该工厂创建。需实现 ForkJoinWorkerThreadFactory 接口,若不指定,由默认的 DefaultForkJoinWorkerThreadFactory 负责线程创建;

    • UncaughtExceptionHandler handler:指定异常处理器,当任务运行出错时,由该处理器处理异常;

    • boolean asyncMode:设置队列工作模式。asyncModetrue 时,使用先进先出队列;为 false 时,使用后进先出模式;

  • 代码示例:

    // 获取处理器数量
    int processors = Runtime.getRuntime().availableProcessors();
    // 构建forkjoin线程池,用获取到的处理器数量作为并行级别参数
    ForkJoinPool forkJoinPool = new ForkJoinPool(processors);
    

2.3.3 任务提交方式

返回值方法
提交异步执行voidexecute(ForkJoinTask)<?> task):提交一个ForkJoinTask类型的任务进行异步执行
execute(Runnable task):提交一个Runnable类型的任务进行异步执行
等待并获取结果Tinvoke(ForkJoinTask task):提交ForkJoinTask类型的任务,等待任务执行完毕后获取结果
提交执行获取Future结果ForkJoinTasksubmit(ForkJoinTask task):提交ForkJoinTask类型的任务,返回对应的 ForkJoinTask
submit(Callable task):提交Callable类型的任务(Callable 有返回值),返回对应的ForkJoinTask
submit(Runnable task):提交Runnable类型的任务(Runnable 无返回值),返回对应的ForkJoinTask
submit(Runnable task, T result):提交Runnable类型的任务,同时指定一个结果,返回对应的ForkJoinTask

2.3.4 调用方法

  • ForkJoinTask 最核心的是fork()方法和join()方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取;

  • fork():用于向当前任务所运行的线程池中提交任务。如果当前线程是 ForkJoinWorkerThread 类型,将会放入该线程的工作队列,否则放入 common 线程池的工作队列中;

    在 Fork/Join 框架中,“该线程的工作队列”指的是当前执行任务的 ForkJoinWorkerThread 所拥有的工作队列

    • ForkJoinWorkerThreadForkJoinPool 中用于执行任务的工作线程。每个 ForkJoinWorkerThread 内部都维护了一个双端队列(Deque),这个队列就是“该线程的工作队列”;
    • 当通过 fork() 方法提交任务时,如果当前线程是 ForkJoinWorkerThread 类型,任务会被放入当前 ForkJoinWorkerThread 自己的双端队列中,后续该线程会从自己的队列里取出任务执行;
    • 这样的设计是为了配合“工作窃取(Work - Stealing)”算法:当一个线程的队列中没有任务时,它可以去其他线程的队列中“窃取”任务来执行,从而提高线程的利用率和整体并行效率;
  • join():用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。

2.3.5 处理递归任务

  • 比如计算斐波那契数列:

    public class Fibonacci extends RecursiveTask<Integer> {final int n;  // 要计算斐波那契数列的第n项Fibonacci(int n) {this.n = n;}/*** 重写RecursiveTask的compute()方法,实现斐波那契数列的递归计算* @return 斐波那契数列的第n项值*/protected Integer compute() {// 基准情况:斐波那契数列的前两项直接返回if (n <= 1)return n;// 创建计算F(n-1)的子任务Fibonacci f1 = new Fibonacci(n - 1);// 将子任务提交到ForkJoinPool异步执行(fork操作)f1.fork();// 创建计算F(n-2)的子任务,并在当前线程同步执行Fibonacci f2 = new Fibonacci(n - 2);// 合并结果:F(n) = F(n-1) + F(n-2)return f2.compute() + f1.join();  // f2.compute()同步计算,f1.join()等待异步任务完成}public static void main(String[] args) {// 构建forkjoin线程池(使用默认并行级别,通常是处理器核心数)ForkJoinPool pool = new ForkJoinPool();// 创建计算斐波那契数列第10项的任务Fibonacci task = new Fibonacci(10);// 提交任务到线程池并阻塞等待执行完成,返回最终结果int result = pool.invoke(task);System.out.println(result);  // 输出结果:55}
    }
    

    继承RecursiveTask:表示这是一个有返回值的递归任务,返回Integer类型结果

    斐波那契数列定义F(0)=0,F(1)=1,F(n)=F(n−1)+F(n−2)F(0)=0, F(1)=1, F(n)=F(n-1)+F(n-2)F(0)=0,F(1)=1,F(n)=F(n1)+F(n2)

    任务分解策略

    • F(n)F(n)F(n)分解为F(n−1)F(n-1)F(n1)F(n−2)F(n-2)F(n2)两个子问题
    • F(n−1)F(n-1)F(n1)通过fork()异步执行
    • F(n−2)F(n-2)F(n2)通过compute()在当前线程同步执行

    结果合并:使用f1.join()等待异步任务完成,然后与同步计算结果相加

    Fork/Join工作流程

    • fork():将任务推入工作队列,供其他工作线程窃取执行
    • join():阻塞当前线程直到任务完成并返回结果

    性能问题:这种实现方式存在大量重复计算,时间复杂度为O(2n)O(2^n)O(2n),实际应用中应使用动态规划等优化方法

    线程池使用pool.invoke(task)同步提交任务并等待结果,适合计算密集型递归任务

  • 如果 n 为100000,执行上面的代码会发生什么问题?在上面的例子中,由于递归计算 Fibonacci 数列的任务数量呈指数级增长,当 n 较大时,就容易出现 StackOverflowError 错误。这个错误通常发生在递归过程中,由于递归过程中每次调用函数都会在栈中创建一个新的栈帧,当递归深度过大时,栈空间就会被耗尽,导致 StackOverflowError 错误;

  • 那么如何解决栈溢出呢?可以使用迭代的方式计算 Fibonacci 数列,以避免递归过程中占用大量的栈空间。示例代码:

    public class Fibonacci {public static void main(String[] args) {int n = 100000;  // 要计算斐波那契数列的第100000项// 创建数组用于存储斐波那契数列值,长度为n+1(包含0到n项)long[] fib = new long[n + 1];// 初始化斐波那契数列的前两项fib[0] = 0;  // 第0项为0fib[1] = 1;  // 第1项为1// 使用动态规划(自底向上)方法计算斐波那契数列for (int i = 2; i <= n; i++) {// 递推公式:F(n) = F(n-1) + F(n-2)fib[i] = fib[i - 1] + fib[i - 2];}// 输出斐波那契数列的第100000项System.out.println(fib[n]);}
    }
    
  • 对于一些递归深度较大的任务,使用 Fork/Join 框架可能会出现任务调度内存消耗的问题;

    • 当递归深度较大时,会产生大量的子任务,这些子任务可能被调度到不同的线程中执行,而线程的创建和销毁以及任务调度的开销都会占用大量的资源,从而导致性能下降;
    • 此外,对于递归深度较大的任务,由于每个子任务所占用的栈空间较大,可能会导致内存消耗过大,从而引起内存溢出的问题;
    • 因此,在使用 Fork/Join 框架处理递归任务时,需要根据实际情况来评估递归深度和任务粒度,以避免任务调度和内存消耗的问题。如果递归深度较大,可以尝试采用其他方法来优化算法,如使用迭代方式替代递归,或者限制递归深度来减少任务数量,以避免 Fork/Join 框架的缺点。

2.3.6 处理阻塞任务

  • 在 ForkJoinPool 中使用阻塞型任务时需要注意以下几点:

    • 防止线程饥饿
      • 当一个线程在执行一个阻塞型任务时,它将会一直等待任务完成,这时如果没有其他线程可以窃取任务,那么该线程将一直被阻塞,直到任务完成为止;
      • 为了避免这种情况,应该避免在 ForkJoinPool 中提交大量的阻塞型任务;
    • 使用特定的线程池
      • 为了最大程度地利用 ForkJoinPool 的性能,可以使用专门的线程池来处理阻塞型任务,这些线程不会被 ForkJoinPool 的窃取机制所影响;
      • 例如,可以使用 ThreadPoolExecutor 来创建一个线程池,然后将这个线程池作为 ForkJoinPool 的执行器,这样就可以使用 ThreadPoolExecutor 来处理阻塞型任务,而使用 ForkJoinPool 来处理非阻塞型任务;
    • 不要阻塞工作线程
      • 如果在 ForkJoinPool 中使用阻塞型任务,那么需要确保这些任务不会阻塞工作线程,否则会导致整个线程池的性能下降;
      • 为了避免这种情况,可以将阻塞型任务提交到一个专门的线程池中,或者使用 CompletableFuture 等异步编程工具来处理阻塞型任务;
  • 下面是一个使用阻塞型任务的例子,这个例子展示了如何使用 CompletableFuture 来处理阻塞型任务:

    public class BlockingTaskDemo {public static void main(String[] args) {// 构建一个forkjoin线程池(使用默认配置,通常为核心数)ForkJoinPool pool = new ForkJoinPool();// 创建一个异步任务,使用CompletableFuture并将其提交到ForkJoinPool中执行CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {// 模拟一个耗时的任务(阻塞5秒钟)TimeUnit.SECONDS.sleep(5);return "Hello, world!";  // 任务完成后的返回值} catch (InterruptedException e) {e.printStackTrace();return null;  // 发生中断异常时返回null}}, pool);  // 指定使用ForkJoinPool作为执行器try {// 阻塞等待任务完成,并获取结果(get()方法会阻塞当前线程直到任务完成)String result = future.get();System.out.println(result);  // 输出结果:"Hello, world!"} catch (InterruptedException e) {// 处理线程中断异常e.printStackTrace();} catch (ExecutionException e) {// 处理任务执行过程中抛出的异常e.printStackTrace();} finally {// 关闭ForkJoinPool,释放线程资源(优雅关闭,等待已提交任务完成)pool.shutdown();}}
    }
    
    • 在这个例子中,使用了 CompletableFuture 来处理阻塞型任务,因为它可以避免阻塞 ForkJoinPool 中的工作线程;
    • 另外,我们也可以使用专门的线程池来处理阻塞型任务,例如 ThreadPoolExecutor 等。不管是哪种方式,都需要避免在 ForkJoinPool 中提交大量的阻塞型任务,以免影响整个线程池的性能。

2.4 ForkJoinPool 工作原理

2.4.1 原理概述

  • 当通过 ForkJoinPool 的 invoke()submit() 方法提交任务时,ForkJoinPool 会依据路由规则将任务提交到某个任务队列。若任务执行过程中创建子任务,子任务会被提交到工作线程对应的任务队列;

  • 若工作线程对应的任务队列空了,ForkJoinPool 支持任务窃取机制,空闲的工作线程可“窃取”其他工作任务队列里的任务,确保所有工作线程都能有效利用,不闲置;

  • 核心设计

    • ForkJoinPool 内部存储有 WorkQueue 数组,提交给它的任务会被分配到指定的 WorkQueue 上执行;

    • 每个 WorkQueue 内部维护一个 ForkJoinTask 数组来存储待执行任务,还配有一个独立的 ForkJoinWorkerThread 来真正执行任务;

    在这里插入图片描述

2.4.2 工作线程 ForkJoinWorkerThread

  • ForkJoinWorkerThread 是 ForkJoinPool 里专门执行任务的线程。创建它时,会自动向 ForkJoinPool 注册一个 WorkQueue,这个 WorkQueue 是该线程专属的任务存储队列,且只能出现在 workqueues[] 的奇数位;

  • ForkJoinWorkerThread 工作线程启动后,会扫描并 “窃取” 任务执行。另外,当它在 ForkJoinTask#join() 等待返回结果时,若被 ForkJoinPool 线程池发现其任务队列为空,或者当前任务已执行完毕,也会通过工作窃取算法从其他任务队列获取任务,分配到自己的任务队列中执行;

    在这里插入图片描述

2.4.3 工作队列 WorkQueue

  • WorkQueue 一个是双端队列,用于存储工作线程自己的任务。每个工作线程都维护一个本地 WorkQueue,优先执行本地队列中的任务。当本地队列任务执行完,工作线程会尝试从其他线程的 WorkQueue 中窃取任务;

  • WorkQueue 任务队列分两种类型:一种是外部提交任务所用队列,在任务队列数组中数组下标为偶数;另一种是工作线程私有的任务队列,存储大任务 fork 分解出的任务,在任务队列数组中数组下标为奇数;

  • 下图展示了 WorkQueue 的操作,如 externalSubmit(task) 外部提交任务、subtask(fork) 子任务提交,还有 pushpop 等操作,以及根据 asynctrue 还是 false,分别对应 FIFO_QUEUE(先进先出队列)和 LIFO_QUEUE(后进先出队列)的工作模式;

    在这里插入图片描述

2.4.4 工作窃取机制

  • ForkJoinPool 和 ThreadPoolExecutor 有很大不同,ForkJoinPool 引入了工作窃取设计,这是其性能保障的关键之一;

  • 工作窃取指的是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从自己双端队列的头部获取任务;当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部获取任务,这种方式能最大限度减少线程竞争任务的可能性;

  • 下图展示了主线程提交任务到 ForkJoinPool 后,任务被推入工作队列(workQueue),ForkJoinWorkerThread - 1ForkJoinWorkerThread - 2 不仅会处理自身队列中的任务(如通过 fork 分解任务),当自身任务不足时,还会通过“steal 工作窃取”从其他工作队列获取任务;

    在这里插入图片描述

  • 通过工作窃取,Fork/Join 框架能实现任务的自动负载均衡,充分利用多核 CPU 的计算能力,同时避免线程饥饿和延迟问题;

  • 如果想了解更多关于 ForkJoinPool 的内容,可以参考Doug Lea 的论文。

2.5 ForkJoinPool 执行流程

在这里插入图片描述

  • 应用程序外部向Fork/Join框架提交任务(通过execute/submit/invoke)。此时有两种处理路径:

    • 路径1:直接提交到workQueues。调用 externalPush(task),将任务直接放入工作队列(workQueues)。若提交成功,会触发 signalWork(w, q),尝试用工作线程执行任务;

    • 路径2:初始化workQueues并提交任务。调用 externalSubmit(task),先初始化工作队列,再将任务提交到队列,后续逻辑与“路径1”类似,最终也会触发 signalWork(w, q)

  • signalWork(w, q) 执行后,会判断是否有空闲线程

    • 有空闲线程(Y分支):直接从空闲线程中选一个,执行任务(进入下面的任务执行阶段);

    • 无空闲线程(N分支):需要创建新的工作线程,流程如下:

      1. tryAddWorker(c):尝试添加新工作线程
      2. createWorker():创建工作线程实例
      3. new ForkJoinWorkerThread(pool):实例化ForkJoinWorkerThread
      4. forkJoinWorkerThread.start():启动工作线程,触发线程的run()方法
      5. forkJoinWorkerThread.run():线程启动后,执行run()方法,此时会先执行 registerWorker(this),将新线程注册到workQueues,完成线程与队列的绑定;
  • 工作线程就绪后,进入任务执行的核心逻辑:

    1. 任务入队等待执行。若任务是通过ForkJoinTask.fork()提交的,会调用 workQueue.push(this),将任务放入队列等待执行;

    2. 工作线程执行任务。调用 workQueue.runTask(),从队列中取出任务执行。执行时调用 forkJoinTask.doExec(),而doExec()最终会触发任务的核心逻辑:

      • 若任务是RecursiveTask(有返回值的分治任务),会调用 RecursiveTask.compute()(用户自定义的“任务拆分与计算逻辑”就在这里实现);
      • 执行的入口也可通过 exec() 方法触发,最终导向compute()
    3. 任务扫描与等待

      • 若当前队列“有任务执行”,则持续执行;

      • 若“未扫描到任务”,则调用 scan(w, r) 扫描其他队列的任务(Fork/Join的工作窃取(Work-Stealing)机制:空闲线程会从其他线程的队列“偷”任务执行,提升资源利用率);

      • 若扫描后仍无任务,调用 awaitWork(w, r) 让线程进入“等待任务”状态;

    4. 持续运行工作线程。整个执行过程由 runWorker(workQueue) 驱动,持续从队列中获取并执行任务。

2.6总结

  • Fork/Join是基于分治思想(把大任务拆成小任务,并行执行后合并结果)的编程模型,适合计算密集型任务(大量CPU运算的场景),效率提升源于两点:

    • 任务切分:将大任务拆成更小的子任务,让更多线程同时参与计算,充分利用多核CPU的并行能力;

    • 任务窃取:空闲的线程会“窃取”其他线程队列中未执行的任务,减少线程闲置,同时降低线程间的竞争(因为每个线程优先处理自己队列的任务,窃取是“补充性”的);

  • 使用ForkJoinPool(Fork/Join框架的线程池)时,要关注任务类型

    • 最适合的是纯函数计算型任务:任务不依赖外部状态、不修改共享变量(无副作用),这样并行执行时更安全,不会因“状态不一致”出问题;

    • 若要处理阻塞型任务(如I/O操作、等待锁),需谨慎评估:虽然ForkJoinPool能处理,但会导致线程被长时间占用(阻塞),破坏“任务窃取”的高效性,增加线程管理成本;

  • 普通线程池(如ThreadPoolExecutor)更适合IO密集型(如网络请求、文件读写)或“短小任务”,而Fork/Join针对大规模并行计算,核心区别有四点:

    • 工作窃取算法

      • ForkJoinPool:线程空闲时,会从其他线程的任务队列中“偷取”任务执行,最大化线程利用率;
      • 普通线程池:任务存在公共队列,线程直接从公共队列取任务,无“窃取”逻辑,空闲线程只能等新任务入队;
    • 任务的分解与合并

      • ForkJoinPool:支持分治——大任务拆成小任务并行执行,最终合并小任务结果得到大任务结果;
      • 普通线程池:任务是“独立的”,不支持“自动拆分-合并”,需手动处理任务依赖和结果聚合;
    • 工作线程的数量

      • ForkJoinPool:会自动根据CPU核心数设置工作线程数量,避免线程过多(减少上下文切换开销),最大化CPU性能;
      • 普通线程池:需手动指定线程数,若设置不合理(如线程过多导致切换频繁,或过少导致CPU闲置),会影响性能;
    • 任务类型适配

      • ForkJoinPool:适合大规模并行计算任务(如大数据排序、矩阵运算);
      • 普通线程池:适合短小的、IO密集型任务(如Web服务器处理请求、数据库操作)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/95280.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/95280.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/95280.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kafka面试精讲 Day 6:Kafka日志存储结构与索引机制

【Kafka面试精讲 Day 6】Kafka日志存储结构与索引机制 在“Kafka面试精讲”系列的第6天&#xff0c;我们将深入剖析 Kafka的日志存储结构与索引机制。这是Kafka高性能、高吞吐量背后的核心设计之一&#xff0c;也是中高级面试中的高频考点。面试官常通过这个问题考察候选人是否…

Linux 字符设备驱动框架学习记录(三)

Linux字符设备驱动开发新框架详解 一、新旧驱动框架对比 传统字符设备驱动流程 手动分配设备号 (register_chrdev_region)实现file_operations结构体使用mknod手动创建设备节点 新式驱动框架优势 自动设备号分配&#xff1a;动态申请避免冲突自动节点创建&#xff1a;通过class…

《计算机网络安全》实验报告一 现代网络安全挑战 拒绝服务与分布式拒绝服务攻击的演变与防御策略(1)

目 录 摘 要 一、研究背景与目的 1.1 介绍拒绝服务&#xff08;DoS&#xff09;和分布式拒绝服务&#xff08;DDoS&#xff09;攻击的背景 &#xff08;1&#xff09;拒绝服务攻击&#xff08;DoS&#xff09;  &#xff08;2&#xff09;分布式拒绝服务攻击&#xff0…

深度学习篇---模型组成部分

模型组成部分&#xff1a;在 PyTorch 框架下进行图像分类任务时&#xff0c;深度学习代码通常由几个核心部分组成。这些部分中有些可以在不同网络间复用&#xff0c;有些则需要根据具体任务或网络结构进行修改。下面我将用通俗易懂的方式介绍这些组成部分&#xff1a;1. 数据准…

关于ANDROUD APPIUM安装细则

1&#xff0c;可以先参考一下连接 PythonAppium自动化完整教程_appium python教程-CSDN博客 2&#xff0c;appium 需要对应的版本的node&#xff0c;可以用nvm对node 进行版本隔离 3&#xff0c;对应需要安装android stuido 和对应的sdk &#xff0c;按照以上连接进行下载安…

八、算法设计与分析

1 算法设计与分析的基本概念 1.1 算法 定义 &#xff1a;算法是对特定问题求解步骤的一种描述&#xff0c;是有限指令序列&#xff0c;每条指令表示一个或多个操作。特性 &#xff1a; 有穷性&#xff1a;算法需在有限步骤和时间内结束。确定性&#xff1a;指令无歧义&#xff…

机器学习从入门到精通 - 神经网络入门:从感知机到反向传播数学揭秘

机器学习从入门到精通 - 神经网络入门&#xff1a;从感知机到反向传播数学揭秘开场白&#xff1a;点燃你的好奇心 各位&#xff0c;有没有觉得那些能识图、懂人话、下棋碾压人类的AI特别酷&#xff1f;它们的"大脑"核心&#xff0c;很多时候就是神经网络&#xff01;…

神经网络模型介绍

如果你用过人脸识别解锁手机、刷到过精准推送的短视频&#xff0c;或是体验过 AI 聊天机器人&#xff0c;那么你已经在和神经网络打交道了。作为深度学习的核心技术&#xff0c;神经网络模仿人脑的信息处理方式&#xff0c;让机器拥有了 “学习” 的能力。一、什么是神经网络&a…

苹果开发中什么是Storyboard?object-c 和swiftui 以及Storyboard到底有什么关系以及逻辑?优雅草卓伊凡

苹果开发中什么是Storyboard&#xff1f;object-c 和swiftui 以及Storyboard到底有什么关系以及逻辑&#xff1f;优雅草卓伊凡引言由于最近有个客户咨询关于 苹果内购 in-purchase 的问题做了付费咨询处理&#xff0c;得到问题&#xff1a;“昨天试着把您的那几部分code 组装成…

孩子玩手机都近视了,怎样限制小孩的手机使用时长?

最近两周&#xff0c;我给孩子检查作业时发现娃总是把眼睛眯成一条缝&#xff0c;而且每隔几分钟就会用手背揉眼睛&#xff0c;有时候揉得眼圈都红了。有一次默写单词&#xff0c;他把 “太阳” 写成了 “大阳”&#xff0c;我给他指出来&#xff0c;他却盯着本子说 “没有错”…

医疗AI时代的生物医学Go编程:高性能计算与精准医疗的案例分析(六)

第五章 案例三:GoEHRStream - 实时电子病历数据流处理系统 5.1 案例背景与需求分析 5.1.1 电子病历数据流处理概述 电子健康记录(Electronic Health Record, EHR)系统是现代医疗信息化的核心,存储了患者从出生到死亡的完整健康信息,包括 demographics、诊断、用药、手术、…

GEM5学习(2):运行x86Demo示例

创建脚本 配置脚本内容参考官网的说明gem5: Creating a simple configuration script 首先根据官方说明创建脚本文件 mkdir configs/tutorial/part1/ touch configs/tutorial/part1/simple.py simple.py 中的内容如下&#xff1a; from gem5.prebuilt.demo.x86_demo_board…

通过 FinalShell 访问服务器并运行 GUI 程序,提示 “Cannot connect to X server“ 的解决方法

FinalShell 是一个 SSH 客户端&#xff0c;默认情况下 不支持 X11 图形转发&#xff08;不像 ssh -X 或 ssh -Y&#xff09;&#xff0c;所以直接运行 GUI 程序&#xff08;如 Qt、GNOME、Matplotlib 等&#xff09;会报错&#xff1a; Error: Cant open display: Failed to c…

1.人工智能——概述

应用领域 替代低端劳动&#xff0c;解决危险、高体力精力损耗领域 什么是智能制造&#xff1f;数字孪生&#xff1f;边缘计算&#xff1f; 边缘计算 是 数字孪生 的 “感官和神经末梢”&#xff0c;负责采集本地实时数据和即时反应。琐碎数据不上传总服务器&#xff0c;实时进行…

传统园区能源转型破局之道:智慧能源管理系统驱动的“源-网-荷-储”协同赋能

传统园区能源结构转型 政策要求&#xff1a;福建提出2025年可再生能源渗透率≥25%&#xff0c;山东强调“源网荷储一体化”&#xff0c;安徽要求清洁能源就地消纳。系统解决方案&#xff1a;多能协同调控&#xff1a;集成光伏、储能、充电桩数据&#xff0c;通过AI算法动态优化…

[光学原理与应用-353]:ZEMAX - 设置 - 可视化工具:2D视图、3D视图、实体模型三者的区别,以及如何设置光线的数量

在光学设计软件ZEMAX中&#xff0c;2D视图、3D视图和实体模型是三种不同的可视化工具&#xff0c;分别用于从不同维度展示光学系统的结构、布局和物理特性。它们的核心区别体现在维度、功能、应用场景及信息呈现方式上&#xff0c;以下是详细对比&#xff1a;一、维度与信息呈现…

《sklearn机器学习》——交叉验证迭代器

sklearn 交叉验证迭代器 在 scikit-learn (sklearn) 中&#xff0c;交叉验证迭代器&#xff08;Cross-Validation Iterators&#xff09;是一组用于生成训练集和验证集索引的工具。它们是 model_selection 模块的核心组件&#xff0c;决定了数据如何被分割&#xff0c;从而支持…

Trae+Chrome MCP Server 让AI接管你的浏览器

一、核心优势1、无缝集成现有浏览器环境直接复用用户已打开的 Chrome 浏览器&#xff0c;保留所有登录状态、书签、扩展及历史记录&#xff0c;无需重新登录或配置环境。对比传统工具&#xff08;如 Playwright&#xff09;需独立启动浏览器进程且无法保留用户环境&#xff0c;…

Shell 编程 —— 正则表达式与文本处理器

目录 一. 正则表达式 1.1 定义 1.2 用途 1.3 Linux 正则表达式分类 1.4 正则表达式组成 &#xff08;1&#xff09;普通字符 &#xff08;2&#xff09;元字符&#xff1a;规则的核心载体 &#xff08;3&#xff09; 重复次数 &#xff08;4&#xff09;两类正则的核心…

Springboot 监控篇

在 Spring Boot 中实现 JVM 在线监控&#xff08;包括线程曲线、内存使用、GC 情况等&#xff09;&#xff0c;最常用的方案是结合 Spring Boot Actuator Micrometer 监控可视化工具&#xff08;如 Grafana、Prometheus&#xff09;。以下是完整实现方案&#xff1a; 一、核…