Java Stream 宏观介绍见:深入解析 Java Stream 设计:从四幕剧看流水线设计与执行机制-CSDN博客

PipelineHelper

PipelineHelper 是 Java Stream API 内部一个至关重要的辅助类。正如其名,它是一个“管道助手”。可以把它想象成一个执行上下文对象,当一个流管道需要被执行时(即调用终端操作时),这个对象会被创建并传递,它封装了执行该管道所需的所有信息和核心工具方法。

  • 在 PipelineHelper 这个抽象类的定义中,没有声明任何字段。它是一个纯粹的行为和契约定义类,其状态将由它的具体实现类(主要是 AbstractPipeline)来持有。
  • PipelineHelper 是一个纯抽象类,它只定义了方法签名,没有任何一个方法的具体实现。所有方法的实现都委托给了它的子类。这种设计强制子类必须提供一套完整的管道执行机制。

抽象方法构成了 PipelineHelper 的核心能力,可以分为几类:

信息获取类方法

这类方法用于查询当前流管道的静态属性,是执行优化的关键依据。

  • abstract StreamShape getSourceShape();
    • 语义:获取管道源头的“形状”,即流中元素的类型是引用类型 (REFERENCE)、intlong 还是 double
  • abstract int getStreamAndOpFlags();
    • 语义:获取整个管道(从源头到当前操作)合并后的特征标记 (StreamOpFlag)。这些标记包括 SIZED(大小已知)、ORDERED(有序)、DISTINCT(元素唯一)、SHORT_CIRCUIT(可短路)等。终端操作会根据这些标记来选择最高效的执行策略。
  • abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
    • 语义:在已知的情况下,计算处理完给定的 spliterator 后,会产生的确切输出元素数量。如果源是 SIZED 的,并且中间没有 filter 等改变大小的操作,这个方法就能返回一个精确值。这对于 toArray 等操作预先分配内存空间至关重要。

核心执行类方法

这类方法定义了驱动管道数据流动的核心逻辑。

  • abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
    • 语义这是最核心的方法之一。它接受一个用于接收最终输出结果的 Sink(来自终端操作),然后用当前管道中所有中间操作的逻辑,从后往前地对这个 Sink 进行层层包装,最终返回一个包装好的、位于管道头部的 Sink。当向这个返回的 Sink 推入一个元素时,这个元素会依次流过 filtermap 等所有中间操作,最终(如果没被过滤掉)到达原始的 Sink
  • abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
    • 语义:将数据从源 spliterator 中不断取出,并喂给(accept)已经通过 wrapSink 包装好的 Sink。它负责驱动数据的流动。
  • abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
    • 语义copyInto 的一个支持短路(cancellation)的版本。在每次推送元素后,它会检查 Sink.cancellationRequested(),如果为 true(例如 findFirst 找到了元素),就立刻停止迭代。
  • abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
    • 语义:一个便捷方法,它内部调用 wrapSink 和 copyInto,将包装和数据拷贝两步操作合并为一步。

结果聚合类方法

这类方法用于有状态操作(如 sorted)或部分终端操作(如 toArray)将结果收集到一个中间容器 Node 中。

  • abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<P_OUT[]> generator);
    • 语义:执行整个管道,并将所有输出结果收集到一个 Node 对象中返回。Node 是一个可以表示单个元素或元素树的内部结构。
  • abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator);
    • 语义:创建一个 Node.Builder,这是一个用于构建 Node 的辅助工具。

其他方法

  • abstract<P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);
    • 语义:提供一种不同于 Sink 模型的执行方式。它将管道操作包装到一个新的 Spliterator 中,当遍历这个新的 Spliterator 时,操作会惰性地应用到原始数据上。

总结:语义和能力

PipelineHelper 的核心语义是封装一条流管道的完整执行方案。它不是管道本身,而是执行管道时所需的 “蓝图”和“工具箱” 。

它的能力可以总结为:

  1. 信息中心:提供关于管道的元数据(形状、标志、大小),供终端操作进行查询和优化决策。
  2. 执行引擎:定义了将数据源 (Spliterator) 和数据处理逻辑 (Sink 链) 连接起来并驱动数据流动的核心方法 (wrapSinkcopyInto)。
  3. 流程编排器:它将流的“声明式”定义(一系列 mapfilter 调用)转化为一个“命令式”的执行过程。
  4. 结果聚合器:为需要缓冲所有元素的有状态操作和终端操作提供了收集结果到 Node 的能力。

AbstractPipeline 类概述

AbstractPipeline 类的结构和源码。这个抽象类是所有类型流(如 ReferencePipeline for Stream<T>IntPipeline for IntStream 等)实现的基础,在 Java Stream API 中扮演着至关重要的角色。

StreamSupport.java 文件中的 stream() 方法,正是通过实例化 ReferencePipeline.HeadReferencePipeline 的一个内部类,而 ReferencePipeline 继承自 AbstractPipeline)来创建流的。

AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> 是一个抽象类,它实现了 BaseStream 和 PipelineHelper 接口。

  • E_IN: 上游阶段输入元素的类型。
  • E_OUT: 当前阶段输出元素的类型。
  • S: 当前流的类型 (例如 Stream<E_OUT>)。

主要结构和成员

AbstractPipeline 的字段主要用于构建和管理流管道这个双向链表,并维护整个管道的状态。

  • sourceStage: 指向管道链表的头节点(源阶段)。所有阶段都共享同一个 sourceStage,用于获取全局信息,如数据源和并行标志。
  • previousStage: 指向链表中的前一个阶段
  • nextStage: 指向链表中的后一个阶段
  • depth: 当前阶段在管道中的深度(从0开始的索引)。
  • sourceOrOpFlags当前阶段操作的标志(如 NOT_SIZED)。
  • combinedFlags: 从源阶段到当前阶段累计的所有标志。
  • sourceSpliterator / sourceSupplier: 数据源,只在 sourceStage 中有效。
  • parallel: 是否为并行流,只在 sourceStage 中有效。
  • linkedOrConsumed: 一个布尔标志,用于确保流只能被消费一次。一旦一个中间操作被链接到下游,或者一个终端操作开始执行,这个标志就会被设为 true,防止后续操作。
  • sourceCloseAction: 用于注册流关闭时需要执行的清理逻辑。

核心职责:构建和驱动流水线

AbstractPipeline 是整个 Stream API 实现的骨架和引擎。它的核心职责可以概括为以下几点:

  1. 流水线构建 (Pipeline Construction)

    • 双向链表结构:通过 previousStage 和 nextStage 字段,将各个操作阶段(stage)链接成一个双向链表。
    • 构造器:提供了三种核心构造器:
      • AbstractPipeline(Spliterator, ...) 和 AbstractPipeline(Supplier, ...) 用于创建流水线的源头 (Head)
      • AbstractPipeline(previousStage, ...) 用于追加 (Append) 一个新的中间操作阶段。
    • 状态管理:通过 linkedOrConsumed 标志,严格保证了“流只能被消费一次”的原则。在构造链条时,它会将被链接的 previousStage 标记为已消费,防止分叉。
  2. 标志位管理 (Flag Management)

    • sourceOrOpFlags: 每个阶段都持有自己的操作标志(如 SORTEDDISTINCT)。
    • combinedFlags: 在构建流水线时,通过 StreamOpFlag.combineOpFlags() 方法,自顶向下地累积和计算从源头到当前阶段的所有标志。这使得任何一个阶段都能快速知道整个上游流水线的综合特性(例如,是否整体有序 ORDERED,是否包含短路操作 SHORT_CIRCUIT)。这是非常高效的设计。
  3. 终端操作的统一入口和驱动 (Terminal Operation Driver)

    • evaluate(TerminalOp): 这是所有终端操作的总入口。它不关心具体的终端操作是什么,只负责根据 isParallel() 状态,调用 terminalOp.evaluateParallel(...) 或 terminalOp.evaluateSequential(...)
    • sourceSpliterator(int): 这是 evaluate 方法的核心辅助。它负责为终端操作准备好一个“就绪”的 Spliterator,尤其是通过预计算来处理并行流中的有状态操作(屏障),这是整个并行处理机制的精髓所在。
  4. 并行/顺序切换 (Parallel/Sequential Switching)

    • 通过 parallel() 和 sequential() 方法,简单地修改源头阶段的 parallel 标志位,就能影响整个流水线的执行模式。这种控制集中在源头的设计非常简洁。
  5. 核心 PipelineHelper 接口的实现

    • 它实现了 PipelineHelper 接口中的大部分通用逻辑,如 wrapAndCopyIntocopyIntocopyIntoWithCancel 等,这些方法定义了如何将 Spliterator 中的数据“灌入”到 Sink 链中,并处理了短路操作的逻辑。

留给子类实现的职责:与“形状”相关的具体行为

AbstractPipeline 完美地抽象了所有 Stream(无论其元素是对象、intlong 还是 double)的通用流水线逻辑。它刻意将所有与**“形状” (Shape)** 相关的具体实现细节留给了子类。这里的“形状”指的是流的类型(REFERENCEINT_VALUELONG_VALUEDOUBLE_VALUE)。

子类(如 ReferencePipelineIntPipeline 等)必须实现以下抽象方法:

  1. getOutputShape(): 返回当前阶段的输出“形状”。这是类型系统的基础。

  2. opWrapSink(int flags, Sink<E_OUT> sink)(核心抽象) 这是实现一个中间操作的关键。子类需要提供一个方法,将下游的 Sink “包装”成一个新的 Sink,这个新的 Sink 在接收到元素后,会执行当前阶段的操作(如 map 的转换逻辑,filter 的判断逻辑),然后将结果传递给原始的下游 Sink

  3. wrap(...) 和 lazySpliterator(...): 这两个方法负责创建与“形状”匹配的 Spliterator 包装器。例如,ReferencePipeline 会创建 StreamSpliterators.WrappingSpliterator,而 IntPipeline 会创建 StreamSpliterators.IntWrappingSpliterator

  4. evaluateToNode(...)makeNodeBuilder(...): 这些方法与将流的结果收集到 Node(一个内部的、用于聚合结果的树状结构)中有关。不同“形状”的流需要不同类型的 Node 和 Node.Builder(如 Node.OfReferenceNode.OfInt)。

  5. opEvaluateParallel(...) 和 opEvaluateParallelLazy(...)(核心抽象) 这是实现一个有状态操作的关键。子类必须为有状态操作(如 sorteddistinct)提供具体的并行求值逻辑。

设计原因分析与启示

这种设计的精妙之处在于 “模板方法模式” (Template Method Pattern)  和 “职责分离原则” (Separation of Concerns) 的完美应用。

  1. 模板方法模式:

    • AbstractPipeline 定义了流水线执行的骨架(模板),例如 evaluate() 方法规定了“获取 Spliterator -> 调用终端操作求值”这个流程。
    • 它将流程中可变的部分(如如何包装 Sink、如何创建 Spliterator)定义为抽象方法(opWrapSinkwrap),交由子类去实现。
    • 启示: 当我们设计一个具有固定流程但某些步骤细节可变的框架或组件时,模板方法模式是绝佳的选择。它能锁定核心逻辑的稳定性,同时提供高度的扩展性。
  2. 职责分离:

    • AbstractPipeline 只关心“如何驱动流水线”,它不关心流水线中流动的数据具体是什么类型,也不关心每个操作的具体业务逻辑。
    • 具体的操作逻辑被封装在 Sink 的实现中。
    • 具体的类型处理被封装在 ReferencePipelineIntPipeline 等子类中。
    • 启示: 在设计复杂系统时,要清晰地划分不同模块的职责边界。一个类或模块应该只有一个引起它变化的原因。AbstractPipeline 的变化原因仅仅是“流水线驱动逻辑的变更”,而不是“增加了一种新的中间操作”或“支持一种新的数据类型”。
  3. 组合优于继承:

    • 虽然这里用了继承,但流水线的构建更像是组合。每个 AbstractPipeline 对象都持有一个 previousStage 的引用,形成一个操作链。Stream 的行为是由这一系列对象的组合来定义的,而不是通过一个巨大的、多层继承的类来实现。
    • 启示: 优先考虑使用对象组合来构建复杂的行为,因为它比继承更加灵活。

通过分析 AbstractPipeline,我们可以学到以下几点来提升自己的设计能力:

  • 识别变与不变: 在设计之初,就要仔细思考你的系统中哪些是稳定不变的核心流程(不变),哪些是可能变化或需要扩展的细节(变)。将不变的部分固化在抽象基类中,将变化的部分抽象成接口或抽象方法。
  • 善用模板方法: 对于有固定步骤的业务流程,使用模板方法模式可以极大地简化子类的实现,并保证核心流程的正确性。
  • 明确职责边界: 一个好的抽象类应该有一个清晰、单一的职责。AbstractPipeline 的职责就是“流水线控制器”,它做得非常出色。避免设计“万能”的基类。
  • 面向接口/抽象编程AbstractPipeline 内部大量使用了 SpliteratorSinkTerminalOp 等接口,而不是具体的实现类。这使得它可以与任何符合接口定义的组件协同工作,大大增强了灵活性。
  • 管理好状态AbstractPipeline 通过 linkedOrConsumed 和 parallel 等几个关键的状态字段,清晰地管理了流的生命周期和执行模式,保证了正确性和线程安全。在自己的设计中,也要仔细考虑对象的状态转换和并发访问问题。

为什么叫做pipeline而不是node?

单个 AbstractPipeline 对象很像一个链表中的节点 (Node)。然而,它被命名为 AbstractPipeline(抽象管道)而不是 AbstractStage 或 AbstractNode 是因为它所代表的概念和职责远超一个简单的节点。

简单来说:一个 AbstractPipeline 实例代表管道中的一个“阶段”,但它本身的设计和功能是为了构建和管理整个“管道”

让我们从几个方面来深入理解:

1. 结构上:既是节点,也知全局

AbstractPipeline 内部通过 previousStage 和 nextStage 字段,将各个操作(如 mapfilter)链接起来,形成一个双向链表结构。这确实是节点的特征。

但是,请看这个关键字段:

// ... existing code .../*** Backlink to the head of the pipeline chain (self if this is the source* stage).*/@SuppressWarnings("rawtypes")private final AbstractPipeline sourceStage;
// ... existing code ...

这个 sourceStage 字段让每一个“节点”都能直接访问到整个链表的头部,也就是数据源头。一个普通的链表节点通常只知道它的前驱和后继。而 AbstractPipeline 的每个实例都拥有对整个管道起点的认知。

此外,还有 combinedFlags 字段,它会累积从源头到当前阶段所有操作的特性(比如 SIZEDDISTINCT 等)。

这种设计意味着,虽然它是一个独立的阶段(节点),但它始终携带着整个管道(到它为止)的上下文信息。它的行为和优化都依赖于对整个管道的理解,而不仅仅是它自己这个环节。

2. 概念上:代表“数据流动的管道”

“Pipeline”(管道)这个词更强调的是过程和流动。数据从 sourceStage 流入,依次穿过每一个由 AbstractPipeline 实例代表的中间操作,最后从末端流出到终端操作。

  • Node (节点) 是一个静态的概念,强调“存在于某处”。
  • Pipeline (管道) 是一个动态的概念,强调“从一处流向另一处的过程”。

Stream API 的核心就是建立一个数据处理的流水线。AbstractPipeline 正是构建这条流水线的基础组件。所以用“Pipeline”来命名,更能体现其在数据流处理中的角色和作用

3. 功能上:懒加载的“配方”

整个由 AbstractPipeline 链接起来的结构,在终端操作(如 collectforEach)被调用之前,什么也不做。它只是一个“配方”或者说一个“蓝图”,描述了数据应该如何被处理。

当终端操作开始时,它会从最后一个 AbstractPipeline 阶段开始,反向遍历整个链表(管道),将每个阶段的操作包装(wrapSink)成一个最终的、融合的 Sink。然后,一次性地从源头拉取数据,让数据流过这个融合后的 Sink,完成所有计算。

所以,AbstractPipeline 的链表结构本身就是对整个处理 流程(Pipeline) 的定义。

总结

特性像一个 Node为什么叫 Pipeline
结构通过 previousStagenextStage 链接,形成链表。每个实例都通过 sourceStage 了解管道源头,并用 combinedFlags 累积全局状态。
概念是构成链表的一个基本单元。强调数据从源头流经一系列处理阶段的动态过程。
功能封装了单个操作(如 map, filter)。整个链表共同定义了一个完整的、可执行的数据处理流水线(配方)。

因此,虽然 AbstractPipeline 的实例在结构上扮演着“节点”的角色,但它的命名着眼于它所构建的、更宏大的“数据处理管道”这一核心概念。这个命名精准地反映了 Stream API 的设计哲学。

启动Stream:evaluate

有两个evaluate,它们的关系是:

  • final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) (将军/Orchestrator): 这是高层次的、通用的执行入口。它负责启动任何类型的终端操作。它不关心终端操作具体做什么,只负责接收一个代表“任务”的 TerminalOp 对象,然后决定是按顺序执行还是并行执行,并把具体的执行工作委托给这个 TerminalOp 对象。

  • final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, ...) (工兵/Worker): 这是一个低层次的、专用的执行方法。它只有一个非常具体的目标:将流中的所有元素收集到一个内部数据结构 Node 中。它是一个“工具”方法,被那些需要先把所有元素聚合起来才能进行下一步操作的终端操作所使用。


evaluate(TerminalOp<E_OUT, R> terminalOp) - 通用执行引擎

// ... existing code ...final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {assert getOutputShape() == terminalOp.inputShape();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;return isParallel()? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())): terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}
// ... existing code ...
  • 职责:

    1. 接收任务: 它的唯一参数 TerminalOp<E_OUT, R> 是一个接口,代表了一个完整的终端操作。像 count()collect()reduce()forEach() 等操作,在内部都会被封装成一个实现了 TerminalOp 接口的对象。这个对象知道如何进行求值、如何合并结果。
    2. 检查状态: 检查流是否已经被使用过 (linkedOrConsumed)。
    3. 决策与分派: 核心逻辑是 isParallel() 判断。
      • 如果流是并行的,它就调用 terminalOp.evaluateParallel(...)
      • 如果流是顺序的,它就调用 terminalOp.evaluateSequential(...)
    4. 返回最终结果: 它返回泛型 <R>,也就是终端操作的最终用户可见结果。这个 R 可以是任何类型,比如 Long (对于 count)、List<T> (对于 collect)、void (对于 forEach)。
  • 总结: 这个方法是所有终端操作的总指挥。它连接了用户调用的 Stream.collect() 和实际执行该操作的 TerminalOp 对象,并根据并行状态选择正确的执行路径。

evaluate(Spliterator<P_IN> spliterator, ...) - 节点收集器

// ... existing code ...@Overridefinal <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<E_OUT[]> generator) {if (isParallel()) {// @@@ Optimize if op of this pipeline stage is a stateful opreturn evaluateToNode(this, spliterator, flatten, generator);}else {Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);return wrapAndCopyInto(nb, spliterator).build();}}
// ... existing code ...
  • 职责:

    1. 单一目标: 它的目标非常明确,就是执行流水线,并将所有输出元素收集到一个 Node 对象里。Node 是 Stream API 内部用来暂存元素集合的高效数据结构。
    2. 需要具体实现: 它需要一个 generator (如 String[]::new) 来知道如何创建最终存放元素的数组。
    3. 返回中间结构: 它返回的是 Node<E_OUT>,这是一个内部数据结构,而不是通常用户直接消费的最终结果。
  • 总结: 这个方法是一个专用工具,用于将流“物化”到内存中。它本身不是一个完整的终端操作,而是某些终端操作(比如 toArray())实现过程中的一个步骤。


一个调用链的例子

让我们以 stream.toArray(String[]::new) 为例,看看它们是如何协同工作的:

  1. 用户调用stream.toArray(String[]::new)

  2. 创建 TerminalOptoArray 方法内部会创建一个 TerminalOp 的实例。这个 TerminalOp 的逻辑大致是:“请给我一个包含所有流元素的 Node,然后我会用这个 Node 和传入的 String[]::new 来创建一个 String 数组作为最终结果”。

  3. 调用“将军”toArray 方法接着会调用高层 evaluate(theToArrayTerminalOp)

  4. “将军”做决策evaluate(TerminalOp) 检查 isParallel()。我们假设是顺序执行。

  5. “将军”下令: 它调用 theToArrayTerminalOp.evaluateSequential(this, spliterator)

  6. TerminalOp 的实现evaluateSequential 的实现需要一个 Node。于是,它内部就会调用低层的、作为“工兵”的 evaluate(spliterator, true, generator)

  7. “工兵”干活: 这个低层 evaluate 方法启动流水线,将所有元素收集到一个 Node 中,并返回这个 Node

  8. 完成任务TerminalOp 拿到 Node 后,从中提取元素,创建用户期望的 String[] 数组。

  9. 返回结果: 最终的数组被层层返回,直到最初的用户调用处。

这两个方法的设计体现了优秀的关注点分离(Separation of Concerns):一个负责顶层流程控制,另一个负责具体的数据聚合任务,使得整个 Stream 执行框架既灵活又清晰。

toArray实现没有使用 teminalOp,而是直接调用evaluateToArrayNode,这个会间接调用evaluate(Spliterator<P_IN> spliterator, ...)

对于其它op,基本只需要直接调用 wrapAndCopyInto,比如ReduceOp的任务是将流中的所有元素聚合成一个单一的值,它不需要在内存中保留所有元素,可以逐个处理元素,并不断更新一个累加器。

Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, ...)

evaluate 的核心作用是启动流水线并将其所有输出元素收集到一个内部数据结构 Node 中Node 是 Stream API 用于在内存中高效存储一系列元素的内部表示,它可以是一个简单的数组包装,也可以是一个更复杂的树形结构(在并行计算中使用)。

此方法是许多终端操作(如 toArray()reduce 的部分形式)的最终执行逻辑。它的设计思想是根据流的并行/顺序状态,分派到两种截然不同的执行策略

  • 顺序执行:逻辑相对简单,创建一个 Node.Builder(它本身也是一个 Sink),然后利用我们之前分析过的 wrapAndCopyInto 机制,将所有元素推入 Builder 中,最后构建出 Node
  • 并行执行:逻辑复杂得多,它将任务委托给一个抽象的 evaluateToNode 方法。该方法内部会利用 Fork/Join 框架,将数据源 Spliterator 分割成多个部分,并行处理,然后将各个部分的结果(通常是多个 Node)合并成一个最终的 Node

final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<E_OUT[]> generator)
  • final: 此方法不可被子类重写,是框架的核心稳定部分。
  • <P_IN>: 方法级泛型,代表输入 Spliterator 的元素类型。
  • Node<E_OUT>: 返回值。E_OUT 是当前 Pipeline 阶段的输出类型。返回一个包含所有流元素的 Node
  • Spliterator<P_IN> spliterator: 数据源。
  • boolean flatten: 一个标志。在并行计算中,结果可能是一个 Node 树。如果 flatten 为 true,则要求返回的 Node 是一个扁平化的结构(即一个包含所有元素的单一数组),而不是树。
  • IntFunction<E_OUT[]> generator: 一个函数,用于创建指定类型的数组,例如 String[]::new。这是 Java 泛型数组创建的标准模式。

evaluate 的实现是一个清晰的 if-else 分支:

// ... existing code ...@Overridefinal <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<E_OUT[]> generator) {if (isParallel()) {// @@@ Optimize if op of this pipeline stage is a stateful opreturn evaluateToNode(this, spliterator, flatten, generator);}else {Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);return wrapAndCopyInto(nb, spliterator).build();}}
// ... existing code ...

顺序执行路径

这是相对简单的路径,我们先分析它。

Node.Builder<E_OUT> nb = makeNodeBuilder(exactOutputSizeIfKnown(spliterator), generator);
return wrapAndCopyInto(nb, spliterator).build();

第 1 步: exactOutputSizeIfKnown(spliterator) - 计算确切大小

此方法尝试在流开始处理前,预先计算出最终会输出多少个元素。这是一个重要的优化,如果大小已知,Node.Builder 就可以一次性分配足够大的内存,避免后续动态扩容。

  • exactOutputSizeIfKnown:
    1. int flags = getStreamAndOpFlags(); 获取当前流水线的所有标志。
    2. long size = StreamOpFlag.SIZED.isKnown(flags) ? spliterator.getExactSizeIfKnown() : -1;
      • 检查流水线是否具有 SIZED 特性。像从 ArrayList 创建的流就有这个特性。
      • 如果有,就调用 spliterator.getExactSizeIfKnown() 获取源头大小。
      • 如果没有,大小未知,返回 -1
    3. if (size != -1 && StreamOpFlag.SIZE_ADJUSTING.isKnown(flags) && !isParallel())
      • 这是一个针对顺序流的进一步计算。如果源大小已知,并且流水线中有会调整大小的操作(SIZE_ADJUSTING,例如 limit()),则需要逐个阶段计算最终大小。
      • for (AbstractPipeline<?, ?, ?> stage = sourceStage.nextStage; ...): 循环遍历从源之后到当前的所有中间操作。
      • size = stage.exactOutputSize(size);: 每个阶段都会根据自己的逻辑调整大小。例如,limit(10) 的 exactOutputSize 实现就会返回 Math.min(previousSize, 10)
    4. 最终返回计算出的 size 或 -1

第 2 步: makeNodeBuilder(...) - 创建节点构建器

这是一个 abstract 方法,其具体实现由子类(如 ReferencePipelineIntPipeline)提供。

  • 递归分析 makeNodeBuilder:
    • 它接收上一步计算出的 exactSizeIfKnown 和数组生成器 generator
    • 在 ReferencePipeline 中,它会调用 Nodes.builder(exactSizeIfKnown, generator)
    • Nodes.builder 会根据 exactSizeIfKnown 的值,决定是创建一个固定大小的 Builder 还是一个可动态扩容的 Builder
    • 这个 Node.Builder 同时实现了 Sink 接口,所以它可以作为数据流的目的地。它的 accept 方法就是将接收到的元素添加到内部的存储中。

第 3 步: wrapAndCopyInto(nb, spliterator).build() - 封装、执行、构建

这是执行的核心。

  • 递归分析 wrapAndCopyInto:

    1. wrapSink(nb): 调用 wrapSink,将 Node.Builder (nb) 从后向前用流水线中的每个中间操作逻辑进行包装。例如,map(f).filter(p),会先用 filter 的逻辑包装 nb,再用 map 的逻辑包装 filter 后的 Sink。返回一个包含了所有操作的 wrappedSink
    2. copyInto(wrappedSink, spliterator): 调用 copyInto,它会启动数据流,将 spliterator 的数据推送到 wrappedSink
    3. 返回 nbwrapAndCopyInto 执行完毕后,返回最初的、未被包装的 Node.Builder 实例 nb。此时,nb 内部已经包含了所有处理过的元素。
  • .build(): 在 nb 上调用 build() 方法,它会完成构建过程(例如,将内部的动态数组裁剪到合适的大小),并返回一个最终的、不可变的 Node 对象。

并行执行路径

 
if (isParallel()) {// @@@ Optimize if op of this pipeline stage is a stateful opreturn evaluateToNode(this, spliterator, flatten, generator);
}

当 isParallel() 返回 true 时,执行会进入此分支。

evaluateToNode(this, spliterator, flatten, generator) - 并行求值

此方法是并行执行的核心,但它本身也是一个 abstract 方法。

  • 递归分析 evaluateToNode:
    • 为什么是抽象的? 因为针对不同数据类型(引用类型 vs. 原始类型 intlongdouble)的并行处理和数据存储方式有很大差异。原始类型流可以利用连续内存的数组进行高效计算,而引用类型流则不能。将此方法设为抽象,可以强制每个子类(ReferencePipelineIntPipeline 等)提供最高效的并行实现。
    • 内部发生了什么? 以 ReferencePipeline 为例,它的 evaluateToNode 实现大致如下:
      1. 调用 Nodes.collect
      2. 大小已知路径 (Sized Path): 如果流的大小可以精确预知 (size >= 0) 并且源 Spliterator 支持 SUBSIZED 特性,系统会采取最高效的策略。它会预先分配一个最终大小的数组,然后启动一个 SizedCollectorTask(一个 Fork/Join 任务),让所有并行线程直接将结果写入到数组的指定位置。这避免了任何中间数据结构和后续的数据拷贝。
      3. 大小未知路径 (Unsized Path): 如果大小未知,系统会启动一个 CollectorTask。这个任务会递归地分解,每个子任务都会生成一个 Node,然后通过 conc 方法合并成一个 ConcNode 树。最终返回的就是这个树的根节点。flattenTree 参数决定了是否需要将这棵树“压平”后再返回。

总结

evaluate 方法是 Stream API 内部一个设计极为精巧的执行引擎入口。它通过一个简单的 isParallel() 判断,将执行流导向两个完全不同的世界:

  • 顺序世界:清晰、线性、易于理解。通过 Sink 链的包装和 forEachRemaining 的驱动,一步步完成数据处理。
  • 并行世界:复杂、递归、高性能。通过抽象化和 Fork/Join 框架,将大规模数据处理任务分解、并行执行并合并结果。

它完美地体现了策略模式,根据上下文(并行或顺序)选择合适的算法来完成“将流元素收集到 Node 中”这一任务。对它的分析,可以帮助我们深入理解 Stream API 在不同模式下的核心工作原理。

    Stream构建过程 wrapSink

    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink)
    
    • final: 这个方法是最终的,不允许任何子类(如 ReferencePipelineIntPipeline 等)重写。这表明它是 Stream 框架的核心、稳定且不可更改的机制。
    • <P_IN>: 这是一个泛型参数。它代表了经过所有包装后,最终返回的那个 Sink 所能接受的元素类型。这个类型实际上是当前流水线阶段的上一个阶段的输出类型
    • Sink<P_IN>: 这是返回类型。它返回一个 Sink,这个 Sink 已经将当前 AbstractPipeline 节点以及它之前的所有节点的操作逻辑都“包裹”起来了。
    • Sink<E_OUT> sink: 这是输入参数。它接收一个 SinkE_OUT 是当前 AbstractPipeline 节点的输出类型。这个传入的 sink 通常是流水线中下一个阶段的 Sink,或者是终端操作(Terminal Operation)提供的最终 Sink

    一句话概括wrapSink 方法接收一个用于处理本阶段输出的 Sink,然后返回一个能够处理本阶段输入的、经过层层包装的全新 Sink

    wrapSink 的核心思想是操作融合(Operation Fusion)责任链模式的反向构建

    我们知道 Stream 的中间操作是懒加载的。当写下 stream.filter(...).map(...).sorted(...) 时,数据并没有开始流动。只是在构建一个处理步骤的“配方”(AbstractPipeline 链表)。

    当一个终端操作(如 forEachcollect)被调用时,Stream 需要开始处理数据。但它不是低效地让数据流过 filter,把结果存起来,再流过 map... 而是希望一次性完成所有操作。

    wrapSink 就是实现这个“一次性完成”的关键。它的工作流程如下:

    1. 从流水线的末端(最靠近终端操作的那个中间操作)开始。
    2. 接收终端操作提供的“最终 Sink”(比如,Collectors.toList() 会提供一个将元素添加到 List 的 Sink)。
    3. 将这个 Sink 用当前阶段的操作逻辑(比如 map 的转换逻辑)包装起来,生成一个新的 Sink。
    4. 拿着这个新生成的 Sink,移动到前一个流水线阶段。
    5. 用前一个阶段的操作逻辑(比如 filter 的过滤逻辑)再次包装。
    6. ...如此循环,直到回到数据源头。

    最终会得到一个“俄罗斯套娃”式的 Sink,它内部包含了所有中间操作的逻辑。当数据从源头(Spliterator)取出后,只需调用这个最终 Sink 的 accept 方法,数据就会像穿过一根融合后的管道一样,瞬间完成所有处理。


    现在我们来看具体的实现,它非常精炼:

    // ... existing code ...@Override@SuppressWarnings("unchecked")final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {Objects.requireNonNull(sink);for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {sink = p.opWrapSink(p.previousStage.combinedFlags, sink);}return (Sink<P_IN>) sink;}
    // ... existing code ...
    
    1. Objects.requireNonNull(sink);

      • 标准的非空检查,确保传入的下游 Sink 是有效的。
    2. for ( AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage)

      • 初始化AbstractPipeline p = AbstractPipeline.this。循环从当前 AbstractPipeline 实例开始。在实际调用中,this 通常是流水线的最后一个中间操作。
      • 循环条件p.depth > 0depth 属性记录了当前阶段距离源头的“深度”。源头(Head)的 depth 是 0。所以这个循环会一直持续,直到回溯到源头为止。
      • 迭代p = p.previousStage。在每次循环后,p 指向链表中的上一个节点,实现了从后往前的遍历
    3. sink = p.opWrapSink(p.previousStage.combinedFlags, sink);

      • 这是循环的核心。它调用了当前阶段 p 的 opWrapSink 方法。
      • opWrapSink 是一个抽象方法,必须由具体的中间操作(如 mapfilter 的匿名内部类)来实现。
      • 它将当前的 sink(也就是下游操作的 Sink)传进去。
      • opWrapSink 的实现会返回一个新的、包装后的 Sink
      • 这个新的 Sink 被重新赋值给 sink 变量,用于下一次循环(即交给上一个阶段继续包装)。
    4. return (Sink<P_IN>) sink;

      • 当循环结束时,sink 变量所持有的已经是被流水线上所有中间操作层层包裹后的最终 Sink
      • 它被强制转换为 Sink<P_IN> 并返回。此时 P_IN 对应的是第一个中间操作的输入类型。

    关键交互:opWrapSink 方法

    wrapSink 只是一个驱动循环,真正的魔法发生在每个操作对 opWrapSink 的具体实现中。我们来看几个例子(以 LongPipeline 为例,原理相通):

    示例 1: peek() 操作 peek 的作用是在元素流过时执行一个动作,但不对元素做任何修改。

    // ... existing code ...@OverrideSink<Long> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedLong<>(sink) {@Overridepublic void accept(long t) {action.accept(t); // 执行 peek 的动作downstream.accept(t); // 将原始元素传递给下游}};}// Sink.javaabstract static class ChainedLong<E_OUT> implements Sink.OfLong {protected final Sink<? super E_OUT> downstream;public ChainedLong(Sink<? super E_OUT> downstream) {this.downstream = Objects.requireNonNull(downstream);}@Overridepublic void begin(long size) {downstream.begin(size);}@Overridepublic void end() {downstream.end();}@Overridepublic boolean cancellationRequested() {return downstream.cancellationRequested();}}
    // ... existing code ...
    

    opWrapSink 返回了一个新的 Sink。当这个新 Sink 的 accept 方法被调用时,它先执行 peek 的 action,然后原封不动地调用下游 sink(下游sink通过构造函数 赋值给 downstream)的 accept 方法。

    示例 2: filter() 操作

    // ... existing code ...@OverrideSink<Long> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedLong<>(sink) {
    // ... existing code ...@Overridepublic void accept(long t) {if (predicate.test(t)) // 执行过滤downstream.accept(t); // 满足条件才传递给下游}};}
    // ... existing code ...
    

    filter 的 opWrapSink 实现中,只有当元素 t 满足 predicate 条件时,才会调用下游 sink 的 accept 方法。

    示例 3: map() 操作

    // ... existing code ...@OverrideSink<Long> opWrapSink(int flags, Sink<Long> sink) {return new Sink.ChainedLong<>(sink) {@Overridepublic void accept(long t) {downstream.accept(mapper.applyAsLong(t)); // 将转换后的元素传递给下游}};}
    // ... existing code ...
    

    map 的 opWrapSink 实现中,它先用 mapper 对元素 t 进行转换,然后将转换后的结果传递给下游 sink


    调用时机与完整流程

    wrapSink 通常在终端操作的 evaluate 方法中被间接调用,例如通过 wrapAndCopyInto

    一个完整的流程是这样的:

    1. 构建Stream.of(1,2,3).filter(i -> i > 1).map(i -> i * 2)。这会创建一个 AbstractPipeline 的链表。
    2. 触发: 调用终端操作 .forEach(System.out::println)
    3. 准备执行forEach 操作的 evaluate 方法被调用。它会创建一个最终的 Sink,这个 Sink 的 accept 方法就是 System.out::println
    4. 包装 Sinkevaluate 方法内部会调用 wrapSink(finalSink)
      • wrapSink 从 map 阶段开始。它调用 map 的 opWrapSink,传入 finalSink。返回一个 mapSink,其 accept 方法会执行 i -> finalSink.accept(i * 2)
      • wrapSink 移动到 filter 阶段。它调用 filter 的 opWrapSink,传入上一步得到的 mapSink。返回一个 filterSink,其 accept 方法会执行 i -> { if (i > 1) mapSink.accept(i); }
    5. 执行: 循环结束,wrapSink 返回了最终的 filterSink
    6. 数据流动: 系统开始从源 Spliterator 中获取数据(1, 2, 3),并逐个调用 filterSink.accept()
      • filterSink.accept(1) -> 1 > 1 为 false,什么也不做。
      • filterSink.accept(2) -> 2 > 1 为 true,调用 mapSink.accept(2) -> 调用 finalSink.accept(2 * 2) -> 打印 4。
      • filterSink.accept(3) -> 3 > 1 为 true,调用 mapSink.accept(3) -> 调用 finalSink.accept(3 * 2) -> 打印 6。

    总结

    wrapSink 是 Stream API 实现高性能的核心机制之一。它不是一个简单的工具方法,而是将声明式的操作链表转换为高效的、融合的执行计划的“编译器”

    通过从后向前的遍历和责任链模式的运用,它将多个独立的中间操作逻辑“编织”到一个单一的 Sink 对象中,使得数据可以在一次遍历中完成所有处理,极大地减少了中间状态的存储和方法调用的开销。理解了 wrapSink,就理解了 Java Stream 运行时的精髓。

    copyIntoWithCancel

    这个方法是处理 短路操作(Short-Circuiting Operations) 的核心,比如 findFirstanyMatchlimit 等。

    在 Stream 流水线中,大部分操作(如 mapfilter)需要处理完所有元素。但有些操作希望能提前终止,一旦满足某个条件就不再处理后续元素,这就是“短路”。

    • findFirst(): 找到第一个元素后就应该立即停止。
    • limit(n): 处理完 n 个元素后就应该立即停止。
    • anyMatch(p): 找到任何一个匹配的元素后就应该立即停止。

    copyIntoWithCancel 的核心作用就是为支持短路操作提供一个高效的数据遍历和推送机制

    它的设计思想是:在数据从源头(Spliterator)流向 Sink 的过程中,每处理完一个元素,就通过 Sink.cancellationRequested() 方法检查下游是否发出了“取消”信号。如果收到了取消信号,就立即停止遍历,不再从 Spliterator 中拉取更多的数据。

    这与非短路操作的 copyInto 方法形成对比,后者通常会使用 spliterator.forEachRemaining(wrappedSink) 一次性将所有元素推送给 Sink,效率更高,但无法中途停止。

    final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator)
    
    • final: 此方法不可被子类重写,是框架的核心稳定部分。
    • <P_IN>: 泛型参数,代表 wrappedSink 能接受的元素类型,也就是数据源 Spliterator 提供的元素类型。
    • boolean: 返回值。返回 true 表示遍历是因为收到了取消请求而提前终止的;返回 false 表示遍历是正常完成的(所有元素都被处理了)。
    • Sink<P_IN> wrappedSink: 经过 wrapSink 方法包装后的最终 Sink。它内部已经融合了流水线上所有中间操作的逻辑。
    • Spliterator<P_IN> spliterator: 数据源的 SpliteratorcopyIntoWithCancel 将从它这里拉取数据。

    代码分析

    // ... existing code ...@Override@SuppressWarnings("unchecked")final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {@SuppressWarnings("rawtypes")AbstractPipeline p = AbstractPipeline.this;while (p.depth > 0) {p = p.previousStage;}wrappedSink.begin(spliterator.getExactSizeIfKnown());boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);wrappedSink.end();return cancelled;}
    // ... existing code ...
    
    1. AbstractPipeline p = AbstractPipeline.this; while (p.depth > 0) { p = p.previousStage; }

      • 这段代码的目的是找到流水线的源头阶段(Source Stage)
      • AbstractPipeline 实例构成一个链表,depth 属性表示当前阶段距离源头的距离。源头的 depth 为 0。
      • 通过 p = p.previousStage 不断回溯,循环结束后,变量 p 就指向了链表的头部,即代表数据源的那个 AbstractPipeline 实例。
      • 为什么需要找到源头? 因为遍历的逻辑(特别是针对不同数据类型,如 intlongObject 的遍历)是与源头的 StreamShape 相关的。forEachWithCancel 是一个抽象方法,其具体实现位于 ReferencePipelineIntPipeline 等具体的 Pipeline 子类中,而调用哪个实现版本取决于源头的类型。
    2. wrappedSink.begin(spliterator.getExactSizeIfKnown());

      • 这是 Sink 协议的一部分。在开始向 Sink 推送数据之前,必须调用 begin() 方法。
      • 它会通知 Sink 准备接收数据,并可选地告知预计的元素数量。对于短路操作,这个数量可能不准,但协议要求必须调用。
    3. boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);

      • 这是整个方法的核心。
      • 它调用了源头阶段 p 的 forEachWithCancel 方法。
      • forEachWithCancel 是一个抽象方法,由具体的 Pipeline 实现(如 ReferencePipeline)提供。它的职责是:
        • 从 spliterator 中逐个拉取元素。
        • 将每个元素传递给 wrappedSink.accept()
        • 在处理完每个元素后,检查 wrappedSink.cancellationRequested()
        • 如果返回 true,则立即停止遍历,并向上返回 true
      • cancelled 变量记录了遍历是否被提前取消。
    4. wrappedSink.end();

      • Sink 协议的另一部分。在所有数据推送完毕(无论是正常结束还是被取消)后,必须调用 end() 方法。
      • 这会通知 Sink 数据流结束,可以进行一些最终处理,比如 limit 操作可以丢弃多余的元素,findFirst 可以标记已找到结果。
    5. return cancelled;

      • 将 forEachWithCancel 的结果返回,告知上游调用者(通常是终端操作的 evaluate 方法)数据处理是否被短路了。

    forEachWithCancel 方法

    copyIntoWithCancel 将核心的遍历逻辑委托给了 forEachWithCancel。我们来看一下 ReferencePipeline 中 forEachWithCancel 的典型实现:

     
    
    // In ReferencePipeline.java
    @Override
    final boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink) {boolean cancelled = false;// tryAdvance returns false when there are no more elements// sink.cancellationRequested() returns true when short-circuiting is requestedwhile (!cancelled && spliterator.tryAdvance(e -> sink.accept(e))) {cancelled = sink.cancellationRequested();}return cancelled;
    }
    

    这段代码非常清晰地展示了短路机制:

    • while 循环的条件是 !cancelled 并且 spliterator.tryAdvance(...)
    • spliterator.tryAdvance(e -> sink.accept(e)):尝试从 Spliterator 获取一个元素,如果成功,就立即通过 lambda 表达式将其传递给 sink.accept()。如果 Spliterator 中没有更多元素了,tryAdvance 返回 false,循环正常结束。
    • cancelled = sink.cancellationRequested();:在 accept 调用之后,立刻检查 Sink 是否请求取消。如果 limit(5) 的 Sink 已经收到了 5 个元素,它的 cancellationRequested() 就会返回 true
    • cancelled 变量被设为 true 后,下一次 while 循环的条件 !cancelled 就不满足了,循环终止,即使 Spliterator 中还有很多元素。

    调用时机与完整流程

    copyIntoWithCancel 在 copyInto 方法内部被调用,而 copyInto 又是在终端操作的 evaluate 方法中被触发的。

    完整流程示例:Stream.of(1,2,3,4,5).limit(2).forEach(...)

    1. 构建: 创建 AbstractPipeline 链表。
    2. 触发: 调用终端操作 forEach
    3. 包装 SinkforEach 内部调用 wrapSink,将 limit(2) 的逻辑和 forEach 的逻辑包装成一个 wrappedSink
      • limit(2) 的 Sink 会有一个计数器。它的 cancellationRequested() 在计数器达到 2 之前返回 false,达到 2 之后返回 true
    4. 调用 copyIntoforEach 的 evaluate 方法调用 wrapAndCopyInto,后者再调用 copyInto
    5. 进入短路分支copyInto 内部检查到 StreamOpFlag.SHORT_CIRCUIT 标志位(由 limit 操作设置),于是调用 copyIntoWithCancel(wrappedSink, spliterator)
    6. 执行 copyIntoWithCancel:
      • 找到源头阶段。
      • 调用 wrappedSink.begin()
      • 调用源头阶段的 forEachWithCancel
      • forEachWithCancel 开始循环:
        • tryAdvance 获取元素 1,调用 wrappedSink.accept(1)limit 的 Sink 计数器变为 1。cancellationRequested() 返回 false
        • tryAdvance 获取元素 2,调用 wrappedSink.accept(2)limit 的 Sink 计数器变为 2。cancellationRequested() 现在返回 true
        • cancelled 变量被设为 true
        • while 循环终止。
      • forEachWithCancel 返回 true
      • 调用 wrappedSink.end()
      • copyIntoWithCancel 返回 true

    总结

    copyIntoWithCancel 是 Stream API 实现**短路(short-circuiting)**操作的关键枢纽。它通过一个清晰的协议——在推送每个元素后检查 Sink 的取消状态——实现了对数据处理流程的提前终止。

    它本身并不执行遍历,而是负责:

    1. 定位到正确的 Pipeline 实现(源头阶段)。
    2. 遵循 Sink 的 begin/end 协议。
    3. 委托带有取消检查的遍历任务给特定于数据类型的 forEachWithCancel 方法。

    这种设计将框架流程控制(copyIntoWithCancel)与具体遍历逻辑(forEachWithCancel)解耦,是 Stream 内部优雅设计的一个典范。

    为什么通过循环找源头?sourceStage不是保存了吗?

    原因在于代码的演进、分段执行(Pipeline Slicing)的复杂性以及对 depth 和 combinedFlags 字段的重新计算

    让我们来看 sourceSpliterator 这个关键方法,它揭示了更深层次的原因:

    // ... existing code ...@SuppressWarnings("unchecked")protected Spliterator<?> sourceSpliterator(int terminalFlags) {// ... (获取初始 spliterator) ...if (isParallel() && hasAnyStateful()) {// Adapt the source spliterator, evaluating each stateful op// in the pipeline up to and including this pipeline stage.// The depth and flags of each pipeline stage are adjusted accordingly.int depth = 1;for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;if (p.opIsStateful()) {depth = 0;// ... (省略一些标志位操作) ...spliterator = p.opEvaluateParallelLazy(u, spliterator);// ... (省略一些标志位操作) ...}p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}// ... (应用 terminalFlags) ...return spliterator;}
    // ... existing code ...
    

    这个方法揭示了一个非常重要的机制:在并行流遇到有状态操作(Stateful Operation, 如 sorteddistinct)时,流水线会被“切片”执行

    1. 流水线切片(Pipeline Slicing): 当一个并行流包含有状态操作时,执行过程会被分为多个段(Segment)。每个有状态操作都会成为一个段的终点。前一个段会先被完整地执行(opEvaluateParallelLazy),其结果(一个新的 Spliterator)会成为下一个段的输入。

    2. depth 和 combinedFlags 的动态修改: 请注意 sourceSpliterator 方法中的 for 循环。在这个循环里,它会遍历从 sourceStage 到当前阶段 this 之间的所有 stage。当遇到一个有状态操作时,它会:

      • 立即执行到这个有状态操作为止的所有上游操作 (spliterator = p.opEvaluateParallelLazy(u, spliterator);)。
      • 重置 depth (depth = 0;)。这意味着这个有状态操作成为了一个新的“源头”。
      • 重新计算后续阶段的 depth 和 combinedFlags (p.depth = depth++; p.combinedFlags = ...)。

    在调用终端操作时,sourceSpliterator 方法可能会动态地修改流水线中各个阶段的内部状态,特别是 depth 字段。一个原本 depth > 0 的阶段,在经过 sourceSpliterator 的处理后,其 depth 可能变为 0,因为它成了一个新分段的“源头”。

    copyIntoWithCancel 中的那个 循环的真正目的,是为了找到当前执行上下文中的“有效源头”

    • 对于简单的顺序流或无状态并行流,sourceStage 确实就是最终的源头,它的 depth 始终是 0。此时,循环和直接使用 sourceStage 效果一致。
    • 但对于被切片后的有状态并行流,this 所属的那个分段的“源头”可能不再是最初的 sourceStage,而是一个中间的有状态操作阶段。sourceSpliterator 方法已经把那个阶段的 depth 修改为了 0

    因此,while (p.depth > 0) 这个循环是一个健壮的、不依赖于 sourceStage 字段的、根据当前(可能已被修改过的)depth 状态来查找当前分段源头的机制。它从 this 开始,沿着 previousStage 链回溯,直到找到第一个 depth 为 0 的阶段——这才是它当前需要依赖的那个“源头”,并调用它的 forEachWithCancel 方法。

    copyInto 

    它是 Stream 流水线执行的核心驱动方法之一,负责将数据从源头(Spliterator)推送至最终的 Sink

    copyInto 的核心作用是启动并管理整个数据流的处理过程。在 Stream 的懒加载模型中,所有的中间操作都只是构建了一个操作链(AbstractPipeline),并没有实际处理数据。当终端操作被调用时,就需要一个机制来“启动引擎”,让数据真正地流动起来。copyInto 就是这个引擎的启动器。

    它的设计思想是区分对待两种不同类型的流水线

    • 非短路流水线(Non-Short-Circuit): 对于那些需要处理所有元素的操作(如 mapfiltercollect),copyInto 会采用最高效的方式,即调用 spliterator.forEachRemaining(wrappedSink),一次性地将所有数据从 Spliterator 推送到 Sink
    • 短路流水线(Short-Circuit): 对于那些可能提前结束的操作(如 findFirstanyMatchlimit),copyInto 会将任务委托给一个专门的方法 copyIntoWithCancel。这个方法会逐个处理元素,并在每一步检查是否需要提前终止。

    通过这种方式,copyInto 充当了一个分发器(Dispatcher),根据流水线的特性(是否包含短路操作)选择最优的执行策略。


    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator)
    
    • final: 此方法不可被子类重写,是框架的核心稳定部分。
    • <P_IN>: 泛型参数,代表 wrappedSink 能接受的元素类型,也就是数据源 Spliterator 提供的元素类型。
    • void: 这个方法没有返回值。它的职责是执行一个动作(将数据从 spliterator 拷贝到 sink),而不是计算一个结果。最终的结果是由 Sink 自身来构建和持有的。
    • Sink<P_IN> wrappedSink: 经过 wrapSink 方法包装后的最终 Sink。它内部已经融合了流水线上所有中间操作的逻辑。
    • Spliterator<P_IN> spliterator: 数据源的 SpliteratorcopyInto 将从它这里拉取数据。

    代码解析

    // ... existing code ...@Overridefinal <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {Objects.requireNonNull(wrappedSink);if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {wrappedSink.begin(spliterator.getExactSizeIfKnown());spliterator.forEachRemaining(wrappedSink);wrappedSink.end();}else {copyIntoWithCancel(wrappedSink, spliterator);}}
    // ... existing code ...
    
    1. Objects.requireNonNull(wrappedSink);

      • 标准的非空检查,确保下游的 Sink 是有效的。
    2. if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags()))

      • 这是核心的判断逻辑。它检查整个流水线的 combinedFlags 中是否包含 SHORT_CIRCUIT 标志。
      • getStreamAndOpFlags() 返回当前流水线所有操作标志的组合。
      • StreamOpFlag.SHORT_CIRCUIT 是一个标志位,像 limit()findFirst()anyMatch() 等操作会把它加入到流水线的标志中。
      • 如果不包含这个标志,说明这是一个需要处理所有元素的普通流水线,进入 if 分支。
    3. 非短路分支(if 块)

      • wrappedSink.begin(spliterator.getExactSizeIfKnown());
        • 调用 Sink 的 begin 方法,通知它数据即将开始流动。如果源 Spliterator 的大小已知,就传递给 Sink,这有助于 Sink(比如 Collectors.toList())预分配内存,提高效率。
      • spliterator.forEachRemaining(wrappedSink);
        • 这是最高效的数据推送方式。forEachRemaining 会遍历 Spliterator 中所有剩余的元素,并将每个元素传递给 wrappedSink 的 accept 方法。这个调用是阻塞的,直到所有元素处理完毕。
      • wrappedSink.end();
        • 所有元素都处理完后,调用 Sink 的 end 方法,通知它数据流结束,可以进行最后的处理(比如 Collector 的 finisher 操作)。
    4. 短路分支(else 块)

      • else { copyIntoWithCancel(wrappedSink, spliterator); }
      • 如果流水线中包含 SHORT_CIRCUIT 标志,copyInto 不会自己处理,而是将所有参数(wrappedSink 和 spliterator)直接委托给 copyIntoWithCancel 方法。
      • copyIntoWithCancel 内部实现了逐个元素处理并检查取消信号的逻辑,我们之前已经详细分析过它。

    调用时机与完整流程

    copyInto 通常由 wrapAndCopyInto 方法调用,而 wrapAndCopyInto 是终端操作(如 forEachreducecollect)执行其 evaluateSequential 逻辑时的最终步骤。

    完整流程示例:Stream.of(1, 2, 3).map(i -> i * 2).collect(Collectors.toList())

    1. 构建: 创建 AbstractPipeline 链表 (source -> map)。
    2. 触发: 调用终端操作 collect
    3. 执行collect 的 evaluateSequential 方法被调用。
    4. 准备 Sinkcollect 方法会创建一个 Sink,这个 Sink 的逻辑是把元素添加到一个 ArrayList 中。
    5. 调用 wrapAndCopyIntoevaluateSequential 内部调用 wrapAndCopyInto(listSink, spliterator)
    6. 包装 SinkwrapAndCopyInto 首先调用 wrapSink(listSink)wrapSink 会将 listSink 用 map 操作的逻辑包装起来,返回一个 mapSinkmapSink.accept(i) 的效果是 listSink.accept(i * 2)
    7. 调用 copyIntowrapAndCopyInto 接着调用 copyInto(mapSink, spliterator)
    8. 进入非短路分支copyInto 检查流水线标志,发现没有 SHORT_CIRCUIT
      • 调用 mapSink.begin(...)
      • 调用 spliterator.forEachRemaining(mapSink)
        • forEachRemaining 从 spliterator 取出 1,调用 mapSink.accept(1) -> listSink.accept(2)
        • forEachRemaining 从 spliterator 取出 2,调用 mapSink.accept(2) -> listSink.accept(4)
        • forEachRemaining 从 spliterator 取出 3,调用 mapSink.accept(3) -> listSink.accept(6)
      • 调用 mapSink.end()
    9. 返回结果wrapAndCopyInto 返回最初的 listSinkcollect 操作从这个 Sink 中获取最终的 List 结果 [2, 4, 6]

    总结

    copyInto 是连接 Stream 声明式构建命令式执行的关键桥梁。它位于执行路径的核心位置,扮演着一个智能调度员的角色。

    • 它通过检查 SHORT_CIRCUIT 标志,为流水线选择了最优的执行路径。
    • 对于非短路操作,它使用 forEachRemaining 进行高效的批量处理。
    • 对于短路操作,它委托给专门的 copyIntoWithCancel 方法进行精细的、可中断的控制。

    理解 copyInto 的分发逻辑,是理解 Stream 如何根据不同操作的特性来优化其执行过程的重要一步。

    wrapAndCopyInto

    wrapAndCopyInto 的核心作用是封装(Wrap)并执行(CopyInto)。它是许多终端操作(Terminal Operation)在顺序执行(Sequential Execution)时的入口点。

    当一个终端操作(如 reducecollectforEach)被调用时,它提供了一个最终的“目的地”——Sink,这个 Sink 知道如何处理最终的元素并产生结果。但是,数据从源头流到这个最终 Sink 之前,需要经过所有中间操作(如 mapfilter)的处理。

    wrapAndCopyInto 的设计思想就是:

    1. 封装 (Wrap): 接收终端操作提供的原始 Sink,然后从流水线的末端开始,反向用每一个中间操作的逻辑去“包装”这个 Sink,最终形成一个包含了所有操作逻辑的、完整的 wrappedSink
    2. 执行 (CopyInto): 将这个封装好的 wrappedSink 和数据源 Spliterator 交给 copyInto 方法,由 copyInto 启动数据流,将数据从源头一个个推送到 wrappedSink 中进行处理。
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator)
    
    • <P_IN>: 方法级的泛型参数。代表整个流水线输入端的元素类型,即源头 Spliterator 提供的元素类型。
    • <S extends Sink<E_OUT>>: 方法级的泛型参数。
      • S 代表传入的 sink 参数的具体类型。
      • E_OUT 是 AbstractPipeline 类的泛型参数,代表当前流水线阶段的输出元素类型。
      • 这个约束保证了传入的 sink 必须能消费当前阶段的输出。
    • S sink: 参数,由终端操作提供,是数据流的最终目的地。例如,对于 collect(Collectors.toList()),它就是一个能将元素添加到列表中的 Sink
    • Spliterator<P_IN> spliterator: 参数,数据源的 Spliterator
    • 返回 S: 方法返回传入的那个原始 sink 对象。这一点非常重要,因为它允许调用方在 wrapAndCopyInto 执行完毕后,能从这个 sink 对象中提取最终结果(例如,通过调用 sink.build() 或 sink.get())。

    代码解析

    // ... existing code ...@Overridefinal <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);return sink;}@Override
    // ... existing code ...
    

    这个方法的实现非常简洁,但每一步都至关重要,它将两个核心操作串联了起来:

    1. wrapSink(Objects.requireNonNull(sink)): 这是第一步,即“封装”阶段。

      • Objects.requireNonNull(sink):确保传入的终端 Sink 不为空。
      • wrapSink(...):这是个关键的辅助方法。它会从当前 Pipeline 阶段开始,沿着 previousStage 链反向回溯,直到源头。在回溯的每一步,它都会调用当前阶段的 opWrapSink 方法,将 Sink 用当前操作的逻辑包装一层。
      • 比喻:想象一个俄罗斯套娃。wrapSink 从最里面的小娃娃(终端 Sink)开始,一层层地往外套上更大的娃娃(中间操作的 Sink),最终返回最外层的那个大娃娃。这个大娃娃就是 wrappedSink
    2. copyInto(wrappedSink, spliterator): 这是第二步,即“执行”阶段。

      • 它接收上一步构建好的、包含了所有流水线逻辑的 wrappedSink
      • 然后调用 copyInto 方法,将数据源 spliterator 和 wrappedSink 连接起来,启动数据流。
      • copyInto 内部会判断流水线是否包含短路操作,并选择合适的遍历策略(forEachRemaining 或 copyIntoWithCancel)。
    3. return sink;:

      • 在 copyInto 执行完毕后(意味着所有数据都处理完了),方法返回最初传入的、未被包装的那个 sink
      • 调用者(终端操作)持有这个原始 sink 的引用,现在可以安全地从中获取计算结果了。

    调用时机与完整流程

    wrapAndCopyInto 主要在顺序执行的终端操作中被调用。一个典型的例子是 collect 操作。

    完整流程示例:List.of("a", "b", "c").stream().map(String::toUpperCase).collect(Collectors.toList())

    1. 触发: 调用终端操作 collect
    2. 执行 evaluatecollect 内部会调用 evaluate(terminalOp)
    3. 选择路径evaluate 方法发现是顺序执行 (isParallel() 为 false),于是调用 terminalOp.evaluateSequential(this, sourceSpliterator(...))
    4. evaluateSequentialCollectOpcollect 的实现)的 evaluateSequential 方法会创建一个用于收集元素的 Sink(我们称之为 listSink),然后调用 helper.wrapAndCopyInto(listSink, spliterator)。这里的 helper 就是 map 操作所在的 Pipeline 实例。
    5. 进入 wrapAndCopyInto:
      • wrapSink(listSink) 被调用:
        • wrapSink 发现 map 操作是它的上一个阶段。
        • 它调用 map 操作的 opWrapSink 方法,用 map 的逻辑(s -> s.toUpperCase())包装 listSink,生成一个 mapSink
        • wrapSink 返回这个 mapSink
      • copyInto(mapSink, spliterator) 被调用:
        • copyInto 启动数据流。
        • 它从 spliterator 中取出 "a",交给 mapSink.accept("a")mapSink 内部执行 "A",然后调用 listSink.accept("A")
        • 这个过程对 "b" 和 "c" 重复。
      • 返回 listSinkwrapAndCopyInto 执行完毕,将原始的 listSink 返回给 CollectOp
    6. 获取结果CollectOp 从 listSink 中获取最终的列表 ["A", "B", "C"] 并返回。

    总结

    wrapAndCopyInto 是 Stream API 内部一个设计精巧的执行协调器。它本身不处理任何业务逻辑,而是通过精确地调用 wrapSink 和 copyInto,完美地将操作链的构建数据的实际流动解耦并衔接起来。

    • 职责单一: 它的职责就是“封装然后执行”。
    • 流程清晰wrapSink 负责准备处理器,copyInto 负责输送数据。
    • 结果导向: 通过返回原始 Sink,使得调用方能方便地获取最终结果。

    理解 wrapAndCopyInto 的工作方式,是深入理解 Stream 顺序执行模型的关键。

    evaluateToArrayNode

    evaluateToArrayNode 的核心作用是作为 toArray 操作的专用执行引擎。它封装了将流转换为 Node 的所有逻辑,特别是针对并行流中包含有状态操作(stateful operation)的场景,它实现了一个重要的优化。

    其设计思想是:

    1. 提供统一入口:为 toArray 提供一个比通用 evaluate(TerminalOp) 更直接、更专门的入口点。
    2. 并行优化:识别出一种可以优化的特定并行场景(流水线末端是单个有状态操作),并为其提供一条“快速通道”,避免不必要的数据收集和复制步骤。
    3. 代码复用:在通用场景下,它会委托给更底层的 evaluate(spliterator, flatten, generator) 方法,复用其核心的“物化”逻辑。

    final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator)
    

    • final: 此方法不可被子类重写,是框架的核心稳定部分。
    • Node<E_OUT>: 返回值。E_OUT 是当前流水线阶段(也就是最后一个阶段)的输出元素类型。返回一个包含了所有流元素的 Node 对象。
    • IntFunction<E_OUT[]> generator: 一个函数,用于创建指定类型的数组,例如 String[]::new。这是 toArray 操作所必需的,因为它需要知道如何创建最终的数组容器。

    代码逻辑深度解析

    // ... existing code ...@SuppressWarnings("unchecked")final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;// If the last intermediate operation is stateful then// evaluate directly to avoid an extra collection stepif (isParallel() && previousStage != null && opIsStateful()) {// Set the depth of this, last, pipeline stage to zero to slice the// pipeline such that this operation will not be included in the// upstream slice and upstream operations will not be included// in this slicedepth = 0;return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);}else {return evaluate(sourceSpliterator(0), true, generator);}}
    // ... existing code ...
    
    if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
    

    这是所有终端操作的标准起始步骤。它确保一个流实例只能被消费一次,然后将 linkedOrConsumed 标志位置为 true,防止后续操作。

    特殊并行路径 (Optimized Parallel Path - if 块)

    这是该方法中最精妙的部分,一个针对特定场景的性能优化。

    if (isParallel() && previousStage != null && opIsStateful()) {// ...
    }
    

    触发条件:

    1. isParallel(): 流必须是并行的。
    2. previousStage != null: 当前流水线至少有两个阶段(源+一个操作)。
    3. opIsStateful()当前(也就是最后一个)中间操作必须是有状态的。例如 sorted()distinct()

    场景示例stream.filter(...).sorted().toArray()。当执行 toArray() 时,sorted() 就是那个“最后一个有状态的中间操作”。

    为什么需要优化?

    常规的并行执行模型遇到有状态操作时,会将其视为一个“屏障”。它会先并行执行屏障之前的所有操作,将结果收集到一个中间 Node 中,然后再对这个中间 Node 执行有状态操作。如果 toArray() 再走一遍这个流程,就会变成:

    并行 filter -> 中间 Node1 -> 并行 sorted -> 中间 Node2 -> toArray 从 Node2 创建数组

    这中间的 Node2 是多余的。我们完全可以直接让 sorted() 操作的结果直接写入最终的数组。

    优化实现:

    1. depth = 0;
      • 这是一个非常巧妙的技巧。通过将当前阶段(toArray 伪阶段)的深度设置为0,它有效地将流水线“切片”。toArray 之前的 sorted() 操作现在被视为一个独立的、完整的流水线。
    2. return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
      • opEvaluateParallel 是一个抽象方法,由有状态操作(如 SortedOps)自己实现。
      • 这个调用相当于对 sorted() 操作说:“请你用并行的方式执行,但不要把结果收集到你自己的中间 Node 里,而是直接使用我(toArray)提供的 generator 来创建并填充最终的 Node。”
      • 这样,执行流就变成了: 并行 filter -> 中间 Node1 -> sorted() 直接并行排序并写入最终 Node -> toArray 从最终 Node 创建数组
      • 这就避免了创建 Node2 的开销,减少了一次大规模的数据收集和可能的复制。

    通用路径 (General Path - else 块)

    如果不满足上述特殊并行场景的条件(例如,是顺序流,或者最后一个操作是无状态的如 map),则会进入这个更通用的路径。

    else {return evaluate(sourceSpliterator(0), true, generator);
    }
    

    实现:

    1. sourceSpliterator(0): 获取流的源 Spliterator。参数 0 表示终端操作本身没有附加的标志。
    2. evaluate(..., true, ...): 直接调用我们之前分析过的那个底层的“工兵”方法 evaluate
      • 第一个参数: 数据源 Spliterator
      • 第二个参数 flatten: 设置为 true。这至关重要,它告诉 evaluate 方法,即使在并行计算中产生了 Node 树,也必须在返回前将其“拍平”为一个包含所有元素的、单一的、由数组支持的 Node。这正是 toArray 所需要的结果。
      • 第三个参数 generator: 将 toArray 提供的数组生成器传递下去,供 evaluate 在创建最终 Node 时使用。

    总结

    evaluateToArrayNode 是一个精心设计的、专用于 toArray 的执行方法。它不仅仅是一个简单的包装,其核心价值在于:

    • 识别并优化了一个关键的并行场景:通过“切片”流水线和调用 opEvaluateParallel,它避免了在“有状态操作 + toArray”组合下的重复数据收集,提升了性能。
    • 无缝衔接通用逻辑:在不满足优化条件时,它能平滑地回退到通用的 evaluate 方法,复用其健壮的流物化能力,并确保通过 flatten=true 参数获得 toArray 所需的扁平化 Node 结果。

    这个方法的设计完美体现了在通用框架中嵌入专用优化的思想,是理解 Stream API 高性能实现的一个绝佳范例。

    sourceStageSpliterator

    sourceStageSpliterator() 的核心作用是为流管道的源头阶段(Source Stage)提供一个一次性的、安全的获取其 Spliterator 的方式

    其设计思想是:

    1. 所有权和封装:流的源 Spliterator 是整个管道的生命线。它被封装在源头阶段(sourceStage)中,不能被管道中的任意阶段随意访问。
    2. 单一职责:此方法只做一件事——从源头阶段取出 Spliterator。它不关心并行、有状态操作等复杂的流水线处理逻辑。
    3. 状态强制转换:调用此方法是一个明确的“消费”动作。一旦成功调用,流就被认为是已消费,其内部的 sourceSpliterator 或 sourceSupplier 会被清空,防止重复消费。
    final Spliterator<E_OUT> sourceStageSpliterator()
    
    • final: 此方法不可被子类重写,保证其行为在整个框架中的一致性和稳定性。
    • Spliterator<E_OUT>: 返回值。E_OUT 在这里特指源头阶段的输出类型。它返回流最开始的那个 Spliterator
    • 无参数: 该方法不需要任何外部输入,它的所有操作都基于当前 AbstractPipeline 实例的状态。

    代码逻辑深度解析

    // ... existing code ...final Spliterator<E_OUT> sourceStageSpliterator() {// Ensures that this method is only ever called on the sourceStageif (this != sourceStage)throw new IllegalStateException();if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;if (sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSpliterator;sourceSpliterator = null;return s;}else if (sourceSupplier != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSupplier.get();sourceSupplier = null;return s;}else {throw new IllegalStateException(MSG_CONSUMED);}}
    // ... existing code ...
    

    调用者身份验证

     
    
    // Ensures that this method is only ever called on the sourceStage
    if (this != sourceStage)throw new IllegalStateException();
    

    这是此方法最关键的检查。sourceStage 字段在流水线构建时,会从第一个阶段一直传递到最后一个阶段,它始终指向流水线的源头。

    • this: 代表调用 sourceStageSpliterator() 方法的当前 AbstractPipeline 对象。
    • sourceStage: 指向流管道的第一个阶段(源头)。

    这个检查强制规定:只有源头阶段对象自己才能调用这个方法来获取 Spliterator。管道中的任何中间阶段(如 mapfilter)都无权调用此方法,这保证了数据源的封装性和安全性。

    流状态检查

     
    
    if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
    

    这是标准的流消费检查。

    • if (linkedOrConsumed): 检查流是否已经被操作过或关闭。由于这个方法只能在源头阶段调用,这里的 linkedOrConsumed 实际上检查的是源头阶段的状态。
    • linkedOrConsumed = true: 如果检查通过,立即将流标记为已消费。这是一个原子性的“检查并设置”操作,确保一旦 Spliterator 被取出,流就不能再被用于任何其他操作。

    Spliterator 提取逻辑

    Stream 的源可以由一个 Spliterator 直接提供,也可以由一个 Supplier<Spliterator> 延迟提供。这部分代码处理了这两种情况。

    • Case 1: 直接提供 Spliterator

       
      if (sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSpliterator;sourceSpliterator = null; // 清空引用,防止重复获取return s;
      }
      

      如果 sourceSpliterator 字段不为空,说明流是由一个现成的 Spliterator 创建的。代码会:

      1. 将其强制转换为正确的泛型类型。
      2. 将 sourceSpliterator 字段置为 null。这是关键的“消费”动作,确保这个 Spliterator 实例不会被再次取出。
      3. 返回取出的 Spliterator
    • Case 2: 延迟提供 Spliterator

       
      else if (sourceSupplier != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>)sourceSupplier.get();sourceSupplier = null; // 清空引用,防止重复获取return s;
      }
      

      如果 sourceSupplier 字段不为空,说明流是由一个 Supplier 创建的。代码会:

      1. 调用 sourceSupplier.get() 来生成一个新的 Spliterator 实例
      2. 将 sourceSupplier 字段置为 null。同样是关键的“消费”动作,确保不能通过这个 Supplier 再次生成 Spliterator
      3. 返回新生成的 Spliterator
    • Case 3: 异常情况

       
      else {throw new IllegalStateException(MSG_CONSUMED);
      }
      

      如果 sourceSpliterator 和 sourceSupplier 都为 null,这意味着流已经被消费过了(它们的引用在上次被获取时已经被清空)。此时抛出异常,明确告知调用者流已被消费。


    与 sourceSpliterator(int) 的对比

    AbstractPipeline 中还有一个看起来很像的方法 sourceSpliterator(int terminalFlags)。它们的主要区别在于:

    • sourceStageSpliterator() (本方法):

      • 职责: 简单、纯粹地从源头阶段获取 Spliterator
      • 调用者: 只能是源头阶段自己。
      • 复杂度: 非常简单,不处理任何流水线逻辑。
      • 用途: 主要被 spliterator() 这个终端操作的实现所使用,当用户想把一个未消费的流转换回 Spliterator 时调用。
    • sourceSpliterator(int terminalFlags):

      • 职责: 为任意阶段准备用于终端操作求值的 Spliterator
      • 调用者: 任何一个阶段在准备执行终端操作时都可以调用(通常是最后一个阶段)。
      • 复杂度非常复杂。它包含了处理并行流中有状态操作的核心逻辑。如果流是并行的且包含有状态操作(如 sorted),这个方法会先执行到最后一个有状态操作为止的所有计算,然后返回一个代表中间结果的 Spliterator
      • 用途: 被 evaluate() 和 evaluateToArrayNode() 等所有执行终端操作的方法调用,是实际执行计算的起点。

    简单来说,sourceStageSpliterator() 是一个“档案管理员”,只负责从档案室(源头)取出原始文件(Spliterator)。而 sourceSpliterator(int) 是一个“项目经理”,它不仅能拿到原始文件,还能在需要时组织团队(并行计算)完成一系列加工(有状态操作),最后交付一份处理好的报告(中间结果的 Spliterator)。


    总结

    sourceStageSpliterator() 是一个高度专用的内部方法,其设计目标是正确性安全性。通过严格的调用者检查和状态转换,它确保了流的源数据只能被源头阶段自己、且仅有一次地取出。它与更复杂的 sourceSpliterator(int) 方法形成了职责分工,共同构成了 Stream API 内部数据流转的基石。

    sourceSpliterator

    这个方法是 Stream API 内部实现中最核心、最复杂的方法之一。它不仅仅是获取源 Spliterator,更重要的是,它扮演了启动终端操作前“预处理”流水线的角色,尤其是处理复杂的并行流场景。可以说,理解了这个方法,就理解了 Stream 并行计算的精髓。

    sourceSpliterator(int terminalFlags) 的核心作用是:为终端操作准备一个“就绪”的 Spliterator

    这个“就绪”的 Spliterator 可能是:

    • 对于简单流(顺序流或无状态并行流):就是流的原始 Spliterator
    • 对于复杂流(有状态并行流):是一个代表了部分计算结果的全新 Spliterator

    其设计思想是:

    1. 处理屏障(Barrier):有状态操作(如 sorted()distinct())在并行流中像一个“屏障”,必须等待上游所有数据都到达并处理后,才能继续向下游传递。此方法就是负责执行到最后一个屏障为止的所有计算
    2. 流水线切片与重组:它通过一个循环,动态地“切片”流水线,执行一部分(直到一个有状态操作),然后用其结果(一个新的 Spliterator)作为下一段流水线的输入,并更新后续阶段的元数据(如 depthcombinedFlags)。
    3. 延迟求值(Lazy Evaluation):在处理有状态操作时,它调用 opEvaluateParallelLazy,这个方法返回的 Spliterator 封装了并行计算的结果,但数据本身可能是延迟生成的。

    protected Spliterator<?> sourceSpliterator(int terminalFlags)
    
    • protected: 只能被 java.util.stream 包内或其子类访问,是内部实现细节。
    • Spliterator<?>: 返回一个 Spliterator。注意这里的通配符 ?,因为经过有状态操作后,Spliterator 的类型可能已经改变,但这个方法本身不关心具体类型,只把它当作一个数据源。
    • int terminalFlags: 来自终端操作的标志位(StreamOpFlag)。例如,一个短路操作(like findFirst)会传入 SHORT_CIRCUIT 标志,这个标志会影响后续流水线的行为。

    代码逻辑深度解析

    // ... existing code ...@SuppressWarnings("unchecked")protected Spliterator<?> sourceSpliterator(int terminalFlags) {// Get the source spliterator of the pipelineSpliterator<?> spliterator = null;if (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;}else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;}else {throw new IllegalStateException(MSG_CONSUMED);}if (isParallel() && hasAnyStateful()) {// Adapt the source spliterator, evaluating each stateful op// in the pipeline up to and including this pipeline stage.// The depth and flags of each pipeline stage are adjusted accordingly.int depth = 1;for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;u != e;u = p, p = p.nextStage) {int thisOpFlags = p.sourceOrOpFlags;if (p.opIsStateful()) {depth = 0;if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {// ... (omitted for brevity)thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;}spliterator = p.opEvaluateParallelLazy(u, spliterator);// Inject or clear SIZED on the source pipeline stage// based on the stage's spliteratorthisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED: (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;}p.depth = depth++;p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);}}if (terminalFlags != 0)  {// Apply flags from the terminal operation to last pipeline stagecombinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);}return spliterator;}
    // ... existing code ...
    

    第一步:获取原始 Spliterator

    // Get the source spliterator of the pipeline
    Spliterator<?> spliterator = null;
    if (sourceStage.sourceSpliterator != null) {spliterator = sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;
    }
    else if (sourceStage.sourceSupplier != null) {spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();sourceStage.sourceSupplier = null;
    }
    else {throw new IllegalStateException(MSG_CONSUMED);
    }
    

    这部分逻辑与 sourceStageSpliterator() 中的提取逻辑几乎完全相同。它从源头阶段(sourceStage)获取最原始的 Spliterator,并立即将源头的引用(sourceSpliterator 或 sourceSupplier)清空,标志着流的消费正式开始。

    第二步:处理并行流中的有状态操作(核心 if 和 for 循环)

     
    
    if (isParallel() && hasAnyStateful()) {// ... for loop ...
    }
    

    这是整个方法的核心和灵魂。只有当流是并行的,并且流水线中至少包含一个有状态操作时,这个 if 块才会执行。

    for 循环详解

    for (AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage)

    • 初始化:
      • u (upstream): 上游阶段,初始为 sourceStage
      • p (pipeline): 当前处理的阶段,初始为源的下一个阶段。
      • e (end): 循环的终点,即调用 sourceSpliterator 的那个阶段(通常是最后一个阶段)。
    • 循环条件u != e,只要上游阶段还没到终点就继续。
    • 迭代u = p, p = p.nextStage,两个指针一起向后移动,始终保持 u 是 p 的前一个阶段。

    循环体内部:

    1. if (p.opIsStateful()) { ... }

      • 循环的主要目的就是找到有状态的操作阶段 p
    2. depth = 0;

      • 一旦找到一个有状态操作,就将 depth 计数器重置为0。这是一个信号,表示我们即将创建一个新的“逻辑源头”。
    3. spliterator = p.opEvaluateParallelLazy(u, spliterator);

      • 这是魔法发生的地方。调用当前有状态操作 p 的 opEvaluateParallelLazy 方法。
      • 这个方法会把 u(代表了到目前为止的所有上游操作)和当前的 spliterator(代表了上游的数据源)作为输入。
      • 它会立即触发并执行从上一个屏障(或源头)到当前屏障 p 之间的所有并行计算。
      • 执行完成后,它返回一个新的 Spliterator。这个新的 Spliterator 封装了 p 操作(如 sorted)的计算结果。现在,这个新的 spliterator 变量就成了下一段流水线的“源”。
    4. 更新标志和元数据:

      • thisOpFlags = spliterator.hasCharacteristics(...):根据新生成的 Spliterator 是否 SIZED,来更新当前操作的标志。例如,distinct() 可能会改变流的大小,所以需要重新评估 SIZED 特性。
      • p.depth = depth++;:重新设置后续阶段的深度。因为 p 已经被“执行”并物化为一个新的 Spliterator,所以它的下一个阶段的深度就变成了1。
      • p.combinedFlags = StreamOpFlag.combineOpFlags(...):基于更新后的 thisOpFlags,重新计算后续阶段的组合标志。

    执行示例source.parallel().filter(...).sorted().map(...).toArray() 当 toArray 调用 sourceSpliterator 时:

    1. 循环开始,p 指向 filteropIsStateful() 为 false,跳过 if
    2. 循环继续,p 指向 sortedopIsStateful() 为 true,进入 if
    3. 调用 sorted.opEvaluateParallelLazy(filterHelper, sourceSpliterator)
    4. 这会触发 filter 和 sorted 的并行计算,其结果被封装成一个新的 spliterator_after_sorted
    5. spliterator 变量现在被更新为 spliterator_after_sorted
    6. sorted 之后的 map 阶段的 depth 和 combinedFlags 会被更新,就好像它的上游直接就是 spliterator_after_sorted 一样。
    7. 循环结束。方法返回 spliterator_after_sorted

    最终,toArray 的终端操作拿到的 Spliterator 已经是排好序的数据源了。

     第三步:合并终端操作的标志

     
    
    if (terminalFlags != 0)  {// Apply flags from the terminal operation to last pipeline stagecombinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
    }
    

    最后,将终端操作自己带来的标志(如 SHORT_CIRCUIT)合并到最后一个阶段的 combinedFlags 中。这会影响最终的 wrapAndCopyInto 等方法的行为。


     与 sourceStageSpliterator() 的对比

    • sourceStageSpliterator()档案管理员。职责单一,只负责从源头取出原始 Spliterator。简单、安全、不涉及计算。
    • sourceSpliterator(int)项目经理/总工程师。职责复杂,负责在启动最终任务前,解决掉所有中途的“硬骨头”(有状态并行操作),通过预计算和流水线重组,为最终任务提供一个干净、就绪的数据源。

    总结

    sourceSpliterator(int terminalFlags) 是 Stream API 实现高性能并行计算的引擎室。它通过一个巧妙的循环,将一个复杂的、带有“屏障”的流水线,动态地、分段地执行,并将每一段的计算结果物化为一个新的 Spliterator,作为下一段的输入。这个“评估-重组”的过程,有效地解决了有状态操作在并行环境下的数据依赖问题,是整个 Stream 框架中最为精妙和关键的设计之一。

    wrapSpliterator

    这个方法是 PipelineHelper 接口的实现,它的核心作用是:将一个代表上游数据的 Spliterator,通过当前流水线阶段以及其所有下游阶段的操作进行“包装”,最终返回一个包含了所有后续操作逻辑的新的 Spliterator

    简单来说,它就是 stream.spliterator() 这个终端操作的幕后功臣之一。当你想把一个构建好的、包含多个中间操作的 Stream 再转换回一个 Spliterator 时,就是这个方法在发挥作用。

    wrapSpliterator 的核心作用是将**数据源(sourceSpliterator操作链(从当前阶段到末尾)**结合起来,生成一个全新的、功能完备的 Spliterator

    其设计思想是:

    1. 延迟执行 (Laziness):这个方法本身不执行任何计算。它只是创建一个新的 Spliterator 对象,这个对象内部“知道”需要对源数据执行哪些操作。真正的计算和转换只会在你调用这个新 Spliterator 的 tryAdvance 或 forEachRemaining 等方法时才会发生。
    2. 递归/委托:它不自己实现复杂的包装逻辑,而是委托给一个抽象的 wrap 方法。这个 wrap 方法由具体的 Stream 实现(如 ReferencePipelineIntPipeline)来提供,因为不同类型的 Stream(对象、int、long、double)其 Spliterator 的包装方式也不同。
    3. 流水线封装:返回的 Spliterator 封装了从当前阶段开始的整个下游流水线。对外部调用者来说,它就像一个黑盒,你只管从里面取数据,它会自动帮你完成 mapfilter 等所有操作。

    final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator)
    
    • final: 此方法不可被子类重写,是框架的核心稳定部分。
    • <P_IN>: 泛型参数,代表输入 Spliterator 的元素类型。P_IN 代表 "Pipeline Input"。
    • Spliterator<E_OUT>: 返回值。返回一个新的 Spliterator,其元素类型是 E_OUT,即当前流水线阶段的输出类型。
    • Spliterator<P_IN> sourceSpliterator: 参数,代表上游提供的数据源。

    代码逻辑深度解析

    // ... existing code ...@Override@SuppressWarnings("unchecked")final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {if (depth == 0) {return (Spliterator<E_OUT>) sourceSpliterator;}else {return wrap(this, () -> sourceSpliterator, isParallel());}}
    // ... existing code ...
    

    基本情况 (Base Case)

    if (depth == 0) {return (Spliterator<E_OUT>) sourceSpliterator;
    }
    
    • depth: 这个字段表示当前阶段在流水线中的深度。depth == 0 是一个特殊标记,它意味着当前 AbstractPipeline 实例就是源头阶段 (Source Stage)
    • 逻辑: 如果当前阶段就是源头,说明它后面没有任何操作了。因此,不需要进行任何包装,直接将输入的 sourceSpliterator 原样返回即可。这里的强制类型转换是安全的,因为在源头阶段,输入类型 P_IN 和输出类型 E_OUT 必然是相同的。
    • 作用: 这是递归包装的终点。

    递归包装 (Recursive Wrapping)

    else {return wrap(this, () -> sourceSpliterator, isParallel());
    }
    

    当 depth > 0 时,意味着当前阶段是一个中间操作阶段(如 mapfilter 等)。这时就需要进行包装。

    • wrap(...): 这是一个抽象方法,定义在 AbstractPipeline 的更下方。

       
      abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,Supplier<Spliterator<P_IN>> supplier,boolean isParallel);
      

      它由具体的子类(ReferencePipelineIntPipeline 等)实现,因为它们才知道如何创建对应类型的包装 Spliterator(例如,StreamSpliterators.WrappingSpliterator)。

    • wrap 方法的参数:

      1. this: 将当前 AbstractPipeline 实例作为 PipelineHelper 传递进去。这个 helper 对象包含了从当前阶段到最后一个阶段的所有信息(操作链、标志位等)。
      2. () -> sourceSpliterator: 将上游的 sourceSpliterator 包装成一个 Supplier。这是为了支持延迟获取。包装后的 Spliterator 只有在第一次被使用时,才会通过这个 Supplier 真正拿到上游的 Spliterator
      3. isParallel(): 传递当前流的并行状态,以便 wrap 方法创建出正确的(并行或顺序的)Spliterator 包装器。

    执行流程示例: 假设有 stream.map(...).filter(...),我们想从 map 阶段开始获取 Spliterator

    1. 在 map 阶段调用 wrapSpliterator(sourceSpliterator)
    2. map 阶段的 depth > 0,进入 else 分支。
    3. 调用 wrap(mapPipeline, () -> sourceSpliterator, isParallel)
    4. ReferencePipeline 中实现的 wrap 方法会创建一个 StreamSpliterators.WrappingSpliterator
    5. 这个 WrappingSpliterator 内部持有了 mapPipeline 这个 PipelineHelper 和上游的 Spliterator 的 Supplier
    6. 当用户调用这个新 Spliterator 的 tryAdvance 时,它会:
    • a. 从 Supplier 获取上游 Spliterator 并调用其 tryAdvance 得到一个元素。
    • b. 通过 mapPipeline 找到 map 操作的 Sink 包装逻辑。
    • c. 将元素推入 map 的 Sink,再推入 filter 的 Sink
    • d. 如果元素通过了所有操作,就返回 true

    总结

    wrapSpliterator 是连接 Stream 流水线和 Spliterator 终端操作的关键桥梁。它通过一个简洁的 if-else 判断,优雅地处理了两种情况:

    • 当位于流水线源头时,它作为递归的终点,直接返回源 Spliterator
    • 当位于中间操作时,它将**“未来的操作”(由 this PipelineHelper 代表)和“现在的数据源”**(由 sourceSpliterator 代表)委托给具体的 wrap 方法,创建一个新的、懒加载的、功能完备的 Spliterator

    这个方法的设计充分体现了职责分离延迟执行的思想,是 Stream API 能够灵活地在不同表现形式(Stream vs Spliterator)之间转换的核心机制。

    spliterator() 

    这个方法是 BaseStream 接口中定义的一个终端操作。它的作用非常直观:将一个已经构建好(但未消费)的 Stream 流水线转换回一个 Spliterator。这提供了一种机制,允许用户在构建了复杂的流操作后,不立即进行求值,而是获取一个代表了整个流水线逻辑的 Spliterator,以便进行更灵活或自定义的遍历。

    spliterator() 的核心作用是将一个完整的、声明式的 Stream 流水线物化为一个可迭代的数据源 Spliterator

    其设计思想是:

    1. 延迟执行的桥梁:Stream 的中间操作是延迟执行的,终端操作触发计算。spliterator() 是一个特殊的终端操作,它本身不“计算”出最终结果(如 collect),而是将计算逻辑封装到一个新的 Spliterator 中。真正的计算将在遍历这个返回的 Spliterator 时发生。
    2. 互操作性:提供了从 Stream API 到更底层的 Spliterator API 的一个“出口”,允许开发者将 Stream 的强大声明性与 Spliterator 的精细控制(如自定义分割策略)结合起来。
    3. 消费流:和所有终端操作一样,调用 spliterator() 会消费掉这个流,使其不能再被其他操作使用。

    public Spliterator<E_OUT> spliterator()
    
    • public: 这是一个公开的 API,是 BaseStream 接口的一部分,供所有 Stream 用户使用。
    • Spliterator<E_OUT>: 返回一个 SpliteratorE_OUT 是当前流水线阶段(也就是调用 spliterator() 的那个 Stream 对象)的输出元素类型。

    代码逻辑深度解析

     
    
    // ... existing code ...// Primitive specialization use co-variant overrides, hence is not final@Override@SuppressWarnings("unchecked")public Spliterator<E_OUT> spliterator() {if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);linkedOrConsumed = true;if (this == sourceStage) {if (sourceStage.sourceSpliterator != null) {@SuppressWarnings("unchecked")Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;sourceStage.sourceSpliterator = null;return s;}else if (sourceStage.sourceSupplier != null) {@SuppressWarnings("unchecked")Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;sourceStage.sourceSupplier = null;return lazySpliterator(s);}else {throw new IllegalStateException(MSG_CONSUMED);}}else {return wrap(this, () -> sourceSpliterator(0), isParallel());}}
    // ... existing code ...
    

    首先是标准的状态检查:

     
    
    if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;
    

    这确保了流只能被消费一次,然后将流标记为已消费。

    接下来,代码逻辑分为两大分支:

    情况一:当前阶段是源头 (this == sourceStage)

    这个 if 块处理的是最简单的情况:在一个没有任何中间操作的原始 Stream 上调用 spliterator()。例如 Stream.of(1, 2, 3).spliterator()

    • if (sourceStage.sourceSpliterator != null): 如果流是直接由一个 Spliterator 创建的。

      1. 直接获取这个 Spliterator
      2. 将 sourceStage.sourceSpliterator 置为 null,完成消费。
      3. 返回该 Spliterator
    • else if (sourceStage.sourceSupplier != null): 如果流是由 Supplier<Spliterator> 创建的。

      1. 获取这个 Supplier
      2. 将 sourceStage.sourceSupplier 置为 null,完成消费。
      3. 调用 lazySpliterator(s)。这是一个关键点,它不会立即调用 supplier.get(),而是返回一个特殊的 SpliteratorStreamSpliterators.LazySpliterator),这个 LazySpliterator 只有在第一次被使用(如调用 tryAdvance)时,才会真正从 Supplier 中获取底层的 Spliterator。这保持了懒加载的特性。
    • else: 如果两者都为 null,说明流已被消费,抛出异常。

    情况二:当前阶段是中间操作 (else 块)

    这个 else 块处理的是更复杂、更常见的情况:在一个包含了至少一个中间操作的 Stream 上调用 spliterator()。例如 Stream.of(1, 2, 3).map(i -> i * 2).spliterator()

     
    
    else {return wrap(this, () -> sourceSpliterator(0), isParallel());
    }
    

    这行代码是整个方法的核心,它做了三件事:

    1. () -> sourceSpliterator(0): 创建一个 Supplier。这个 Supplier 的任务是调用我们之前深入分析过的 sourceSpliterator(0) 方法。回忆一下,sourceSpliterator 会负责处理并行流中的有状态操作(屏障),并返回一个“就绪”的 Spliterator 作为后续操作的数据源。对于顺序流,它就简单地返回原始 Spliterator

    2. wrap(this, ..., isParallel()): 调用 wrap 方法。

      • this: 将当前 AbstractPipeline 实例(例如,代表 map 操作的那个对象)作为 PipelineHelper 传入。这个 helper 知道从当前阶段到最后一个阶段的所有操作。
      • () -> sourceSpliterator(0): 将准备好的数据源(通过 Supplier 懒加载)传入。
      • isParallel(): 告知 wrap 方法要创建并行还是顺序的包装器。
    3. 返回结果wrap 方法会返回一个全新的、经过包装的 Spliterator(例如 StreamSpliterators.WrappingSpliterator)。这个 WrappingSpliterator 内部就同时持有了数据源(来自 sourceSpliterator)和操作链(来自 this 这个 PipelineHelper)。当遍历这个新的 Spliterator 时,它会从数据源取出一个元素,然后依次应用流水线上的所有操作。


    与内部 wrapSpliterator 的关系

    spliterator() 的 else 块与我们之前分析的 wrapSpliterator 的 else 块非常相似。

    • spliterator()return wrap(this, () -> sourceSpliterator(0), isParallel());
    • wrapSpliterator(...)return wrap(this, () -> sourceSpliterator, isParallel());

    它们的主要区别在于 wrap 方法的第二个参数,即数据源 Supplier

    • spliterator() 使用 () -> sourceSpliterator(0)。它调用的是复杂的 sourceSpliterator(int) 方法。这是因为它作为终端操作,必须处理整个流水线的复杂性,特别是并行流中的有状态操作。它需要一个能代表“预计算”结果的 Spliterator
    • wrapSpliterator(...) 使用 () -> sourceSpliterator (这里的 sourceSpliterator 是 wrapSpliterator 的参数)。它接收一个已经准备好的上游 Spliterator。它的职责相对简单,只是将这个现成的数据源和下游操作包装起来,不负责处理上游的屏障问题。

    简单说,spliterator() 是面向用户的、发起整个封装过程的入口;而 wrapSpliterator 是内部递归包装过程中的一个环节。


    总结

    public spliterator() 方法是 Stream API 提供的一个功能强大的“逃生舱口”。它允许用户在任意阶段将一个声明式的、懒加载的 Stream 流水线,转换成一个具体的、可迭代的 Spliterator 对象。

    • 对于源 Stream,它直接或懒加载地返回原始的 Spliterator
    • 对于带有中间操作的 Stream,它通过调用核心的 wrap 和 sourceSpliterator 方法,将数据源的获取逻辑(可能包含预计算)操作链的执行逻辑完美地封装到一个新的 Spliterator 中,实现了从 Stream 到 Spliterator 的无缝转换,同时保持了懒加载的特性。

    close

    Stream 需要 close() 和 onClose() 是因为它可能建立在需要显式关闭的底层 I/O 资源之上(例如文件、网络套接字等)。如果不关闭 Stream,这些底层资源可能无法被释放,从而导致资源泄漏。

    BaseStream 接口(StreamIntStream等的父接口)继承了 java.lang.AutoCloseable 接口。这意味着所有 Stream 都可以,并且在某些情况下必须在 try-with-resources 语句中使用,以确保资源的正确释放。

    想象一个典型的场景:逐行读取文件。

    在 Java 8 之前,我们可能会这么写:

    BufferedReader reader = null;
    try {reader = new BufferedReader(new FileReader("file.txt"));String line;while ((line = reader.readLine()) != null) {System.out.println(line);}
    } finally {if (reader != null) {reader.close(); // 必须手动关闭}
    }
    

    使用 Stream API,代码可以变得非常简洁:

     
    
    // 正确的用法
    try (Stream<String> lines = Files.lines(Paths.get("file.txt"))) {lines.forEach(System.out::println);
    } catch (IOException e) {e.printStackTrace();
    }
    // 在 try 块结束时,lines.close() 会被自动调用
    

    Files.lines(...) 方法返回一个 Stream<String>。这个 Stream 的数据源是一个打开的文件句柄。当 Stream 的处理完成时,这个文件句柄必须被关闭。try-with-resources 结构就保证了 stream.close() 方法在最后一定会被调用。

    如果忘记使用 try-with-resources,就可能导致文件句柄一直被占用,直到垃圾回收器介入,这在生产环境中是不可接受的。

    现在我们来看onClose(Runnable closeHandler) 方法。

    它的作用不是执行关闭操作,而是注册一个关闭处理器(Runnable。这个处理器会在 Stream 的 close() 方法被调用时执行。

    这是一个非常精巧的回调机制。我们来分析一下它的源码:

    // ... existing code ...@Override@SuppressWarnings("unchecked")public S onClose(Runnable closeHandler) {if (linkedOrConsumed)throw new IllegalStateException(MSG_STREAM_LINKED);Objects.requireNonNull(closeHandler);Runnable existingHandler = sourceStage.sourceCloseAction;sourceStage.sourceCloseAction =(existingHandler == null)? closeHandler: Streams.composeWithExceptions(existingHandler, closeHandler);return (S) this;}
    // ... existing code ...
    
    1. if (linkedOrConsumed): 检查流是否已经被操作或消费,确保只能在流水线构建阶段注册关闭处理器。
    2. Runnable existingHandler = sourceStage.sourceCloseAction;这是关键。它获取的是源头阶段 (sourceStage) 的关闭处理器。这意味着无论你在流水线的哪个阶段(mapfilter 等)调用 onClose,这个关闭逻辑最终都会被附加到整个流水线的源头。这是完全合理的,因为需要关闭的资源(如文件句柄)正是在源头被打开的。
    3. sourceStage.sourceCloseAction = ...: 它将新的处理器设置回源头阶段。
    4. Streams.composeWithExceptions(existingHandler, closeHandler): 这个逻辑允许你注册多个关闭处理器。如果已经存在一个处理器,它会把新旧两个处理器组合起来,确保它们都会被执行。

    那么,这个注册的 sourceCloseAction 在哪里被调用呢?答案是在 close() 方法里:

    // ... existing code ...@Overridepublic void close() {linkedOrConsumed = true;sourceSupplier = null;sourceSpliterator = null;Runnable closeAction = sourceStage.sourceCloseAction;if (closeAction != null) {sourceStage.sourceCloseAction = null;closeAction.run();}}
    // ... existing code ...
    

    可以看到,close() 方法会从 sourceStage 获取之前注册的 closeAction 并执行它(closeAction.run())。

    设计启示

    • 职责分离onClose 的设计体现了完美的职责分离。
      • 资源创建者(例如 Files.lines 方法)负责提供资源,并使用 onClose 注册清理资源的逻辑。它知道如何关闭资源。
      • 资源使用者(调用 Stream API 的我们)负责在处理完数据后调用 close() 方法(通常通过 try-with-resources),但不需要关心具体的关闭细节。
    • 封装onClose 机制将资源管理的复杂性封装在了 Stream 的实现内部。使用者只需要遵循 AutoCloseable 的标准用法即可,这大大降低了API的复杂度。
    • 灵活性:通过组合 Runnable,该机制甚至支持为一个 Stream 注册多个独立的关闭动作,增加了灵活性。

    因此,onClose() 是一个供 Stream 提供方使用的内部机制,用于将资源清理逻辑与 Stream 本身绑定,而 close() 则是供 Stream 消费方使用的公开接口,用于触发这些清理逻辑。

    核心抽象方法

    这些方法必须由具体的管道实现(如 ReferencePipelineIntPipeline)或其内部的操作类(如 StatelessOpStatefulOp)提供:

    • abstract StreamShape getOutputShape(): 返回当前管道阶段输出的流类型(REFERENCEINT_VALUELONG_VALUEDOUBLE_VALUE)。
    • abstract <P_IN> Sink<P_IN> opWrapSink(int flags, Sink<E_OUT> sink):
      • 这是构建 Sink 链的核心。
      • 参数 sink 是下游操作的 Sink
      • 此方法需要返回一个新的 Sink,该 Sink 实现了当前操作的逻辑,并将结果传递给下游的 sink
      • 例如,一个 map 操作会创建一个 Sink,它对接收到的每个元素应用映射函数,然后将结果传递给下游 Sink
    • abstract <P_IN> Spliterator<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator, IntFunction<E_OUT[]> generator):
      • (主要由有状态操作实现)定义了有状态操作如何在并行模式下执行,并返回一个新的 Spliterator 包含处理后的元素。
    • abstract boolean opIsStateful(): 返回当前操作是否有状态(例如 sorted()distinct() 是有状态的,而 filter()map() 是无状态的)。

    本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
    如若转载,请注明出处:http://www.pswp.cn/web/91148.shtml
    繁体地址,请注明出处:http://hk.pswp.cn/web/91148.shtml
    英文地址,请注明出处:http://en.pswp.cn/web/91148.shtml

    如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

    相关文章

    《林景媚与命运回响》

    《林景媚与命运回响》——当数据库开始回响命运&#xff0c;现实是否还能被信任&#xff1f;《林景媚数据库宇宙》系列第九部第一章&#xff1a;命运的涟漪公元 2089 年&#xff0c;数据库神谕的运行已趋于稳定&#xff0c;PostgreSQL Quantum Engine&#xff08;PQE&#xff0…

    图神经网络入门:从GNN开始01图卷积网络GCN节点分类 02图注意力网络GAT 03图自编码器GAE 04 门控图神经网络GGNN

    目录 一.基础1-[图论、图算法、CNN] 二.基础2-[图卷积神经网络GCN] 三.torch-geometric.nn工具包安装&#xff08;包含各种算法和数据集&#xff09; 四.GCN任务[节点分类-Cora 数据集] 五.图注意力网络&#xff08;GAT&#xff09; 六.图自编码器&#xff08;GAE&#x…

    001 Configuration结构体构造

    目录DramSys 代码分析1 Configuration结构体构造1.1 from_path 函数详解1.2 构造过程总结这种设计的好处2 Simulator 例化过程2.1 instantiateInitiatorDramSys 代码分析 1 Configuration结构体构造 好的&#xff0c;我们来详细解释一下 DRAMSysConfiguration.cpp 文件中 fro…

    以太坊十年:智能合约与去中心化的崛起

    以太坊10周年&#xff0c;敬开发者&#xff0c;敬构建者&#xff0c;敬还在链上的我们 以太坊即将迎来十周年纪念,作为一名在这个生态中深耕了8到9年的见证者&#xff0c;我亲历了它从一纸白皮书的构想到成长为全球领先去中心化平台的全过程。这十年间&#xff0c;以太坊经历了…

    kafka 3.9.1版本: kraft + sasl+ standlone 模式完整可行安装步骤

    Kafka 3.9.1 Kraft 单机模式安装 安装 OpenJDK 11 CentOS/RHEL yum install -y java-11-openjdk-develUbuntu/Debian apt install -y openjdk-11-jdk下载安装包 wget https://mirrors.aliyun.com/apache/kafka/3.9.1/kafka_2.12-3.9.1.tgz tar -zxvf kafka_2.12-3.9.1.tgz -C /…

    Gitee DevOps平台深度评测:本土化优势与功能特性全面解析

    Gitee DevOps平台深度评测&#xff1a;本土化优势与功能特性全面解析 在数字化转型浪潮下&#xff0c;企业软件开发流程的自动化与协作效率成为核心竞争力。作为国内领先的代码托管与DevOps平台&#xff0c;Gitee&#xff08;码云&#xff09;凭借其本土化服务与全流程支持能力…

    从零开始本地化部署Dify:开源大模型应用平台搭建全指南

    在AI应用开发的浪潮中&#xff0c;Dify作为一款开源的大语言模型(LLM)应用开发平台&#xff0c;正逐渐成为开发者和企业的首选工具。它巧妙地融合了后端即服务&#xff08;BaaS&#xff09;和LLMOps的理念&#xff0c;让开发者能够快速搭建生产级的生成式AI应用。无论是构建智能…

    Qt 多媒体开发:音频与视频处理

    Qt 多媒体模块提供了一套完整的 API&#xff0c;用于开发音频和视频处理应用。从简单的媒体播放到复杂的音视频编辑&#xff0c;Qt 都提供了相应的工具和组件。本文将从基础到高级全面解析 Qt 多媒体开发。 一、Qt 多媒体模块概述 1. 主要组件 Qt 多媒体模块包含以下核心组件&a…

    Mac 专业图像处理 Pixelmator Pro

    原文地址&#xff1a;Pixelmator Pro Mac 专业图像处理 Pixelmator Pro&#xff0c;是一款非常强大、美观且易于使用的图像编辑器&#xff0c;专为 Mac 设计。 采用单窗口界面、基于机器学习的智能图像编辑、自动水平检测&#xff0c;智能快速选择及更好的修复工具等功能优点…

    iptables和IPVS比较

    iptables 和 IPVS (IP Virtual Server) 都是 Linux 系统上用于处理网络流量的强大工具&#xff0c;但它们的设计目标、工作原理和适用场景有显著区别&#xff1a; 核心区别&#xff1a;主要目的&#xff1a; iptables&#xff1a; 核心是一个包过滤防火墙和网络地址转换工具。它…

    语音识别指标计算 WER

    目录 CER&#xff08;Character Error Rate&#xff09; WER Word Error Rate&#xff08;词错误率&#xff09; &#x1f9ee; WER 计算方式 &#x1f4cc; 示例 ✅ 理解要点 CER&#xff08;Character Error Rate&#xff09; 语音识别中的 CER&#xff08;Character …

    【前端基础篇】JavaScript之jQuery介绍

    文章目录前言JQuery基本介绍和使用方法引入依赖jQuery语法jQuery选择器jQuery事件操作元素获取/设置元素内容获取/设置元素属性获取/返回css属性添加元素删除元素总结&#xff1a;常用的jQuery方法 - 详细解释与示例事件处理拓展 - 详细解释与示例其他拓展内容前言 在阅读过程…

    Vue入门:vue项目的创建和基本概念

    一、vue的基本简介1. 什么是vue?Vue (发音为 /vjuː/&#xff0c;类似 view) 是一款用于构建用户界面的 JavaScript 框架。它基于标准 HTML、CSS 和 JavaScript 构建&#xff0c;并提供了一套声明式的、组件化的编程模型&#xff0c;帮助你高效地开发用户界面。无论是简单还是…

    2.oracle保姆级安装教程

    一、Oracle数据库安装1.找到软件的位置 D:\学习软件\Oracle&#xff0c;并解压软件2.双击setup.exe3.选择 是4.去掉勾&#xff0c;下一步5.创建和配置数据库&#xff0c;下一步6.桌面类&#xff0c;下一步7.配置安装路径地址和密码8.点完成9.正在安装&#xff0c;稍等片刻10.有…

    STM32 软件模拟 I2C 读写 MPU6050--实现加速度与陀螺仪数据采集

    演示视频&#xff1a; https://www.bilibili.com/video/BV1iCQRYXEBb/?share_sourcecopy_web&vd_source0e4269581b0bc60d57a80c9a27c98905一、前言在嵌入式开发中&#xff0c;MPU6050 六轴传感器因其集成加速度计和陀螺仪且成本低廉&#xff0c;广泛应用于平衡小车、飞控、…

    TFLOPs与TOPS的转换关系详解:如何衡量AI芯片的算力?

    在评估AI芯片或计算硬件的性能时&#xff0c;我们经常会遇到TFLOPs和TOPS这两个关键指标。很多开发者对它们的区别和转换关系存在疑惑。本文将深入解析这两个指标的含义、应用场景及转换方法&#xff0c;并提供实际应用中的注意事项。 一、基本概念解析 1.1 TFLOPs&#xff08;…

    C语言:第11天笔记

    C语言&#xff1a;第11天笔记 内容提要函数函数的概述函数的分类函数的定义形参和实参函数的返回值函数的调用函数的声明函数 函数的概述 **函数&#xff1a;**实现一定功能的&#xff0c;独立的代码模块&#xff0c;对于函数的使用&#xff0c;一定是先定义&#xff0c;后使 ​…

    java导出pdf(使用html)

    引入maven <dependencies><!-- Thymeleaf --><dependency><groupId>org.thymeleaf</groupId><artifactId>thymeleaf</artifactId><version>3.1.1.RELEASE</version> <!-- 或与 Spring Boot 匹配的版本 --></de…

    Qt 远程过程调用(RPC)实现方案

    在分布式系统开发中&#xff0c;远程过程调用&#xff08;RPC&#xff09;是实现跨进程、跨机器通信的重要技术。Qt 作为一个强大的跨平台框架&#xff0c;提供了多种 RPC 实现方案&#xff0c;能够满足不同场景下的通信需求。本文将深入探讨 Qt 中 RPC 的各种实现方式&#xf…

    攻防世界-引导-Web_php_unserialize

    题目内容&#xff1a;出现一段源代码&#xff0c;分段分析第一部分如下<?php class Demo { private $file index.php;public function __construct($file) { $this->file $file; }function __destruct() { echo highlight_file($this->file, true); }function __w…