压缩怎么进行的

这里的操作都是KValue,内部有row kind,标记了删除和插入

MergeTreeCompactManager 是 Paimon 中 Merge-Tree 结构压缩任务的总调度中心。它的核心职责就是监控文件的层级状态(Levels),并在合适的时机,根据预设的策略,挑选文件并发起压缩任务

triggerCompaction 方法正是这个调度中心的核心入口。下面我们来详细拆解它的逻辑和涉及的策略。

这个方法有两个主要的执行路径,由 fullCompaction 参数控制:

  • fullCompaction = true: 强制执行一次“完全合并”。通常由用户手动触发(比如通过 ALTER TABLE ... COMPACT 命令)。它会尝试将所有数据文件合并成尽可能少的文件。
  • fullCompaction = false: 执行一次“常规合并”。这是系统在日常写入过程中自动触发的,目的是维持 LSM-Tree 的健康结构,防止某一层级文件过多,影响读取性能。
// ... existing code ...@Overridepublic void triggerCompaction(boolean fullCompaction) {Optional<CompactUnit> optionalUnit;// 1. 获取当前所有层级的 SortedRun 视图List<LevelSortedRun> runs = levels.levelSortedRuns();if (fullCompaction) {// ... 省略日志和检查 ...// 2a. 如果是完全合并,调用 FullCompaction 策略optionalUnit =CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs, recordLevelExpire);} else {// ... 省略日志和检查 ...// 2b. 如果是常规合并,调用配置的 strategy (如 UniversalCompaction)optionalUnit =strategy.pick(levels.numberOfLevels(), runs).filter(unit -> unit.files().size() > 0).filter(unit ->unit.files().size() > 1|| unit.files().get(0).level()!= unit.outputLevel());}// 3. 如果策略选出了需要合并的文件单元 (CompactUnit)optionalUnit.ifPresent(unit -> {// 4. 决定是否可以丢弃删除标记 (dropDelete)boolean dropDelete =unit.outputLevel() != 0&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()|| dvMaintainer != null);// ... 省略日志 ...// 5. 提交压缩任务submitCompaction(unit, dropDelete);});}
// ... existing code ...

完全合并策略 (CompactStrategy.pickFullCompaction)

当 fullCompaction 为 true 时,会调用 CompactStrategy.pickFullCompaction。这个策略非常直接:

  • 目标:将所有层级的所有文件合并到最高层级(maxLevel)。
  • 行为:它会收集 levels 中的所有 DataFileMeta,将它们打包成一个巨大的 CompactUnit,并将 outputLevel 设置为 levels.maxLevel()
  • 应用场景
    • 用户希望整理数据碎片,减少文件总数,优化后续的查询性能。
    • 清理过期数据(如果配置了 recordLevelExpire)。因为只有在最高层级的合并才能确保数据不会再被旧版本覆盖,从而安全地物理删除。

常规合并策略 (strategy.pick)

当 fullCompaction 为 false 时,会调用构造函数中传入的 strategy 实例的 pick 方法。在 Paimon 中,最常用的常规策略是 UniversalCompaction

UniversalCompaction 策略模拟了 RocksDB 的 Universal Compaction Style,其核心思想是:

  • 目标:维持一个健康的、层级分明的文件结构,避免 Level 0 文件堆积过多,同时控制写放大和空间放大。
  • 触发条件
    1. Sorted Run 数量阈值:当总的 SortedRun 数量(Level 0 每个文件算一个 run,其他 level 每层算一个 run)超过 num-sorted-run.compaction-trigger 配置时,会触发合并。这是最主要的触发条件。
    2. 空间放大阈值:当所有文件的总大小,远大于最高层级文件的大小时(超过 max-size-amplification-percent 配置的比例),也会触发合并,以回收空间。
  • 挑选文件的逻辑
    • 它会从 Level 0 开始,向上检查,找到第一个满足合并条件的层级。
    • 通常,它会选择将 Level 0 的所有文件合并到 Level 1,或者将 Level i 的所有文件合并到 Level i+1
    • 它会尽量选择相邻的、大小相似的 SortedRun 进行合并,以达到最优效率。
  • filter 的作用: 在 triggerCompaction 方法中,strategy.pick 的结果后面跟了两个 .filter(...) 调用。
    1. .filter(unit -> unit.files().size() > 0): 确保选出的压缩单元不是空的。
    2. .filter(unit -> unit.files().size() > 1 || unit.files().get(0).level() != unit.outputLevel()): 这是一个重要的优化。它排除了“只有一个文件,并且输出层级和当前层级相同”的情况。这种情况意味着文件只是被“升级”(upgrade),而没有实际的合并重写,这通常在 MergeTreeCompactTask 内部处理更高效,无需作为一个独立的压缩任务提交。

dropDelete 策略

在确定了要合并的文件后,代码会计算一个 dropDelete 布尔值。

  • 含义dropDelete 为 true 表示在合并重写数据时,可以直接丢弃掉类型为 DELETE 的记录。
  • 条件
    1. unit.outputLevel() != 0: 输出层级不能是 Level 0。因为 Level 0 可能还有其他更老的文件没有参与本次合并,如果丢弃了删除标记,可能会导致本该被删除的数据重新“复活”。
    2. unit.outputLevel() >= levels.nonEmptyHighestLevel(): 输出层级必须是当前数据存在的最高层级或更高。这确保了合并产生的新文件是“最老”的,不存在比它更老的数据了,因此删除标记可以被安全地物理移除。
    3. dvMaintainer != null: 如果表开启了删除向量(Deletion Vectors),则逻辑会有所不同,通常也可以安全地处理删除。

总结

MergeTreeCompactManager 的 triggerCompaction 方法是一个精密的调度器,它通过 levels 对象感知整个表的物理文件状态,并执行以下流程:

  1. 获取状态:从 levels 中获取所有文件的层级和分布信息(levelSortedRuns)。
  2. 策略决策
    • 如果是手动触发的全量合并,则使用 pickFullCompaction 策略,将所有文件打包,目标是合并到最高层。
    • 如果是自动触发的常规合并,则使用 UniversalCompaction 等策略,根据 SortedRun 数量、空间放大等指标,智能地选择一部分文件进行层级推进式的合并。
  3. 任务优化:通过 filter 过滤掉无需执行的或无效的压缩单元。
  4. 参数计算:根据合并的目标层级和 levels 的整体状态,计算出是否可以安全地在合并中物理删除数据(dropDelete)。
  5. 提交执行:最后将包含待合并文件、目标层级、dropDelete 标志的 CompactUnit 封装成 MergeTreeCompactTask,提交到线程池执行。

整个过程完美地体现了 LSM-Tree 架构通过后台合并来平衡写入性能和查询性能的核心思想。

压缩策略

CompactStrategy.pickFullCompaction - 全量合并策略

这是一个定义在 CompactStrategy 接口中的静态方法,代表了一种最彻底的合并策略。它的目标非常明确:将表(或分区)中的所有数据文件合并成尽可能少的文件,并放置到最高层级(max level)

a. 源码分析

// ... existing code ...static Optional<CompactUnit> pickFullCompaction(int numLevels,List<LevelSortedRun> runs,@Nullable RecordLevelExpire recordLevelExpire) {int maxLevel = numLevels - 1;if (runs.isEmpty()) {// no sorted run, no need to compactreturn Optional.empty();} else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {if (recordLevelExpire == null) {// only 1 sorted run on the max level and don't check record-expire, no need to// compactreturn Optional.empty();}// pick the files which has expired recordsList<DataFileMeta> filesContainExpireRecords = new ArrayList<>();for (DataFileMeta file : runs.get(0).run().files()) {if (recordLevelExpire.isExpireFile(file)) {filesContainExpireRecords.add(file);}}return Optional.of(CompactUnit.fromFiles(maxLevel, filesContainExpireRecords));} else {return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));}}
// ... existing code ...

逻辑分解如下:

  1. 计算最高层级maxLevel = numLevels - 1
  2. 处理边界情况
    • 如果 runs 为空(没有任何文件),直接返回 Optional.empty(),无需合并。
    • 如果已经只剩下一个 SortedRun 并且它已经在最高层级 (runs.size() == 1 && runs.get(0).level() == maxLevel),这说明数据已经是最优状态了。
      • 此时,会检查是否配置了记录级过期 (recordLevelExpire)
      • 如果没有配置过期,那么就无需做任何事,返回 Optional.empty()
      • 如果配置了过期,它会遍历这个 SortedRun 中的所有文件,挑出那些可能包含过期数据的文件(通过 recordLevelExpire.isExpireFile(file) 判断),然后只对这些文件进行一次“自我合并”以清理过期数据。
  3. 常规全量合并
    • 如果不满足上述边界情况(比如有多于一个 SortedRun,或者唯一的 SortedRun 不在最高层),则执行标准的全量合并。
    • 它会调用 CompactUnit.fromLevelRuns(maxLevel, runs),将所有传入的 runs 中的文件都包含进来,创建一个 CompactUnit,并指定输出层级为 maxLevel

b. 应用场景

  • 用户手动触发:最常见的场景是用户通过 Flink SQL CALL sys.compact(...) 或 Spark Procedure CALL sys.compact(...) 并指定 full 模式来执行。
  • 目的
    • 减少小文件:将长时间积累的大量小文件合并成少量大文件,显著提升后续的查询性能。
    • 物理删除:全量合并到最高层级是安全地物理删除带有删除标记(DELETE)的数据的唯一时机。
    • 数据清理:配合 TTL,清理过期数据。

 ForceUpLevel0Compaction - 强制提升 Level 0 策略

这是一个实现了 CompactStrategy 接口的类,它代表了一种更激进的常规合并策略。它的核心思想是:优先采用通用的 UniversalCompaction 策略;如果通用策略认为无需合并,则强制检查 Level 0 是否有文件,如果有,就将它们全部合并

a. 源码分析

/** A {@link CompactStrategy} to force compacting level 0 files. */
public class ForceUpLevel0Compaction implements CompactStrategy {private final UniversalCompaction universal;public ForceUpLevel0Compaction(UniversalCompaction universal) {this.universal = universal;}@Overridepublic Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {// 1. 首先尝试通用的 UniversalCompaction 策略Optional<CompactUnit> pick = universal.pick(numLevels, runs);if (pick.isPresent()) {// 如果通用策略找到了需要合并的文件,就直接采纳它的决定return pick;}// 2. 如果通用策略认为不需要合并,则执行强制逻辑//    调用 universal.forcePickL0,这个方法会专门检查 L0return universal.forcePickL0(numLevels, runs);}
}

forcePickL0 的逻辑在 UniversalCompaction.java 中:

// ... existing code ...Optional<CompactUnit> forcePickL0(int numLevels, List<LevelSortedRun> runs) {// 收集所有 level 0 的文件int candidateCount = 0;for (int i = candidateCount; i < runs.size(); i++) {if (runs.get(i).level() > 0) {break;}candidateCount++;}// 如果 L0 没有文件,返回空;否则,将所有 L0 文件打包成一个压缩单元return candidateCount == 0? Optional.empty(): Optional.of(pickForSizeRatio(numLevels - 1, runs, candidateCount, true));}
// ... existing code ...

逻辑分解如下:

  1. 委托与回退(Delegate and Fallback)ForceUpLevel0Compaction 首先将决策权委托给内部的 UniversalCompaction 实例。UniversalCompaction 会根据文件数量、大小比例等常规指标判断是否需要合并。
  2. 强制检查 L0:如果 UniversalCompaction 的常规检查没有触发合并(返回了 Optional.empty()),ForceUpLevel0Compaction 会执行它的“强制”逻辑:调用 universal.forcePickL0
  3. forcePickL0 的行为:这个方法很简单,它只看 Level 0。只要 Level 0 存在任何文件,它就会把 Level 0 的所有文件都收集起来,创建一个 CompactUnit,准备将它们向上合并。

b. 应用场景

  • Lookup Join 优化:这是此策略最主要的应用场景。当 Paimon 表作为 Flink 的维表进行 lookup join 时,为了保证维表数据的近实时性,我们希望新写入的数据(总是在 Level 0)能尽快地被合并到更高层级,从而对 lookup 可见。
  • lookup.compact-mode = 'radical':当用户配置此参数时,系统就会启用 ForceUpLevel0Compaction 策略。radical(激进的)这个词很形象地描述了它的行为:只要有新数据写入(即 L0 有文件),就尽快地、激进地将它合并掉,以牺牲一些合并开销为代价,换取数据可见性的延迟降低。

Paimon 的优化:Lookup 模式下跳过 L0

为了解决上述性能问题,Paimon 在开启了特定优化后(比如配置了 lookup changelog-producer 或开启了删除向量),其 LocalTableQuery 会采取一种特殊的读取策略:

默认从 Level 1 开始读取数据,直接忽略 Level 0!

这可以在 LocalTableQuery.java 的构造函数中找到证据:

// ... existing code ...this.lookupStoreFactory =LookupStoreFactory.create(
// ... existing code ...new RowCompactedSerializer(keyType).createSliceComparator());if (options.needLookup()) {// 关键点:如果开启了 lookup 优化,起始读取层级设置为 1startLevel = 1;} else {
// ... existing code ...}
// ... existing code ...

options.needLookup() 会在满足某些条件时(如 changelog-producer = 'lookup')返回 true。当它为 true 时,startLevel 被设为 1,这意味着后续的所有查找操作都将从 Level 1 开始,从而完全避开了低效的 Level 0。

ForceUpLevel0Compaction 的作用:让 L0 数据尽快可见

现在,整个逻辑就闭环了:

  1. 新数据写入 Paimon 表,生成 L0 文件。
  2. 高性能的 Lookup Join 读取器为了性能,只看 L1 及以上层级的文件
  3. 此时,新写入的 L0 文件对于这个 Lookup Join 来说,就是“不可见”的。
  4. 为了解决这个问题,必须尽快地将 L0 的文件合并(Compaction)到 L1。
  5. ForceUpLevel0Compaction 策略应运而生。它的行为非常“激进” (radical):只要发现 L0 有文件,不管满足不满足常规的合并触发条件,都强制发起一次 Compaction,将它们推向 L1

这样一来,新数据停留在 L0 的时间窗口被大大缩短,从而保证了数据能够近实时地对 Lookup Join 可见。

UniversalCompaction 

它是 Paimon 中最核心、最常用的常规压缩策略,其设计思想借鉴了 RocksDB 的 Universal Compaction,旨在平衡写入放大、读取放大和空间放大。

UniversalCompaction 作为一个 CompactStrategy 的实现,它的主要职责是在常规写入流程中,根据一系列预设规则,判断是否需要触发一次合并(Compaction),并挑选出具体要合并哪些文件。

它的核心目标是:

  • 控制写入放大(Write Amplification):避免过于频繁地重写数据。
  • 维持健康的 LSM-Tree 结构:防止 L0 文件过多,或者文件大小差异过大,从而保证读取性能(Read Amplification)和空间占用(Space Amplification)在一个可接受的范围内。

我们先从它的成员变量入手,这些变量定义了 UniversalCompaction 策略的行为准则。

// ... existing code ...
public class UniversalCompaction implements CompactStrategy {private static final Logger LOG = LoggerFactory.getLogger(UniversalCompaction.class);// 对应 'compaction.max-size-amplification-percent'private final int maxSizeAmp; // 对应 'compaction.sorted-run.size-ratio'private final int sizeRatio;// 对应 'compaction.sorted-run.num-compaction-trigger'private final int numRunCompactionTrigger;// 对应 'compaction.optimization-interval'@Nullable private final Long opCompactionInterval;@Nullable private Long lastOptimizedCompaction;// 对应 'lookup.compact-max-interval'@Nullable private final Integer maxLookupCompactInterval;@Nullable private final AtomicInteger lookupCompactTriggerCount;public UniversalCompaction(int maxSizeAmp,int sizeRatio,int numRunCompactionTrigger,@Nullable Duration opCompactionInterval,@Nullable Integer maxLookupCompactInterval) {this.maxSizeAmp = maxSizeAmp;this.sizeRatio = sizeRatio;this.numRunCompactionTrigger = numRunCompactionTrigger;this.opCompactionInterval =opCompactionInterval == null ? null : opCompactionInterval.toMillis();this.maxLookupCompactInterval = maxLookupCompactInterval;this.lookupCompactTriggerCount =maxLookupCompactInterval == null ? null : new AtomicInteger(0);}
// ... existing code ...
  • maxSizeAmp最大空间放大百分比。控制除最老的一个 Sorted Run 外,其他所有 Sorted Run 的总大小,不能超过最老的 Sorted Run 大小的 maxSizeAmp / 100 倍。这是为了控制空间浪费。
  • sizeRatio大小比例。在挑选文件进行合并时,如果当前候选文件集合的总大小,与下一个 Sorted Run 的大小比例在 sizeRatio 之内,就会把下一个 Sorted Run 也纳入本次合并。这是为了倾向于合并大小相近的文件,提高效率。
  • numRunCompactionTrigger文件数量触发器。当总的 Sorted Run 数量超过这个阈值时,会强制触发一次合并。这是最主要的常规合并触发条件。
  • opCompactionInterval优化合并时间间隔。一个可选的配置,用于周期性地触发一次全量合并(将所有文件合并到最高层),以保证读取性能。
  • maxLookupCompactIntervalLookup 场景的合并间隔。这是为 lookup.compact-mode = 'gentle' 模式设计的。它不要求每次写入都强制合并 L0,而是每隔 N 次 pick 调用(即 N 次 compaction 检查)后,如果 L0 还有文件,就强制合并一次。这是一种在性能和数据延迟之间的折中。

核心方法 pick - 决策中心

pick 方法是整个策略的核心,它按照优先级从高到低的顺序,检查是否满足各种触发条件。只要有一个条件满足,就会生成一个 CompactUnit 并返回,后续的检查就不再进行。

// ... existing code ...@Overridepublic Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {int maxLevel = numLevels - 1;// 优先级 0: 周期性优化合并 (如果配置了)if (opCompactionInterval != null) {if (lastOptimizedCompaction == null|| currentTimeMillis() - lastOptimizedCompaction > opCompactionInterval) {// ...return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));}}// 优先级 1: 检查空间放大 (pickForSizeAmp)CompactUnit unit = pickForSizeAmp(maxLevel, runs);if (unit != null) {// ...return Optional.of(unit);}// 优先级 2: 检查大小比例 (pickForSizeRatio)unit = pickForSizeRatio(maxLevel, runs);if (unit != null) {// ...return Optional.of(unit);}// 优先级 3: 检查文件数量 (numRunCompactionTrigger)if (runs.size() > numRunCompactionTrigger) {// ...return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));}// 优先级 4: 检查 Lookup 场景的周期性强制合并 (如果配置了)if (maxLookupCompactInterval != null && lookupCompactTriggerCount != null) {lookupCompactTriggerCount.getAndIncrement();if (lookupCompactTriggerCount.compareAndSet(maxLookupCompactInterval, 0)) {// ...return forcePickL0(numLevels, runs);} // ...}return Optional.empty();}
// ... existing code ...

触发条件分析 (按优先级)

  1. 周期性优化合并 (opCompactionInterval): 最高优先级。如果配置了该参数,并且距离上次优化合并的时间已经超过了指定间隔,它会直接触发一次全量合并,将所有 runs 合并到 maxLevel。这对于需要定期整理数据以保证查询性能的场景非常有用。

  2. 空间放大 (pickForSizeAmp): 检查 (所有文件总大小 - 最老文件大小) / 最老文件大小 是否超过了 maxSizeAmp。如果超过,说明非最高层的数据(即“增量”数据)相对于“存量”数据来说过于庞大,占用了过多额外空间。此时也会触发一次全量合并

  3. 大小比例 (pickForSizeRatio): 这是最常规的合并挑选逻辑。它会从最年轻的文件(runs 列表的开头)开始,逐个向后累加,只要当前累加的总大小与下一个文件的大小比例在 sizeRatio 之内,就继续向后吞并。这个过程会形成一个大小比较均匀的合并候选集。

  4. 文件数量 (numRunCompactionTrigger): 如果总的 SortedRun 数量超过了阈值,说明文件过于碎片化,会严重影响性能。此时会强制触发一次合并。它会计算出需要合并掉多少个文件才能使总数降到阈值以下,然后调用 pickForSizeRatio 来挑选这些文件。

  5. Lookup 周期性合并 (maxLookupCompactInterval): 这是最低优先级的检查。它维护一个原子计数器 lookupCompactTriggerCount。每次调用 pick 都会使其加一。当计数器达到 maxLookupCompactInterval 时,就会重置为 0 并调用 forcePickL0,强制合并 L0 的所有文件。这为 lookup 场景提供了一种“温和”(gentle)的合并模式。

pickForSizeRatio 

    public CompactUnit pickForSizeRatio(int maxLevel, List<LevelSortedRun> runs, int candidateCount, boolean forcePick) {long candidateSize = candidateSize(runs, candidateCount);for (int i = candidateCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {break;}candidateSize += next.run().totalSize();candidateCount++;}if (forcePick || candidateCount > 1) {return createUnit(runs, maxLevel, candidateCount);}return null;}

这个方法是 Paimon 中 Universal Compaction(通用合并)策略的核心部分之一,它根据文件大小比例来决定哪些文件(SortedRun)应该被合并。

  • int maxLevel: Merge-Tree 的最大层级数。合并后的文件通常会被放到更高的层级。
  • List<LevelSortedRun> runs: 当前所有层级的、已排序的 SortedRun 列表。这个列表通常是按从新到旧(或从小到大)的顺序排列的。
  • int candidateCount: 初始候选 SortedRun 的数量。方法会从列表的前 candidateCount 个 run 开始考虑合并。
  • boolean forcePick: 是否强制选择。如果为 true,即使最终只选出了一个 run,也会创建合并单元(CompactUnit)。这在某些强制合并的场景下很有用。

这个方法的主要逻辑是:从一组初始的候选文件(runs)开始,不断地尝试将后续的文件也加入到这次合并任务中,直到遇到一个尺寸“过大”的文件为止。

代码执行流程如下:

  1. 计算初始候选文件总大小

    long candidateSize = candidateSize(runs, candidateCount);
    

    首先,它会计算由 candidateCount 指定的初始候选 run 的总大小。

  2. 迭代选择更多文件

    for (int i = candidateCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {break;}candidateSize += next.run().totalSize();candidateCount++;
    }
    

    接着,它会遍历剩余的 run。对于每一个 next run,它会检查一个关键条件: candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()

    • sizeRatio 是一个配置项 (compaction.size-ratio),表示尺寸比较时的灵活性百分比。
    • 这个条件判断的是:当前已选中的文件总大小(candidateSize),即使加上 sizeRatio 百分比的“宽容度”,是否仍然小于下一个文件(next)的大小。
    • 如果条件成立,意味着下一个文件 next 比当前已选中的文件总和要大得多,将它们合并的性价比不高。因此,循环中断(break),不再选择更多的文件。
    • 如果条件不成立,意味着 next 文件的大小和当前已选中的文件总大小在同一个量级,适合一起合并。于是,将 next 文件加入候选集,更新 candidateSize 和 candidateCount
  3. 创建合并单元

    if (forcePick || candidateCount > 1) {return createUnit(runs, maxLevel, candidateCount);
    }return null;
    

    循环结束后,方法会判断是否需要创建合并任务 (CompactUnit)。

    • 如果 forcePick 为 true,或者最终选出的文件数量 candidateCount 大于1(合并至少需要两个文件才有意义),就会调用 createUnit 方法来创建一个包含所有选中文件的 CompactUnit
    • 否则,说明没有找到合适的合并机会,返回 null

在 UniversalCompaction 类中,这个方法主要在以下几个场景被调用:

  1. 常规大小比例检查 (pickForSizeRatio(maxLevel, runs)): 这是最常见的用法,从第一个 run 开始(candidateCount=1),尝试寻找合适的合并机会。
  2. 文件数量触发 (pick 方法中): 当总文件数超过阈值 numRunCompactionTrigger 时,会触发合并。此时,初始 candidateCount 会被设置为 runs.size() - numRunCompactionTrigger + 1,然后调用此方法来决定最终合并哪些文件。
  3. 强制L0层合并 (forcePickL0 方法中): 在某些情况下(例如,为了降低查找延迟),需要强制合并L0层的所有文件。这时会调用此方法,并将 forcePick 设置为 true,确保即使L0只有一个文件也会被打包成一个合并任务。

pickForSizeRatio 方法实现了一种智能的合并文件选择策略。它倾向于将大小相近的文件进行合并,避免了将一个很小的文件和一个巨大的文件进行合并所带来的高I/O开销,从而提高了合并效率。通过 sizeRatio 参数提供了灵活性,并通过 forcePick 参数支持了强制合并的场景。

createUnit() 方法:目标层级的决定者

createUnit 方法是真正计算 outputLevel 的地方,这是理解整个机制的关键

// ... existing code ...@VisibleForTestingCompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {int outputLevel;if (runCount == runs.size()) {// 如果所有 run 都参与合并,目标就是最高层outputLevel = maxLevel;} else {// 否则,目标层级是下一个未参与合并的 run 的层级减 1// 这是最核心的逻辑outputLevel = Math.max(0, runs.get(runCount).level() - 1);}if (outputLevel == 0) {// 为了避免产生新的 level 0 文件,这里做了特殊处理// 如果计算出的目标是 level 0,会继续往后寻找,直到找到一个非 level 0 的 run// 并将它的层级作为 outputLevelfor (int i = runCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);runCount++;if (next.level() != 0) {outputLevel = next.level();break;}}}if (runCount == runs.size()) {// 如果经过上述逻辑后,所有 run 都被选中了,那么还是合并到最高层updateLastOptimizedCompaction();outputLevel = maxLevel;}return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));}
// ... existing code ...

核心逻辑解读:

  • runCount 是被选中参与本次合并的 run 的数量。
  • outputLevel = Math.max(0, runs.get(runCount).level() - 1); 这一行代码是关键。它说明,当不是所有文件都参与合并时,输出层级取决于第一个未被选中的 run (runs.get(runCount))。目标层级是这个 run 的层级减 1。

举个例子: 假设我们有以下 runs: [L0, L0, L0, L2, L4]

  1. pickForSizeRatio 方法可能决定合并前3个 L0 的文件。此时 runCount = 3。
  2. 进入 createUnit 方法,runs.get(runCount) 就是 runs.get(3),即那个 L2 的 run。
  3. outputLevel 计算为 L2.level() - 1,也就是 2 - 1 = 1
  4. 最终,这3个 L0 文件会被合并成一个新的 L1 文件,而不是 maxLevel

辅助方法

  • pickForSizeRatio(...): 实现了上述的大小比例挑选逻辑。它会从最年轻的文件开始,向后“滚动”合并,直到下一个文件太大不适合合并为止。
  • createUnit(...): 根据挑选出的文件(runCount个),决定输出层级(outputLevel)。逻辑通常是:如果合并了所有文件,则输出到最高层;否则,输出到下一个未被合并的文件的层级减一。它还会保证输出层级不为 0。
  • forcePickL0(...): 一个特殊的方法,只收集 Level 0 的所有文件,并准备将它们合并。被 ForceUpLevel0Compaction 和 lookupCompactMaxInterval 逻辑所调用。

总结

UniversalCompaction 是一个设计精巧且全面的合并策略,它通过多维度、分优先级的规则来维护 LSM-Tree 的健康状态:

  • 通过文件数量空间放大来设定合并的“底线”,防止系统状态恶化。
  • 通过大小比例来智能地挑选合并单元,实现高效合并。
  • 通过周期性全量合并针对 Lookup 的周期性 L0 合并,为不同的应用场景提供了额外的优化选项。

它与 ForceUpLevel0Compaction 的关系是:ForceUpLevel0Compaction 是一种更激进的策略,它完全复用了 UniversalCompaction 的所有逻辑,并在此基础上增加了一个最终的回退(fallback)逻辑:如果 UniversalCompaction 认为无需合并,它会强制检查并合并 L0,以满足 Lookup 场景对数据实时性的极致要求。

CompactUnit

Paimon 中执行一次具体压缩操作的对象是 CompactUnit,它并不完全等同于一个 SortedRun,而是从一个或多个 SortedRun 中选取出来的文件集合。定位哪些文件需要压缩,则是由 CompactStrategy(压缩策略)来决encided。

一个压缩任务的基本工作单元是 CompactUnit 接口。从它的定义可以看出,一个 CompactUnit 主要包含两部分信息:

  1. outputLevel(): 压缩后生成的新文件要归属的层级(Level)。
  2. files(): 需要参与本次压缩的所有数据文件(DataFileMeta)的列表。
public interface CompactUnit {int outputLevel();List<DataFileMeta> files();static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {List<DataFileMeta> files = new ArrayList<>();for (LevelSortedRun run : runs) {files.addAll(run.run().files());}return fromFiles(outputLevel, files);}static CompactUnit fromFiles(int outputLevel, List<DataFileMeta> files) {return new CompactUnit() {@Overridepublic int outputLevel() {return outputLevel;}@Overridepublic List<DataFileMeta> files() {return files;}};}
}

所以,压缩对象是一组文件的集合(List<DataFileMeta>),而不是单个 SortedRun。一个 CompactUnit 可以包含来自不同 SortedRun 的文件。

SortedRun 是 LSM-Tree 里的一个逻辑概念,代表一个有序的数据文件集合。在 Paimon 中,Level 0 每个文件都是一个独立的 SortedRun,而在其他 Level,同一层的所有文件构成一个 SortedRun

SortedRun 是挑选压缩文件时的重要依据,但不是压缩任务的直接执行对象。压缩策略会分析各个 SortedRun 的状态(如数量、大小、层级)来决定是否要发起一次压缩,以及挑选哪些 SortedRun 里的文件来组成 CompactUnit

如何定位和组织压缩对象?

这个过程主要分为两步:策略选择 和 任务执行

第一步:CompactStrategy 策略选择

CompactStrategy 接口负责从当前所有 LevelSortedRun(带有层级信息的 SortedRun)中挑选出需要被压缩的 CompactUnit

// ... existing code ...
public interface CompactStrategy {/*** Pick compaction unit from runs.** <ul>*   <li>compaction is runs-based, not file-based.*   <li>level 0 is special, one run per file; all other levels are one run per level.*   <li>compaction is sequential from small level to large level.* </ul>*/Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);/** Pick a compaction unit consisting of all existing files. */
// ... existing code ...

例如,UniversalCompaction 策略会检查是否有太多 SortedRun,如果超过阈值,就会选择一些相邻的 SortedRun 合并。FullCompaction 策略则会选择所有的文件进行合并。

第二步:MergeTreeCompactTask 任务执行

一旦 CompactStrategy 挑选出了一个 CompactUnit,就会创建一个 MergeTreeCompactTask 来执行具体的压缩逻辑。在你正在查看的 MergeTreeCompactTask.java 文件中,doCompact() 方法清晰地展示了如何处理这个 CompactUnit 里的文件。

  1. 分区(Partitioning):任务首先会根据文件的 key 范围将 CompactUnit 中的文件分成若干个互不重叠或连续重叠的组(List<List<SortedRun>> partitioned)。
  2. 分类处理
    • 对于有多个 SortedRun 重叠的组,它们是必须被重写(rewrite)合并的,因此被加入 candidate 列表。
    • 对于没有重叠的组(只有一个 SortedRun),会进一步判断:
      • 如果文件很小(小于 minFileSize),它也会被加入 candidate 列表,以便和其它小文件一起合并,避免产生过多小文件。
      • 如果文件很大,通常会直接“升级”(upgrade),即只修改文件的元数据,将其层级提升到 outputLevel,而无需重写文件内容,这样效率更高。但如果这个大文件包含需要过期的数据,或者需要被合并到最高层级但自身带有删除记录,它还是会被强制重写。
// ... existing code ...@Overrideprotected CompactResult doCompact() throws Exception {List<List<SortedRun>> candidate = new ArrayList<>();CompactResult result = new CompactResult();// Checking the order and compacting adjacent and contiguous files// Note: can't skip an intermediate file to compact, this will destroy the overall// orderlinessfor (List<SortedRun> section : partitioned) {if (section.size() > 1) {candidate.add(section);} else {SortedRun run = section.get(0);// No overlapping:// We can just upgrade the large file and just change the level instead of// rewriting it// But for small files, we will try to compact itfor (DataFileMeta file : run.files()) {if (file.fileSize() < minFileSize) {// Smaller files are rewritten along with the previous filescandidate.add(singletonList(SortedRun.fromSingle(file)));} else {// Large file appear, rewrite previous and upgrade itrewrite(candidate, result);upgrade(file, result);}}}}rewrite(candidate, result);result.setDeletionFile(compactDfSupplier.get());return result;}
// ... existing code ...

总结

  • 压缩对象:一个 CompactUnit 实例,它内部封装了一组待压缩的文件列表 (List<DataFileMeta>) 和目标层级。
  • 与 SortedRun 的关系SortedRun 是 LSM-Tree 的逻辑层,是压缩策略(CompactStrategy)制定计划的输入和依据。策略根据 SortedRun 的情况来决定挑选哪些文件组成 CompactUnit
  • 定位方式:通过 CompactStrategy.pick() 方法,根据预设的压缩策略(如 Universal, Full 等)分析所有 SortedRun,挑选出最需要被压缩的文件,打包成 CompactUnit,然后交由 MergeTreeCompactTask 执行。

level层次数量的权衡

这是一个关于性能权衡(Trade-off)的经典问题,主要涉及 写放大(Write Amplification) 和 读放大(Read Amplification)

降低写放大(The Main Benefit)是分更多层级的最主要原因。

  • 场景A:只有两层(L0 和 L-max) 假设我们只有 L0 和 L1(作为最高层)。当 L0 的文件需要合并时,它们需要和 L1 中所有的数据进行合并,然后生成全新的 L1。如果 L1 非常大(比如几百GB),那么即使 L0 只有很小的一点新数据(比如几百MB),也需要重写整个 L1。这个过程消耗的 I/O 非常巨大,这就是“写放大”——为了写入少量逻辑数据,却需要进行大量的物理磁盘写入。

  • 场景B:有多层(L0, L1, L2, ...) 在这种设计下,合并是逐步进行的。L0 的文件合并成 L1 的文件;当 L1 积累到一定程度,再和 L2 合并,以此类推。每次合并操作所涉及的数据量都相对较小。例如,将几个100MB的 L0 文件合并成一个 L1 文件,远比将它们与一个100GB的 L-max 文件合并要快得多。这大大降低了单次合并的成本,使得写入性能更平滑、更可预测。

平衡读写性能

  • 更多层级

    • 优点:写放大低,写入平稳。
    • 缺点:读放大高。因为一条数据可能存在于任何一层,查询时需要从 L0 到 L-max 逐层查找,直到找到最新的版本。层级越多,需要检查的地方就越多。
  • 更少层级

    • 优点:读放大低。查询时只需要检查很少的几个层级。
    • 缺点:写放大高。合并成本巨大,可能导致写入性能的剧烈抖动。

Paimon 采用的 Universal Compaction 策略,以及其多层级的结构,正是在这两种放大效应之间做权衡,目标是提供一个整体表现良好的方案,尤其是在高频写入的场景下,通过平滑合并操作来保证写入的稳定性。

总而言之,分更多层次的核心好处是显著降低写放大,以平滑的、低成本的增量合并,替代高成本的全量合并,从而获得更稳定、高效的写入性能。这是以牺牲一部分读取性能(增加读放大)为代价的,但对于许多数据仓库和数据湖场景来说,这是一个非常值得的权衡。

AbstractCompactRewriter

AbstractCompactRewriter 是 Apache Paimon 项目中用于合并树(Merge-Tree)压缩(Compaction)操作的一个核心抽象基类。它定义了压缩重写器(Compact Rewriter)的通用行为和辅助功能,具体的重写逻辑则由其子类实现。

该类位于 org.apache.paimon.mergetree.compact 包下,从其命名和 abstract 关键字可以看出,它是一个抽象类,不能被直接实例化。它实现了 CompactRewriter 接口。

/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.mergetree.compact;import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.SortedRun;import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;/** Common implementation of {@link CompactRewriter}. */
public abstract class AbstractCompactRewriter implements CompactRewriter {@Overridepublic CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception {return new CompactResult(file, file.upgrade(outputLevel));}protected static List<DataFileMeta> extractFilesFromSections(List<List<SortedRun>> sections) {return sections.stream().flatMap(Collection::stream).map(SortedRun::files).flatMap(Collection::stream).collect(Collectors.toList());}@Overridepublic void close() throws IOException {}
}

它实现了 CompactRewriter 接口,我们先来看一下这个接口的定义:

// ... existing code ...
import java.io.Closeable;
import java.util.List;public interface CompactRewriter extends Closeable {/*** Rewrite sections to new level.* ...*/CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)throws Exception;/*** Upgrade file to new level, usually file data is not rewritten, only the metadata is updated.* ...*/CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception;
}

CompactRewriter 接口定义了两个核心方法 rewrite 和 upgrade,并且继承了 Closeable 接口,意味着实现类可能持有需要关闭的资源。

AbstractCompactRewriter 提供了部分方法的默认实现,并留下一个抽象方法给子类。

upgrade(int outputLevel, DataFileMeta file)

这个方法提供了 upgrade 操作的默认实现。在 Paimon 的合并树中,upgrade 通常指将一个文件从较低的 level "提升" 到较高的 level,这个过程大多数情况下不需要重写文件内容,只需要更新文件的元数据(比如 level 信息)。 这里的实现正是如此:

  1. 它接收一个 DataFileMeta 对象(代表一个数据文件)和一个目标 outputLevel
  2. 它调用 file.upgrade(outputLevel) 创建一个新的 DataFileMeta 对象,新对象包含了更新后的 level 信息。
  3. 最后,它将原始文件和升级后的文件包装成一个 CompactResult 对象返回。CompactResult 用来封装压缩的结果,包含压缩前的文件列表和压缩后的文件列表。

extractFilesFromSections(List<List<SortedRun>> sections)

这是一个 protected static 的辅助方法,不属于接口方法。它的作用是从一个嵌套的 List<List<SortedRun>> 结构中提取出所有底层的 DataFileMeta 文件。

  • sections 代表了待压缩的多个数据段。
  • 每个 section 是一个 List<SortedRun>
  • 每个 SortedRun 包含一个或多个数据文件 (DataFileMeta)。 这个方法通过 Java Stream API 将这个三层嵌套的结构展平,最终返回一个包含所有待压缩文件的 List<DataFileMeta>。这在 rewrite 的实现中非常有用,可以方便地获取所有输入文件。

close()

该方法提供了 Closeable 接口的默认空实现。这意味着如果子类没有需要释放的资源(如文件句柄、网络连接等),就无需重写此方法。如果子类持有了需要关闭的资源,则必须重写此方法以确保资源被正确释放。

rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)

AbstractCompactRewriter 没有实现 CompactRewriter 接口中的 rewrite 方法。这正是它被声明为 abstract 的原因。 rewrite 方法是压缩操作的核心,它定义了如何读取多个输入文件(sections),通过合并、去重等逻辑,然后写入到一个或多个新的输出文件(在 outputLevel)。这个过程是复杂且多变的,取决于具体的合并策略(例如,是否保留 changelog、使用哪种合并引擎等)。 因此,AbstractCompactRewriter 将这个最核心、最易变的逻辑交由具体的子类去实现。

实现与应用

在 Paimon 项目中,有多个 AbstractCompactRewriter 的子类,它们根据不同的场景提供了具体的 rewrite 实现。例如:

  • MergeTreeCompactRewriter: 提供了通用的合并树压缩逻辑。
  • ChangelogMergeTreeRewriter: 在 MergeTreeCompactRewriter 基础上,增加了生成 changelog 的能力。
  • FullChangelogMergeTreeCompactRewriter: 一种生成完整 changelog 的策略。
  • LookupMergeTreeCompactRewriter: 适用于 lookup 模式的合并策略。

这些子类会重写 rewrite 方法,并在内部使用 extractFilesFromSections 辅助方法来获取输入文件列表。

例如,在一个测试类 MergeTreeTestBase 中,就有一个内部类 TestRewriter 继承了 AbstractCompactRewriter 并实现了 rewrite 方法,清晰地展示了其用法:

MergeTreeTestBase.java

Apply

// ... existing code ...private class TestRewriter extends AbstractCompactRewriter {@Overridepublic CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)throws Exception {// 1. 创建用于写入新文件的 writerRollingFileWriter<KeyValue, DataFileMeta> writer =writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);// 2. 创建用于读取和合并旧文件的 readerRecordReader<KeyValue> reader =MergeTreeReaders.readerForMergeTree(sections,// ... reader and merge function details ...);if (dropDelete) {reader = new DropDeleteReader(reader);}// 3. 执行读写操作writer.write(new RecordReaderIterator<>(reader));writer.close();// 4. 使用父类的辅助方法获取输入文件,并返回结果return new CompactResult(extractFilesFromSections(sections), writer.result());}}
// ... existing code ...

AbstractCompactRewriter 是一个典型的模板方法模式的应用。它定义了压缩重写算法的骨架:

  1. 提供了不变的部分upgrade 方法的通用实现和 close 的空实现。
  2. 提供了可复用的辅助功能extractFilesFromSections 静态方法。
  3. 定义了易变的部分为抽象方法:将核心的 rewrite 逻辑延迟到子类中实现。

这种设计使得 Paimon 可以灵活地扩展和实现不同场景下的文件压缩策略,同时保证了代码的复用性和结构的清晰性。

MergeTreeCompactRewriter

MergeTreeCompactRewriter 是 AbstractCompactRewriter 的一个核心具体实现。在 Paimon 的合并树(Merge-Tree)结构中,当需要对数据文件进行合并(Compaction)时,这个类提供了默认的、最基础的重写(rewrite)逻辑。

// ... existing code ...
import java.util.Comparator;
import java.util.List;/** Default {@link CompactRewriter} for merge trees. */
public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
// ... existing code ...

从定义可以看出:

  • 它是一个公开类,位于 org.apache.paimon.mergetree.compact 包下。
  • 它继承自我们之前分析过的 AbstractCompactRewriter。这意味着它自动获得了 upgrade 方法的默认实现(仅更新元数据)和 extractFilesFromSections 辅助方法。
  • 它的核心职责是实现 AbstractCompactRewriter 中定义的抽象方法 rewrite

核心成员变量与构造函数

MergeTreeCompactRewriter 的行为由其成员变量(在构造时注入)决定,这体现了依赖注入的设计思想,使得类更加灵活和可测试。

// ... existing code ...
public class MergeTreeCompactRewriter extends AbstractCompactRewriter {protected final FileReaderFactory<KeyValue> readerFactory;protected final KeyValueFileWriterFactory writerFactory;protected final Comparator<InternalRow> keyComparator;@Nullable protected final FieldsComparator userDefinedSeqComparator;protected final MergeFunctionFactory<KeyValue> mfFactory;protected final MergeSorter mergeSorter;public MergeTreeCompactRewriter(FileReaderFactory<KeyValue> readerFactory,KeyValueFileWriterFactory writerFactory,Comparator<InternalRow> keyComparator,@Nullable FieldsComparator userDefinedSeqComparator,MergeFunctionFactory<KeyValue> mfFactory,MergeSorter mergeSorter) {this.readerFactory = readerFactory;this.writerFactory = writerFactory;this.keyComparator = keyComparator;this.userDefinedSeqComparator = userDefinedSeqComparator;this.mfFactory = mfFactory;this.mergeSorter = mergeSorter;}
// ... existing code ...
  • readerFactory: 文件读取器工厂,用于创建读取输入数据文件(SortedRun中的文件)的 RecordReader
  • writerFactory: 文件写入器工厂,用于创建 RollingFileWriter,将合并后的数据写入新的输出文件。
  • keyComparator: 键比较器,用于在合并过程中对 KeyValue 的键(key)进行排序和比较。
  • userDefinedSeqComparator: 用户定义的序列号比较器(可选)。Paimon 支持用户自定义字段作为合并时的排序依据,这个比较器就用于此目的。
  • mfFactory: 合并函数工厂(MergeFunctionFactory),用于创建 MergeFunctionMergeFunction 定义了当遇到相同键(key)的多条记录时,应如何合并它们。例如,可以是去重(Deduplicate)、部分更新(Partial Update)或聚合(Aggregation)等。
  • mergeSorter: 合并排序器,当待合并的数据量过大,无法完全在内存中进行时,MergeSorter 会利用外部排序(External Sort)来处理。

rewrite 方法

这是 CompactRewriter 接口的核心方法。MergeTreeCompactRewriter 的实现非常直接,它将调用转发给了 rewriteCompaction 方法。这种设计模式允许子类在不改变公共API的情况下,更容易地重写或扩展核心压缩逻辑。

// ... existing code ...@Overridepublic CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {return rewriteCompaction(outputLevel, dropDelete, sections);}
// ... existing code ...

rewriteCompaction 方法

这是实际执行压缩重写逻辑的地方,是整个类的核心。

// ... existing code ...protected CompactResult rewriteCompaction(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {// 1. 创建一个滚动写入器,用于写入压缩后的新文件RollingFileWriter<KeyValue, DataFileMeta> writer =writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);RecordReader<KeyValue> reader = null;Exception collectedExceptions = null;try {// 2. 创建一个合并读取器,它会读取所有输入文件,并应用合并逻辑reader =readerForMergeTree(sections, new ReducerMergeFunctionWrapper(mfFactory.create()));// 3. 如果需要,包装一个 DropDeleteReader 来过滤掉删除标记的记录if (dropDelete) {reader = new DropDeleteReader(reader);}// 4. 执行读写:从 reader 读取合并后的数据,写入 writerwriter.write(new RecordReaderIterator<>(reader));} catch (Exception e) {collectedExceptions = e;} finally {// 5. 确保资源被关闭try {IOUtils.closeAll(reader, writer);} catch (Exception e) {collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);}}if (null != collectedExceptions) {// 6. 如果发生异常,终止写入并抛出异常writer.abort();throw collectedExceptions;}// 7. 构造并返回 CompactResultList<DataFileMeta> before = extractFilesFromSections(sections);notifyRewriteCompactBefore(before);return new CompactResult(before, writer.result());}
// ... existing code ...

其执行流程可以分解为:

  1. 创建写入器:通过 writerFactory 创建一个 RollingFileWriter,准备将合并后的数据写入到指定 outputLevel 的新文件中。
  2. 创建读取器:调用 readerForMergeTree 方法创建一个 RecordReader。这个 reader 是一个复合的 reader,它能够同时读取多个 SortedRun 中的所有文件,并使用 MergeFunction 在内存中或通过外部排序进行多路归并。
  3. 处理删除标记:如果 dropDelete 参数为 true(通常在对最高 level 进行 full compaction 时),则用 DropDeleteReader 包装原始 reader。这个装饰器会过滤掉类型为 DELETE 的记录。
  4. 执行重写writer.write(new RecordReaderIterator<>(reader)) 是核心步骤。它驱动 reader 读取、合并数据,并将结果逐条写入 writer
  5. 资源管理与异常处理:使用 try-finally 确保 reader 和 writer 无论成功还是失败都能被关闭。如果过程中发生异常,会收集起来。
  6. 失败回滚:如果收集到异常,调用 writer.abort() 来删除可能已产生的临时文件,然后将异常抛出。
  7. 返回结果:如果成功,它会调用父类的 extractFilesFromSections 获取所有输入文件,并结合 writer.result()(所有新生成的输出文件),封装成一个 CompactResult 对象返回。在返回前,会调用一个空的 notifyRewriteCompactBefore 方法,这是一个为子类提供的扩展点。

readerForMergeTree 和 notifyRewriteCompactBefore

  • readerForMergeTree: 这是一个辅助方法,它封装了创建合并树读取器的复杂性,将所有需要的组件(readerFactorykeyComparator 等)传递给 MergeTreeReaders.readerForMergeTree 来构造最终的 RecordReader
  • notifyRewriteCompactBefore: 这是一个空的 protected 方法,充当一个钩子(Hook)。子类可以重写此方法来在压缩完成、返回结果之前执行一些额外的逻辑。例如,LookupMergeTreeCompactRewriter 就重写了此方法来清理与被压缩文件相关的删除向量(Deletion Vectors)。

在系统中的位置和扩展

  • MergeTreeCompactRewriter 是 Paimon 压缩机制的基石。MergeTreeCompactManager 负责制定压缩计划,当它决定执行一个压缩任务(MergeTreeCompactTask)时,就会使用一个 CompactRewriter 的实例来执行实际的文件读写和合并。
  • 这个类是可扩展的。Paimon 中更复杂的 Rewriter,如 ChangelogMergeTreeRewriter(用于生成 changelog)、LookupMergeTreeCompactRewriter(用于 lookup 表)和 FullChangelogMergeTreeCompactRewriter,都直接或间接地继承自 MergeTreeCompactRewriter,并在其基础上增加或修改功能。

总结

MergeTreeCompactRewriter 是 Paimon 合并树 compaction 逻辑的一个标准、通用的实现。它清晰地展示了如何将多个已排序的文件(SortedRun)通过多路归并、应用合并函数,最终重写为新的、更紧凑的文件。其设计利用了依赖注入和模板方法/钩子模式,具有良好的灵活性和扩展性,为更高级的压缩策略提供了坚实的基础。

为了更清晰地理解,梳理从顶层到底层的核心调用链:

  1. MergeTreeCompactRewriter.readerForMergeTree
  2. MergeTreeReaders.readerForMergeTree
    • 调用 -> ConcatRecordReader.create 将多个 section 的 reader 串联起来。
    • 为每个 section 调用 -> MergeTreeReaders.readerForSection
  3. MergeTreeReaders.readerForSection
    • 调用 -> MergeSorter.mergeSort 对一个 section 内的所有 SortedRun 进行合并排序。
    • 为每个 SortedRun 调用 -> MergeTreeReaders.readerForRun
  4. MergeTreeReaders.readerForRun
    • 调用 -> ConcatRecordReader.create 将一个 SortedRun 内的所有文件串联起来。
  5. MergeSorter.mergeSort
    • 调用 -> SortMergeReader.createSortMergeReader (当数据无需溢出到磁盘时)
  6. SortMergeReader.createSortMergeReader
    • 创建 -> SortMergeReaderWithMinHeap 或 SortMergeReaderWithLoserTree (最终执行多路归并的 Reader)

SortMergeReaderWithMinHeap 和 SortMergeReaderWithLoserTree 是两种经典的多路归并排序算法的实现。它们接收多个已经排好序的 RecordReader 作为输入,通过内部的数据结构(最小堆或败者树)高效地从所有输入中找出当前最小的记录。当遇到主键相同的记录时,它们会调用 MergeFunctionWrapper 来执行用户定义的合并逻辑(如去重、更新等)。

    IntervalPartition

    IntervalPartition 是 Paimon 在执行 Merge-Tree 压缩(Compaction)时一个非常核心的工具类。它的主要目标是将一组给定的数据文件(DataFileMeta)进行高效地分组,以便后续进行归并排序。这个分组算法非常关键,因为它直接影响了压缩任务的并行度和效率。

    从类的注释中我们可以看到,它的目的是:

    Algorithm to partition several data files into the minimum number of {@link SortedRun}s.

    即:将多个数据文件划分为最少数目的 SortedRun 的算法

    这里的 SortedRun 代表一组按主键有序且键范围互不重叠的文件集合。将文件划分为最少的 SortedRun 意味着我们可以用最少的归并路数来完成排序,从而提高效率。

    为了实现这个目标,IntervalPartition 采用了两层划分的策略:

    • 第一层:Section(分段)

      • 它首先将所有输入文件按照主键范围(Key Interval)划分为若干个 Section
      • 不同 Section 之间的主键范围是完全不重叠的
      • 这样做的好处是,不同 Section 之间的数据没有交集,因此可以独立、并行地进行处理,而不会相互影响。这极大地提高了压缩的并行能力。
    • 第二层:SortedRun(有序运行)

      • 在每个 Section 内部,文件的主键范围是可能相互重叠的。
      • 算法的目标是在这个 Section 内,将这些可能重叠的文件,组合成最少数目的 SortedRun
      • 每个 SortedRun 内部的文件,按主键有序排列,并且它们的主键范围不会重叠。

    最终,IntervalPartition 的输出是一个二维列表 List<List<SortedRun>>,外层列表代表 Section,内层列表代表该 Section 内划分出的所有 SortedRun

    构造函数与初始化

    // ... existing code ...public IntervalPartition(List<DataFileMeta> inputFiles, Comparator<InternalRow> keyComparator) {this.files = new ArrayList<>(inputFiles);this.files.sort((o1, o2) -> {int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());return leftResult == 0? keyComparator.compare(o1.maxKey(), o2.maxKey()): leftResult;});this.keyComparator = keyComparator;}
    // ... existing code ...
    

    构造函数做了非常重要的一步预处理: 对所有输入的 DataFileMeta 文件,首先按照它们的 minKey(最小主键)进行升序排序。如果 minKey 相同,则再按照 maxKey(最大主键)进行升序排序。

    这个排序是后续所有划分算法的基础,保证了文件是按照主键范围的起始位置被依次处理的。

    partition() 方法:第一层划分(切分 Section)

    这是该类的入口方法,负责将已排序的文件切分成多个 Section。

    // ... existing code ...public List<List<SortedRun>> partition() {List<List<SortedRun>> result = new ArrayList<>();List<DataFileMeta> section = new ArrayList<>();BinaryRow bound = null;for (DataFileMeta meta : files) {if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {// larger than current right bound, conclude current section and create a new oneresult.add(partition(section));section.clear();bound = null;}section.add(meta);if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {// update right boundbound = meta.maxKey();}}if (!section.isEmpty()) {// conclude last sectionresult.add(partition(section));}return result;}
    // ... existing code ...
    

    其逻辑如下:

    1. 维护一个当前 section 的文件列表和一个当前 section 的右边界 bound(即 section 中所有文件的 maxKey 的最大值)。
    2. 遍历所有(已按 minKey 排序的)文件:
      • 将当前文件 meta 加入 section
      • 更新 bound 为当前 section 中所有文件 maxKey 的最大值。
      • 核心判断:在添加下一个文件之前,检查它的 minKey 是否大于当前 section 的右边界 bound
      • 如果 meta.minKey() > bound,说明这个新文件与当前 section 内的所有文件都没有主键范围的重叠。这意味着一个 Section 到此结束了。
      • 此时,就将当前 section 里的文件列表交给 partition(section) 方法(第二层划分)处理,得到 List<SortedRun>,然后加入最终结果 result
      • 清空 section 和 bound,开始一个新的 Section。
    3. 循环结束后,处理最后一个 section

    通过这种方式,它有效地将所有文件切分成了若干个主键范围互不重叠的 Section。

    partition(List<DataFileMeta> metas) 方法:第二层划分(贪心算法生成 SortedRun)

    这个私有方法是算法的精髓所在。它接收一个 Section 内的文件列表(这些文件的主键范围可能重叠),目标是将它们划分为最少的 SortedRun

    这里使用了一个经典的贪心算法,并借助优先队列(最小堆) 来实现。

    // ... existing code ...private List<SortedRun> partition(List<DataFileMeta> metas) {PriorityQueue<List<DataFileMeta>> queue =new PriorityQueue<>((o1, o2) ->// sort by max key of the last data filekeyComparator.compare(o1.get(o1.size() - 1).maxKey(),o2.get(o2.size() - 1).maxKey()));// create the initial partitionList<DataFileMeta> firstRun = new ArrayList<>();firstRun.add(metas.get(0));queue.add(firstRun);for (int i = 1; i < metas.size(); i++) {DataFileMeta meta = metas.get(i);// any file list whose max key < meta.minKey() is sufficient,// for convenience we pick the smallestList<DataFileMeta> top = queue.poll();if (keyComparator.compare(meta.minKey(), top.get(top.size() - 1).maxKey()) > 0) {// append current file to an existing partitiontop.add(meta);} else {// create a new partitionList<DataFileMeta> newRun = new ArrayList<>();newRun.add(meta);queue.add(newRun);}queue.add(top);}// order between partitions does not matterreturn queue.stream().map(SortedRun::fromSorted).collect(Collectors.toList());}
    }
    

    逻辑分解如下:

    1. 创建一个优先队列 queue。队列中的元素是 List<DataFileMeta>,代表一个正在构建中的 SortedRun
    2. 这个优先队列的排序规则是:按照每个 SortedRun 中最后一个文件的 maxKey 进行升序排序。这意味着,队首的 SortedRun 是当前所有 SortedRun 中,右边界最小的那个。
    3. 将 Section 中的第一个文件放入一个新的 SortedRun,并加入队列。
    4. 遍历 Section 中剩余的文件 meta
      • 从优先队列中取出队首元素 top(即右边界最小的那个 SortedRun)。
      • 核心判断:比较当前文件 meta 的 minKey 和 top 的 maxKey
      • 如果 meta.minKey() 大于 top.get(top.size() - 1).maxKey(),说明 meta 文件可以安全地追加到 top 这个 SortedRun 的末尾,而不会破坏其内部的有序性(因为 meta 的范围在 top 之后)。于是,将 meta 添加到 top 中。
      • 如果 meta.minKey() 不大于 top 的 maxKey,说明 meta 与 top 这个 SortedRun 的范围有重叠,不能追加。此时,必须为 meta 创建一个新的 SortedRun,并将这个新的 SortedRun 也加入到优先队列中。
      • 最后,将被修改过(或者未修改)的 top 重新放回优先队列。
    5. 遍历结束后,优先队列 queue 中剩下的所有 List<DataFileMeta> 就是划分好的、数量最少的 SortedRun 集合。

    这个贪心策略的正确性在于,对于每一个新来的文件,我们总是尝试将它追加到最早可以结束的那个 SortedRun 后面。这样可以最大限度地复用现有的 SortedRun,从而保证最终 SortedRun 的数量最少。这在算法上被称为“区间划分问题”的一种经典解法。

    总结

    IntervalPartition 通过一个两阶段的划分过程,巧妙地解决了如何高效组织待压缩文件的问题:

    1. 宏观上,通过 partition() 方法按主键范围切分出互不重叠的 Section,为并行处理创造了条件。
    2. 微观上,在每个 Section 内部,通过 partition(List<DataFileMeta> metas) 方法中的贪心算法,将可能重叠的文件高效地组织成最少数目的 SortedRun,为后续的归并排序(Merge-Sort)提供了最优的输入,减少了归并的复杂度。

    这个类的设计体现了在数据密集型系统中,通过精巧的算法设计来优化核心流程(如Compaction)的典型思路。

    MergeTreeCompactManager

    MergeTreeCompactManager 是 Paimon 中 merge-tree 核心写流程的心脏,它负责管理和调度数据文件的合并(Compaction)任务。下面我将从几个关键部分来解析这个类。

    MergeTreeCompactManager 继承自 CompactFutureManager,它的核心职责是:

    • 管理数据文件:通过内部的 Levels 对象,维护所有数据文件(DataFileMeta)在不同层级(Level)的分布情况。
    • 触发合并任务:根据预设的合并策略(CompactStrategy),决定何时以及哪些文件需要进行合并。
    • 提交和管理合并任务:将选出的文件打包成一个合并单元(CompactUnit),并提交给一个异步执行的 MergeTreeCompactTask 任务。
    • 处理合并结果:获取异步任务的执行结果(CompactResult),并用它来更新 Levels 中文件的状态。
    • 控制写入流速:通过判断当前文件堆积情况,决定是否需要阻塞上游的写入(Write)操作,防止因合并速度跟不上写入速度而导致系统不稳定。

    核心成员变量 (Key Fields)

    我们来看一下这个类中最重要的几个成员变量,它们定义了 MergeTreeCompactManager 的行为:

    // ... existing code ...
    public class MergeTreeCompactManager extends CompactFutureManager {private static final Logger LOG = LoggerFactory.getLogger(MergeTreeCompactManager.class);private final ExecutorService executor;private final Levels levels;private final CompactStrategy strategy;private final Comparator<InternalRow> keyComparator;private final long compactionFileSize;private final int numSortedRunStopTrigger;private final CompactRewriter rewriter;@Nullable private final CompactionMetrics.Reporter metricsReporter;@Nullable private final DeletionVectorsMaintainer dvMaintainer;private final boolean lazyGenDeletionFile;private final boolean needLookup;@Nullable private final RecordLevelExpire recordLevelExpire;
    // ... existing code ...
    
    • executor: 一个 ExecutorService 线程池,用于异步执行合并任务。
    • levelsLevels 对象,是 LSM-Tree(Log-Structured Merge-Tree)分层结构的核心体现。它管理着所有的数据文件,并根据 Level 组织它们。
    • strategyCompactStrategy,合并策略。它定义了如何从 Levels 中挑选文件进行合并。Paimon 提供了如 UniversalCompaction 和 LevelCompaction 等策略。
    • keyComparator: 主键的比较器,用于在合并过程中对数据进行排序。
    • compactionFileSize: 合并后生成的目标文件大小。
    • numSortedRunStopTrigger: 一个非常重要的阈值。当 Levels 中的有序文件片段(Sorted Run)数量超过这个值时,会阻塞写入操作,等待合并完成。这是控制写入和合并速度平衡的关键。
    • rewriterCompactRewriter,合并重写器。它负责读取待合并的旧文件,使用 MergeFunction 对数据进行合并,然后写入新文件。这是实际执行数据合并逻辑的组件。
    • needLookup: 一个布尔值,通常与 changelog-producer = lookup 配置相关。当为 true 时,表示在合并时需要通过 lookup 方式生成 changelog。

    triggerCompaction(boolean fullCompaction): 触发合并

    这是发起合并的入口。

    // ... existing code ...@Overridepublic void triggerCompaction(boolean fullCompaction) {Optional<CompactUnit> optionalUnit;List<LevelSortedRun> runs = levels.levelSortedRuns();if (fullCompaction) {// ... 处理强制全量合并 ...optionalUnit =CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs, recordLevelExpire);} else {if (taskFuture != null) {return;}// ...optionalUnit =strategy.pick(levels.numberOfLevels(), runs).filter(unit -> unit.files().size() > 0).filter(unit ->unit.files().size() > 1|| unit.files().get(0).level()!= unit.outputLevel());}optionalUnit.ifPresent(unit -> {// ...boolean dropDelete =unit.outputLevel() != 0&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()|| dvMaintainer != null);// ...submitCompaction(unit, dropDelete);});}
    // ... existing code ...
    
    • fullCompaction 参数: 如果为 true,会触发一次全量合并,通常是将所有文件合并成一个或少数几个大文件。这是一种比较重的操作,一般由用户手动触发。
    • 普通合并: 如果 fullCompaction 为 false,则会调用 strategy.pick(...) 方法,让合并策略根据当前的 Levels 状态来决定是否需要合并以及合并哪些文件。
    • 提交任务: 如果策略选出了需要合并的文件(CompactUnit),就会调用 submitCompaction 方法将任务提交到线程池。

    submitCompaction(CompactUnit unit, boolean dropDelete): 提交合并任务

    这个方法负责创建并提交一个真正的合并任务。

    // ... existing code ...private void submitCompaction(CompactUnit unit, boolean dropDelete) {// ...MergeTreeCompactTask task =new MergeTreeCompactTask(keyComparator,compactionFileSize,rewriter,unit,dropDelete,levels.maxLevel(),metricsReporter,compactDfSupplier,recordLevelExpire);// ...taskFuture = executor.submit(task);// ...}
    // ... existing code ...
    

    它将所有需要的组件(如 rewriterkeyComparator 等)和待合并的文件(unit)打包成一个 MergeTreeCompactTask 对象,然后通过 executor.submit(task) 提交给线程池异步执行。

    getCompactionResult(boolean blocking): 获取合并结果

    当外部(通常是 MergeTreeWriter)需要获取合并结果时,会调用此方法。

    // ... existing code .../** Finish current task, and update result files to {@link Levels}. */@Overridepublic Optional<CompactResult> getCompactionResult(boolean blocking)throws ExecutionException, InterruptedException {Optional<CompactResult> result = innerGetCompactionResult(blocking);result.ifPresent(r -> {// ...levels.update(r.before(), r.after());MetricUtils.safeCall(this::reportMetrics, LOG);// ...});return result;}
    // ... existing code ...
    
    • 它会从 taskFuture 中获取任务结果 CompactResultCompactResult 中包含了合并前的文件列表(before)和合并后生成的新文件列表(after)。
    • 最关键的一步是 levels.update(r.before(), r.after()),它用新生成的文件替换掉了旧的、已被合并的文件,从而更新了整个 Levels 的状态。

    shouldWaitForLatestCompaction() 和 compactNotCompleted(): 控制写入流速

    这两个方法是 Paimon 实现反压(Back-Pressure)机制的关键。

    // ... existing code ...@Overridepublic boolean shouldWaitForLatestCompaction() {return levels.numberOfSortedRuns() > numSortedRunStopTrigger;}// ... existing code ...@Overridepublic boolean compactNotCompleted() {// If it is a lookup compaction, we should ensure that all level 0 files are consumed, so// here we need to make the outside think that we still need to do unfinished compact// workingreturn super.compactNotCompleted() || (needLookup && !levels().level0().isEmpty());}
    // ... existing code ...
    
    • shouldWaitForLatestCompaction(): 当 Levels 中 SortedRun 的数量超过了 numSortedRunStopTrigger 阈值时,返回 trueMergeTreeWriter 会根据这个返回值决定是否要阻塞写入,等待合并完成,以释放一些 SortedRun
    • compactNotCompleted(): 判断是否还有未完成的合并工作。特别地,如果配置了 needLookup,即使没有正在运行的合并任务,只要 Level 0 还存在文件,它也会返回 true,以确保 Level 0 的文件能被尽快合并掉,这对于 lookup 生成 changelog 的正确性至关重要。

    总结

    MergeTreeCompactManager 是一个典型的生产者-消费者模型实现。MergeTreeWriter 是生产者,不断产生新的 L0 层文件;MergeTreeCompactManager 内部的 MergeTreeCompactTask 是消费者,不断地将多层级的文件合并成更少、更大的文件。

    它通过 Levels 来感知系统状态,通过 CompactStrategy 来制定决策,通过 CompactRewriter 和 MergeTreeCompactTask 来执行任务,并通过 numSortedRunStopTrigger 等机制来协调与上游生产者的速度,最终保证了整个 merge-tree 写入模型的稳定和高效。

      Paimon 进行 Compact 是不是就是保留最新的 Key?

      Paimon 的 Compact 行为取决于在表上配置的 merge-engine(合并引擎)。保留最新的 Key 只是其中一种最常见的默认行为。

      Paimon 支持多种合并引擎,来满足不同的业务场景:

      • deduplicate (默认值): 这是你提到的“保留最新的 Key”的行为。当多条记录有相同的主键时,在合并(Compaction)过程中,Paimon 只会保留序列号(sequence number)最大的一条记录,即最新的记录。
      • partial-update: 部分更新引擎。它允许你只更新一行的某些字段。合并时,它会将同一主键的多条记录的字段进行合并,而不是简单地保留最新的一整行。对于未更新的字段,会保留其旧值。
      • aggregation: 聚合引擎。它可以在数据写入过程中,根据预设的聚合函数(如 SUMMAXMINCOUNT 等)对同一主键的记录进行实时聚合。
      • first-row: 保留第一行的引擎。当有相同主键的多条数据时,它会保留遇到的第一条数据,后续的数据会被忽略。这在需要按主键去重的流式数据入湖场景中很有用。

      这些不同的合并逻辑由不同的 MergeFunction 实现类来处理,例如 DeduplicateMergeFunctionPartialUpdateMergeFunction 和 AggregateMergeFunction

      // ... existing code ...
      public class AggregateMergeFunction implements MergeFunction<KeyValue> {private final InternalRow.FieldGetter[] getters;private final FieldAggregator[] aggregators;
      // ... existing code ...@Overridepublic void add(KeyValue kv) {
      // ... existing code ...for (int i = 0; i < getters.length; i++) {aggregators[i].agg(row, getters[i].getFieldOrNull(value));}}@Overridepublic KeyValue getResult() {
      // ... existing code ...}
      // ... existing code ...
      }
      

      从上面的 AggregateMergeFunction 可以看到,它的 add 方法是在对字段进行聚合(agg),而不是简单地替换。

      MergeFunction 定义了具体的数据 合并逻辑(比如是去重、聚合还是部分更新),而 MergeFunctionWrapper 则是在 MergeFunction 的基础上增加了一层 “包装”,用于处理更复杂的场景,主要是为了 优化性能 和 生成 Changelog

      不同的 MergeFunctionWrapper 实现是为了支持 Paimon 不同的 Changelog 生成策略 (changelog-producer) 和合并场景:

      1. ReducerMergeFunctionWrapper 是一个基础的、带有优化的包装器。从它的文档和实现可以看出,当一个 Key 只有一条记录需要合并时,它会直接返回这条记录,避免了调用内部 MergeFunction 的开销。它主要用于不需要生成 Changelog 或采用简单合并策略的场景,只返回合并后的最终结果 KeyValue

        /** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.mergetree.compact;import org.apache.paimon.KeyValue;/*** Wrapper for {@link MergeFunction}s which works like a reducer.** <p>A reducer is a type of function. If there is only one input the result is equal to that input;* Otherwise the result is calculated by merging all the inputs in some way.** <p>This wrapper optimize the wrapped {@link MergeFunction}. If there is only one input, the input* will be stored and the inner merge function will not be called, thus saving some computing time.*/
        public class ReducerMergeFunctionWrapper implements MergeFunctionWrapper<KeyValue> {private final MergeFunction<KeyValue> mergeFunction;private KeyValue initialKv;private boolean isInitialized;public ReducerMergeFunctionWrapper(MergeFunction<KeyValue> mergeFunction) {this.mergeFunction = mergeFunction;}/** Resets the {@link MergeFunction} helper to its default state. */@Overridepublic void reset() {initialKv = null;mergeFunction.reset();isInitialized = false;}/** Adds the given {@link KeyValue} to the {@link MergeFunction} helper. */@Overridepublic void add(KeyValue kv) {if (initialKv == null) {initialKv = kv;} else {if (!isInitialized) {merge(initialKv);isInitialized = true;}merge(kv);}}private void merge(KeyValue kv) {mergeFunction.add(kv);}/** Get current value of the {@link MergeFunction} helper. */@Overridepublic KeyValue getResult() {return isInitialized ? mergeFunction.getResult() : initialKv;}
        }
        
      2. FullChangelogMergeFunctionWrapper: 用于 changelog-producer = full-compaction 模式。在这种模式下,Paimon 会在最高层(maxLevel)的文件中查找旧值,并与当前合并的结果进行比较,从而生成 INSERTUPDATE_BEFOREUPDATE_AFTERDELETE 这样的完整 Changelog。这个 Wrapper 的职责就是协调这个过程。

      3. LookupChangelogMergeFunctionWrapper: 用于 changelog-producer = lookup 模式。它会在合并前通过 lookup 的方式去查找旧的记录,从而生成 Changelog。相比 full-compaction,它可能更高效,因为它不需要总是访问最高层的文件。

      4. FirstRowMergeFunctionWrapper: 这是一个专为 first-row 合并引擎设计的包装器,用于处理其独特的 Changelog 生成逻辑。

      总结一下:

      • MergeFunction 决定了数据 如何合并(What)。
      • MergeFunctionWrapper 决定了合并过程 如何被执行和协调(How),特别是如何与不同的 Changelog 生成策略相结合,并进行性能优化。

      本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
      如若转载,请注明出处:http://www.pswp.cn/bicheng/88317.shtml
      繁体地址,请注明出处:http://hk.pswp.cn/bicheng/88317.shtml
      英文地址,请注明出处:http://en.pswp.cn/bicheng/88317.shtml

      如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

      相关文章

      小米路由器3C刷OpenWrt,更换系统/变砖恢复 指南

      基础篇看这里&#xff1a; 小米路由器3C如何安装OpenWrt官方编译的ROM - 哔哩哔哩 小米路由器 3C 刷入 Breed 和 OpenWrt - Snoopy1866 - 博客园 一、路由器注入 如果按照上面的文章&#xff0c; telnet、ftp一直连接失败,那么可以尝试看 这里&#xff1a; 获取路由器root权…

      Spring Boot 项目启动时按需初始化加载数据

      1、新建类&#xff0c;类上添加注解 Component &#xff0c;该类用于在项目启动时处理数据加载任务&#xff1b; 2、该类实现 ApplicationRunner 接口&#xff0c;并重写 run 方法&#xff1b; 3、在重写的 run 方法里处理数据加载任务&#xff1b; 注意&#xff1a; 有定时加载…

      MCP快速入门—快速构建自己的服务器

      引言 随着大语言模型(LLM)技术的快速发展&#xff0c;如何扩展其能力边界成为开发者关注的重点。MCP(Model Capability Protocol)作为一种协议标准&#xff0c;允许开发者构建自定义服务器来增强LLM的功能。 正文内容 1. MCP核心概念与技术背景 MCP服务器主要提供三种能力类…

      Vue 事件总线深度解析:从实现原理到工程实践

      在 Vue 组件通信体系中&#xff0c;事件总线&#xff08;Event Bus&#xff09;是处理非父子组件通信的轻量解决方案。本文将从技术实现细节、工程化实践、内存管理等维度展开&#xff0c;结合源码级分析与典型场景&#xff0c;带你全面掌握这一核心技术点。​一、事件总线的技…

      CMake Qt静态库中配置qrc并使用

      CMake Qt序言环境代码序言 看网上这资料较少&#xff0c;且我理解起来有歧义&#xff0c;特地补充 环境 CMake&#xff1a;3.29.2 Qt&#xff1a;5.15.2 MSVC&#xff1a;2022 IDE&#xff1a;QtCreator 代码 方式一&#xff1a; 在CMakeLists.txt里&#xff0c;add_libr…

      记录一下:成功部署k8s集群(部分)

      前提条件&#xff1a;安装了containerd、docker 关闭了firewalld、selinux 配置了时间同步服务 chronyd 关闭swap分区等1、在控制节点、工作节点&#xff0c;安装kubelet、kubeadm、kubectlyum install -y kubelet-1.26.0 kubeadm-1.26.0 kubectl-1.26.0 …

      Idea如何解决包冲突

      Idea如何解决包冲突1.Error信息&#xff1a;JAR列表。 在扫描期间跳过不需要的JAR可以缩短启动时间和JSP编译时间。SLF4J: Class path contains multiple SLF4J bindings.SLF4J: Found binding in [jar:file:/E:/javapojects/stww-v4-gjtwt-seal/target/stww--v4-platform-proj…

      python 协程学习笔记

      目录 python 协程 通俗理解 Python 的 asyncio 协程&#xff0c;最擅长的是&#xff1a; 批量下载文件的例子&#xff1a; 协程的优势&#xff1a; python 协程 通俗理解 def my_coroutine():print("开始")x yield 1print("拿到了&#xff1a;", x)yi…

      【学习笔记】蒙特卡洛仿真与matlab实现

      概述 20 世纪 40 年代&#xff0c;由于电子计算机的出现&#xff0c; 借助计算机可以实现大量的随机抽样试验&#xff0c;为利用随机试验方法解决实际问题提供了便捷。 非常具代表性的例子是&#xff0c; 美国在第二次世界大战期间研制原子弹的“曼哈顿计划”中&#xff0c;为了…

      HTTP/3.x协议详解:基于QUIC的下一代Web传输协议

      一、HTTP/3协议概述 HTTP/3是超文本传输协议&#xff08;HTTP&#xff09;的第三个正式版本&#xff0c;由IETF&#xff08;互联网工程任务组&#xff09;于2022年正式标准化&#xff08;RFC 9114&#xff09;。其核心创新在于完全基于QUIC协议替代传统TCP&#xff0c;结合UDP…

      【SQL】使用UPDATE修改表字段的时候,遇到1054 或者1064的问题怎么办?

      我在使用python连接sql修改表格的时间字段的时候&#xff0c;遇到这样一个问题&#xff1a;ProgrammingError: (pymysql.err.ProgrammingError) (1064, “You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the ri…

      【字节跳动】数据挖掘面试题0013:怎么做男女二分类问题, 从抖音 app 提供的内容中。

      文章大纲 🔍 一、问题定义与数据基础数据源及预处理:⚙️ 二、特征工程方案1. 文本特征2. 视觉特征3. 音频与行为特征4. 上下文特征🤖 三、模型选型与训练1. 基础模型对比2. 多模态融合模型3. 训练技巧📊 四、评估与优化策略1. 评估指标2. 典型问题优化3. 算法偏差控制�…

      HTTP请求走私漏洞

      一、漏洞定义与核心原理HTTP请求走私&#xff08;HTTP Request Smuggling&#xff09;是一种利用前端服务器&#xff08;如代理、负载均衡器&#xff09;与后端服务器在解析HTTP请求时的不一致性&#xff0c;绕过安全机制并执行恶意操作的攻击技术。其核心在于混淆请求边界&…

      Javaweb - 10.1 Servlet

      目录 Servlet 简介 动态资源和静态资源 Servlet 简介 Servlet 开发流程 目标 开发过程 开发一个 web 类型的 module 开发一个 form 表单 开发一个 UserServlet 在 web..xml 为 userServlet 配置请求路径 Edit Configurations 启动项目 完&#xff01; Servlet 简介…

      手机能用酒精擦吗?

      对于电视、电脑屏幕来说&#xff0c;为了避免反光、改善显示效果&#xff0c;会在屏幕表面覆上一层“抗反射涂层”。不同厂商设计的涂层材料并不相同&#xff0c;酒精作为良好的溶剂&#xff0c;确实会损坏可溶的涂层。手机作为触控产品&#xff0c;通常会在屏幕表面增加“疏水…

      【图像处理基石】图像超分辨率有哪些研究进展值得关注?

      近年来&#xff0c;图像超分辨率&#xff08;SR&#xff09;领域在深度学习技术的推动下取得了显著进展&#xff0c;尤其在模型架构优化、计算效率提升和真实场景适应性等方面涌现出诸多创新。以下是基于最新研究的核心进展梳理&#xff1a; 一、高效大图像处理&#xff1a;像素…

      Windows系统下WSL从C盘迁移方案

      原因&#xff1a;一开始装WSL的时候放在了C盘&#xff0c;这下好了&#xff0c;跑了几个深度学习模型训练后&#xff0c;C盘快满了&#xff0c;这可怎么办&#xff1f;可愁坏了。没关系&#xff0c;山人自有妙计。我们将WSL迁移到D盘或者E盘呀。一.迁移操作步骤前期准备&#x…

      金融时间序列机器学习训练前的数据格式验证系统设计与实现

      金融时间序列机器学习训练前的数据格式验证系统设计与实现 前言 在机器学习项目中&#xff0c;数据质量是决定模型成功的关键因素。特别是在金融时间序列分析领域&#xff0c;原始数据往往需要经过复杂的预处理才能用于模型训练。本文将详细介绍一个完整的数据格式验证系统&…

      cocos2dx3.x项目升级到xcode15以上的iconv与duplicate symbols报错问题

      cocos2dx3.x项目升级xcode15以上后会有几处报错。1. CCFontAtlas.cpp文件下的iconv与iconv_close的报错。修改如下&#xff1a;// iconv_close(_iconv);iconv_close((iconv_t)_iconv);iconv((iconv_t)_iconv, (char**)&pin, &inLen, &pout, &outLen); /…

      HTTP/3.0的连接迁移使用连接ID来标识连接为什么可以做到连接不会中断

      一定要结合图文一起理解&#xff01;&#xff01; 文章目录文字描述传统方式&#xff1a;HTTP/2 基于 TCP 的连接&#xff08;就像打固定电话&#xff09;HTTP/3 基于 QUIC 的连接迁移&#xff08;就像用带“通话ID”的手机&#xff09;总结一下图文详解HTTP2.0传统方式&#x…