压缩怎么进行的
这里的操作都是KValue,内部有row kind,标记了删除和插入
MergeTreeCompactManager
是 Paimon 中 Merge-Tree 结构压缩任务的总调度中心。它的核心职责就是监控文件的层级状态(Levels
),并在合适的时机,根据预设的策略,挑选文件并发起压缩任务。
triggerCompaction
方法正是这个调度中心的核心入口。下面我们来详细拆解它的逻辑和涉及的策略。
这个方法有两个主要的执行路径,由 fullCompaction
参数控制:
fullCompaction = true
: 强制执行一次“完全合并”。通常由用户手动触发(比如通过ALTER TABLE ... COMPACT
命令)。它会尝试将所有数据文件合并成尽可能少的文件。fullCompaction = false
: 执行一次“常规合并”。这是系统在日常写入过程中自动触发的,目的是维持 LSM-Tree 的健康结构,防止某一层级文件过多,影响读取性能。
// ... existing code ...@Overridepublic void triggerCompaction(boolean fullCompaction) {Optional<CompactUnit> optionalUnit;// 1. 获取当前所有层级的 SortedRun 视图List<LevelSortedRun> runs = levels.levelSortedRuns();if (fullCompaction) {// ... 省略日志和检查 ...// 2a. 如果是完全合并,调用 FullCompaction 策略optionalUnit =CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs, recordLevelExpire);} else {// ... 省略日志和检查 ...// 2b. 如果是常规合并,调用配置的 strategy (如 UniversalCompaction)optionalUnit =strategy.pick(levels.numberOfLevels(), runs).filter(unit -> unit.files().size() > 0).filter(unit ->unit.files().size() > 1|| unit.files().get(0).level()!= unit.outputLevel());}// 3. 如果策略选出了需要合并的文件单元 (CompactUnit)optionalUnit.ifPresent(unit -> {// 4. 决定是否可以丢弃删除标记 (dropDelete)boolean dropDelete =unit.outputLevel() != 0&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()|| dvMaintainer != null);// ... 省略日志 ...// 5. 提交压缩任务submitCompaction(unit, dropDelete);});}
// ... existing code ...
完全合并策略 (CompactStrategy.pickFullCompaction
)
当 fullCompaction
为 true
时,会调用 CompactStrategy.pickFullCompaction
。这个策略非常直接:
- 目标:将所有层级的所有文件合并到最高层级(
maxLevel
)。 - 行为:它会收集
levels
中的所有DataFileMeta
,将它们打包成一个巨大的CompactUnit
,并将outputLevel
设置为levels.maxLevel()
。 - 应用场景:
- 用户希望整理数据碎片,减少文件总数,优化后续的查询性能。
- 清理过期数据(如果配置了
recordLevelExpire
)。因为只有在最高层级的合并才能确保数据不会再被旧版本覆盖,从而安全地物理删除。
常规合并策略 (strategy.pick
)
当 fullCompaction
为 false
时,会调用构造函数中传入的 strategy
实例的 pick
方法。在 Paimon 中,最常用的常规策略是 UniversalCompaction
。
UniversalCompaction
策略模拟了 RocksDB 的 Universal Compaction Style,其核心思想是:
- 目标:维持一个健康的、层级分明的文件结构,避免 Level 0 文件堆积过多,同时控制写放大和空间放大。
- 触发条件:
- Sorted Run 数量阈值:当总的
SortedRun
数量(Level 0 每个文件算一个 run,其他 level 每层算一个 run)超过num-sorted-run.compaction-trigger
配置时,会触发合并。这是最主要的触发条件。 - 空间放大阈值:当所有文件的总大小,远大于最高层级文件的大小时(超过
max-size-amplification-percent
配置的比例),也会触发合并,以回收空间。
- Sorted Run 数量阈值:当总的
- 挑选文件的逻辑:
- 它会从 Level 0 开始,向上检查,找到第一个满足合并条件的层级。
- 通常,它会选择将 Level 0 的所有文件合并到 Level 1,或者将 Level
i
的所有文件合并到 Leveli+1
。 - 它会尽量选择相邻的、大小相似的
SortedRun
进行合并,以达到最优效率。
filter
的作用: 在triggerCompaction
方法中,strategy.pick
的结果后面跟了两个.filter(...)
调用。.filter(unit -> unit.files().size() > 0)
: 确保选出的压缩单元不是空的。.filter(unit -> unit.files().size() > 1 || unit.files().get(0).level() != unit.outputLevel())
: 这是一个重要的优化。它排除了“只有一个文件,并且输出层级和当前层级相同”的情况。这种情况意味着文件只是被“升级”(upgrade),而没有实际的合并重写,这通常在MergeTreeCompactTask
内部处理更高效,无需作为一个独立的压缩任务提交。
dropDelete
策略
在确定了要合并的文件后,代码会计算一个 dropDelete
布尔值。
- 含义:
dropDelete
为true
表示在合并重写数据时,可以直接丢弃掉类型为DELETE
的记录。 - 条件:
unit.outputLevel() != 0
: 输出层级不能是 Level 0。因为 Level 0 可能还有其他更老的文件没有参与本次合并,如果丢弃了删除标记,可能会导致本该被删除的数据重新“复活”。unit.outputLevel() >= levels.nonEmptyHighestLevel()
: 输出层级必须是当前数据存在的最高层级或更高。这确保了合并产生的新文件是“最老”的,不存在比它更老的数据了,因此删除标记可以被安全地物理移除。dvMaintainer != null
: 如果表开启了删除向量(Deletion Vectors),则逻辑会有所不同,通常也可以安全地处理删除。
总结
MergeTreeCompactManager
的 triggerCompaction
方法是一个精密的调度器,它通过 levels
对象感知整个表的物理文件状态,并执行以下流程:
- 获取状态:从
levels
中获取所有文件的层级和分布信息(levelSortedRuns
)。 - 策略决策:
- 如果是手动触发的全量合并,则使用
pickFullCompaction
策略,将所有文件打包,目标是合并到最高层。 - 如果是自动触发的常规合并,则使用
UniversalCompaction
等策略,根据SortedRun
数量、空间放大等指标,智能地选择一部分文件进行层级推进式的合并。
- 如果是手动触发的全量合并,则使用
- 任务优化:通过
filter
过滤掉无需执行的或无效的压缩单元。 - 参数计算:根据合并的目标层级和
levels
的整体状态,计算出是否可以安全地在合并中物理删除数据(dropDelete
)。 - 提交执行:最后将包含待合并文件、目标层级、
dropDelete
标志的CompactUnit
封装成MergeTreeCompactTask
,提交到线程池执行。
整个过程完美地体现了 LSM-Tree 架构通过后台合并来平衡写入性能和查询性能的核心思想。
压缩策略
CompactStrategy.pickFullCompaction
- 全量合并策略
这是一个定义在 CompactStrategy
接口中的静态方法,代表了一种最彻底的合并策略。它的目标非常明确:将表(或分区)中的所有数据文件合并成尽可能少的文件,并放置到最高层级(max level)。
a. 源码分析
// ... existing code ...static Optional<CompactUnit> pickFullCompaction(int numLevels,List<LevelSortedRun> runs,@Nullable RecordLevelExpire recordLevelExpire) {int maxLevel = numLevels - 1;if (runs.isEmpty()) {// no sorted run, no need to compactreturn Optional.empty();} else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {if (recordLevelExpire == null) {// only 1 sorted run on the max level and don't check record-expire, no need to// compactreturn Optional.empty();}// pick the files which has expired recordsList<DataFileMeta> filesContainExpireRecords = new ArrayList<>();for (DataFileMeta file : runs.get(0).run().files()) {if (recordLevelExpire.isExpireFile(file)) {filesContainExpireRecords.add(file);}}return Optional.of(CompactUnit.fromFiles(maxLevel, filesContainExpireRecords));} else {return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));}}
// ... existing code ...
逻辑分解如下:
- 计算最高层级:
maxLevel = numLevels - 1
。 - 处理边界情况:
- 如果
runs
为空(没有任何文件),直接返回Optional.empty()
,无需合并。 - 如果已经只剩下一个
SortedRun
并且它已经在最高层级 (runs.size() == 1 && runs.get(0).level() == maxLevel
),这说明数据已经是最优状态了。- 此时,会检查是否配置了记录级过期 (
recordLevelExpire
)。 - 如果没有配置过期,那么就无需做任何事,返回
Optional.empty()
。 - 如果配置了过期,它会遍历这个
SortedRun
中的所有文件,挑出那些可能包含过期数据的文件(通过recordLevelExpire.isExpireFile(file)
判断),然后只对这些文件进行一次“自我合并”以清理过期数据。
- 此时,会检查是否配置了记录级过期 (
- 如果
- 常规全量合并:
- 如果不满足上述边界情况(比如有多于一个
SortedRun
,或者唯一的SortedRun
不在最高层),则执行标准的全量合并。 - 它会调用
CompactUnit.fromLevelRuns(maxLevel, runs)
,将所有传入的runs
中的文件都包含进来,创建一个CompactUnit
,并指定输出层级为maxLevel
。
- 如果不满足上述边界情况(比如有多于一个
b. 应用场景
- 用户手动触发:最常见的场景是用户通过 Flink SQL
CALL sys.compact(...)
或 Spark ProcedureCALL sys.compact(...)
并指定full
模式来执行。 - 目的:
- 减少小文件:将长时间积累的大量小文件合并成少量大文件,显著提升后续的查询性能。
- 物理删除:全量合并到最高层级是安全地物理删除带有删除标记(DELETE)的数据的唯一时机。
- 数据清理:配合 TTL,清理过期数据。
ForceUpLevel0Compaction
- 强制提升 Level 0 策略
这是一个实现了 CompactStrategy
接口的类,它代表了一种更激进的常规合并策略。它的核心思想是:优先采用通用的 UniversalCompaction
策略;如果通用策略认为无需合并,则强制检查 Level 0 是否有文件,如果有,就将它们全部合并。
a. 源码分析
/** A {@link CompactStrategy} to force compacting level 0 files. */
public class ForceUpLevel0Compaction implements CompactStrategy {private final UniversalCompaction universal;public ForceUpLevel0Compaction(UniversalCompaction universal) {this.universal = universal;}@Overridepublic Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {// 1. 首先尝试通用的 UniversalCompaction 策略Optional<CompactUnit> pick = universal.pick(numLevels, runs);if (pick.isPresent()) {// 如果通用策略找到了需要合并的文件,就直接采纳它的决定return pick;}// 2. 如果通用策略认为不需要合并,则执行强制逻辑// 调用 universal.forcePickL0,这个方法会专门检查 L0return universal.forcePickL0(numLevels, runs);}
}
forcePickL0
的逻辑在 UniversalCompaction.java
中:
// ... existing code ...Optional<CompactUnit> forcePickL0(int numLevels, List<LevelSortedRun> runs) {// 收集所有 level 0 的文件int candidateCount = 0;for (int i = candidateCount; i < runs.size(); i++) {if (runs.get(i).level() > 0) {break;}candidateCount++;}// 如果 L0 没有文件,返回空;否则,将所有 L0 文件打包成一个压缩单元return candidateCount == 0? Optional.empty(): Optional.of(pickForSizeRatio(numLevels - 1, runs, candidateCount, true));}
// ... existing code ...
逻辑分解如下:
- 委托与回退(Delegate and Fallback):
ForceUpLevel0Compaction
首先将决策权委托给内部的UniversalCompaction
实例。UniversalCompaction
会根据文件数量、大小比例等常规指标判断是否需要合并。 - 强制检查 L0:如果
UniversalCompaction
的常规检查没有触发合并(返回了Optional.empty()
),ForceUpLevel0Compaction
会执行它的“强制”逻辑:调用universal.forcePickL0
。 forcePickL0
的行为:这个方法很简单,它只看 Level 0。只要 Level 0 存在任何文件,它就会把 Level 0 的所有文件都收集起来,创建一个CompactUnit
,准备将它们向上合并。
b. 应用场景
- Lookup Join 优化:这是此策略最主要的应用场景。当 Paimon 表作为 Flink 的维表进行
lookup join
时,为了保证维表数据的近实时性,我们希望新写入的数据(总是在 Level 0)能尽快地被合并到更高层级,从而对lookup
可见。 lookup.compact-mode = 'radical'
:当用户配置此参数时,系统就会启用ForceUpLevel0Compaction
策略。radical
(激进的)这个词很形象地描述了它的行为:只要有新数据写入(即 L0 有文件),就尽快地、激进地将它合并掉,以牺牲一些合并开销为代价,换取数据可见性的延迟降低。
Paimon 的优化:Lookup 模式下跳过 L0
为了解决上述性能问题,Paimon 在开启了特定优化后(比如配置了 lookup
changelog-producer 或开启了删除向量),其 LocalTableQuery
会采取一种特殊的读取策略:
默认从 Level 1 开始读取数据,直接忽略 Level 0!
这可以在 LocalTableQuery.java
的构造函数中找到证据:
// ... existing code ...this.lookupStoreFactory =LookupStoreFactory.create(
// ... existing code ...new RowCompactedSerializer(keyType).createSliceComparator());if (options.needLookup()) {// 关键点:如果开启了 lookup 优化,起始读取层级设置为 1startLevel = 1;} else {
// ... existing code ...}
// ... existing code ...
options.needLookup()
会在满足某些条件时(如 changelog-producer = 'lookup'
)返回 true
。当它为 true
时,startLevel
被设为 1
,这意味着后续的所有查找操作都将从 Level 1 开始,从而完全避开了低效的 Level 0。
ForceUpLevel0Compaction
的作用:让 L0 数据尽快可见
现在,整个逻辑就闭环了:
- 新数据写入 Paimon 表,生成 L0 文件。
- 高性能的 Lookup Join 读取器为了性能,只看 L1 及以上层级的文件。
- 此时,新写入的 L0 文件对于这个 Lookup Join 来说,就是“不可见”的。
- 为了解决这个问题,必须尽快地将 L0 的文件合并(Compaction)到 L1。
ForceUpLevel0Compaction
策略应运而生。它的行为非常“激进” (radical
):只要发现 L0 有文件,不管满足不满足常规的合并触发条件,都强制发起一次 Compaction,将它们推向 L1。
这样一来,新数据停留在 L0 的时间窗口被大大缩短,从而保证了数据能够近实时地对 Lookup Join 可见。
UniversalCompaction
它是 Paimon 中最核心、最常用的常规压缩策略,其设计思想借鉴了 RocksDB 的 Universal Compaction,旨在平衡写入放大、读取放大和空间放大。
UniversalCompaction
作为一个 CompactStrategy
的实现,它的主要职责是在常规写入流程中,根据一系列预设规则,判断是否需要触发一次合并(Compaction),并挑选出具体要合并哪些文件。
它的核心目标是:
- 控制写入放大(Write Amplification):避免过于频繁地重写数据。
- 维持健康的 LSM-Tree 结构:防止 L0 文件过多,或者文件大小差异过大,从而保证读取性能(Read Amplification)和空间占用(Space Amplification)在一个可接受的范围内。
我们先从它的成员变量入手,这些变量定义了 UniversalCompaction
策略的行为准则。
// ... existing code ...
public class UniversalCompaction implements CompactStrategy {private static final Logger LOG = LoggerFactory.getLogger(UniversalCompaction.class);// 对应 'compaction.max-size-amplification-percent'private final int maxSizeAmp; // 对应 'compaction.sorted-run.size-ratio'private final int sizeRatio;// 对应 'compaction.sorted-run.num-compaction-trigger'private final int numRunCompactionTrigger;// 对应 'compaction.optimization-interval'@Nullable private final Long opCompactionInterval;@Nullable private Long lastOptimizedCompaction;// 对应 'lookup.compact-max-interval'@Nullable private final Integer maxLookupCompactInterval;@Nullable private final AtomicInteger lookupCompactTriggerCount;public UniversalCompaction(int maxSizeAmp,int sizeRatio,int numRunCompactionTrigger,@Nullable Duration opCompactionInterval,@Nullable Integer maxLookupCompactInterval) {this.maxSizeAmp = maxSizeAmp;this.sizeRatio = sizeRatio;this.numRunCompactionTrigger = numRunCompactionTrigger;this.opCompactionInterval =opCompactionInterval == null ? null : opCompactionInterval.toMillis();this.maxLookupCompactInterval = maxLookupCompactInterval;this.lookupCompactTriggerCount =maxLookupCompactInterval == null ? null : new AtomicInteger(0);}
// ... existing code ...
maxSizeAmp
: 最大空间放大百分比。控制除最老的一个 Sorted Run 外,其他所有 Sorted Run 的总大小,不能超过最老的 Sorted Run 大小的maxSizeAmp / 100
倍。这是为了控制空间浪费。sizeRatio
: 大小比例。在挑选文件进行合并时,如果当前候选文件集合的总大小,与下一个 Sorted Run 的大小比例在sizeRatio
之内,就会把下一个 Sorted Run 也纳入本次合并。这是为了倾向于合并大小相近的文件,提高效率。numRunCompactionTrigger
: 文件数量触发器。当总的 Sorted Run 数量超过这个阈值时,会强制触发一次合并。这是最主要的常规合并触发条件。opCompactionInterval
: 优化合并时间间隔。一个可选的配置,用于周期性地触发一次全量合并(将所有文件合并到最高层),以保证读取性能。maxLookupCompactInterval
: Lookup 场景的合并间隔。这是为lookup.compact-mode = 'gentle'
模式设计的。它不要求每次写入都强制合并 L0,而是每隔 N 次pick
调用(即 N 次 compaction 检查)后,如果 L0 还有文件,就强制合并一次。这是一种在性能和数据延迟之间的折中。
核心方法 pick
- 决策中心
pick
方法是整个策略的核心,它按照优先级从高到低的顺序,检查是否满足各种触发条件。只要有一个条件满足,就会生成一个 CompactUnit
并返回,后续的检查就不再进行。
// ... existing code ...@Overridepublic Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {int maxLevel = numLevels - 1;// 优先级 0: 周期性优化合并 (如果配置了)if (opCompactionInterval != null) {if (lastOptimizedCompaction == null|| currentTimeMillis() - lastOptimizedCompaction > opCompactionInterval) {// ...return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));}}// 优先级 1: 检查空间放大 (pickForSizeAmp)CompactUnit unit = pickForSizeAmp(maxLevel, runs);if (unit != null) {// ...return Optional.of(unit);}// 优先级 2: 检查大小比例 (pickForSizeRatio)unit = pickForSizeRatio(maxLevel, runs);if (unit != null) {// ...return Optional.of(unit);}// 优先级 3: 检查文件数量 (numRunCompactionTrigger)if (runs.size() > numRunCompactionTrigger) {// ...return Optional.ofNullable(pickForSizeRatio(maxLevel, runs, candidateCount));}// 优先级 4: 检查 Lookup 场景的周期性强制合并 (如果配置了)if (maxLookupCompactInterval != null && lookupCompactTriggerCount != null) {lookupCompactTriggerCount.getAndIncrement();if (lookupCompactTriggerCount.compareAndSet(maxLookupCompactInterval, 0)) {// ...return forcePickL0(numLevels, runs);} // ...}return Optional.empty();}
// ... existing code ...
触发条件分析 (按优先级)
周期性优化合并 (
opCompactionInterval
): 最高优先级。如果配置了该参数,并且距离上次优化合并的时间已经超过了指定间隔,它会直接触发一次全量合并,将所有runs
合并到maxLevel
。这对于需要定期整理数据以保证查询性能的场景非常有用。空间放大 (
pickForSizeAmp
): 检查(所有文件总大小 - 最老文件大小) / 最老文件大小
是否超过了maxSizeAmp
。如果超过,说明非最高层的数据(即“增量”数据)相对于“存量”数据来说过于庞大,占用了过多额外空间。此时也会触发一次全量合并。大小比例 (
pickForSizeRatio
): 这是最常规的合并挑选逻辑。它会从最年轻的文件(runs
列表的开头)开始,逐个向后累加,只要当前累加的总大小与下一个文件的大小比例在sizeRatio
之内,就继续向后吞并。这个过程会形成一个大小比较均匀的合并候选集。文件数量 (
numRunCompactionTrigger
): 如果总的SortedRun
数量超过了阈值,说明文件过于碎片化,会严重影响性能。此时会强制触发一次合并。它会计算出需要合并掉多少个文件才能使总数降到阈值以下,然后调用pickForSizeRatio
来挑选这些文件。Lookup 周期性合并 (
maxLookupCompactInterval
): 这是最低优先级的检查。它维护一个原子计数器lookupCompactTriggerCount
。每次调用pick
都会使其加一。当计数器达到maxLookupCompactInterval
时,就会重置为 0 并调用forcePickL0
,强制合并 L0 的所有文件。这为lookup
场景提供了一种“温和”(gentle
)的合并模式。
pickForSizeRatio
public CompactUnit pickForSizeRatio(int maxLevel, List<LevelSortedRun> runs, int candidateCount, boolean forcePick) {long candidateSize = candidateSize(runs, candidateCount);for (int i = candidateCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {break;}candidateSize += next.run().totalSize();candidateCount++;}if (forcePick || candidateCount > 1) {return createUnit(runs, maxLevel, candidateCount);}return null;}
这个方法是 Paimon 中 Universal Compaction(通用合并)策略的核心部分之一,它根据文件大小比例来决定哪些文件(SortedRun
)应该被合并。
int maxLevel
: Merge-Tree 的最大层级数。合并后的文件通常会被放到更高的层级。List<LevelSortedRun> runs
: 当前所有层级的、已排序的SortedRun
列表。这个列表通常是按从新到旧(或从小到大)的顺序排列的。int candidateCount
: 初始候选SortedRun
的数量。方法会从列表的前candidateCount
个run
开始考虑合并。boolean forcePick
: 是否强制选择。如果为true
,即使最终只选出了一个run
,也会创建合并单元(CompactUnit
)。这在某些强制合并的场景下很有用。
这个方法的主要逻辑是:从一组初始的候选文件(runs
)开始,不断地尝试将后续的文件也加入到这次合并任务中,直到遇到一个尺寸“过大”的文件为止。
代码执行流程如下:
计算初始候选文件总大小
long candidateSize = candidateSize(runs, candidateCount);
首先,它会计算由
candidateCount
指定的初始候选run
的总大小。迭代选择更多文件
for (int i = candidateCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {break;}candidateSize += next.run().totalSize();candidateCount++; }
接着,它会遍历剩余的
run
。对于每一个next
run,它会检查一个关键条件:candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()
sizeRatio
是一个配置项 (compaction.size-ratio
),表示尺寸比较时的灵活性百分比。- 这个条件判断的是:当前已选中的文件总大小(
candidateSize
),即使加上sizeRatio
百分比的“宽容度”,是否仍然小于下一个文件(next
)的大小。 - 如果条件成立,意味着下一个文件
next
比当前已选中的文件总和要大得多,将它们合并的性价比不高。因此,循环中断(break
),不再选择更多的文件。 - 如果条件不成立,意味着
next
文件的大小和当前已选中的文件总大小在同一个量级,适合一起合并。于是,将next
文件加入候选集,更新candidateSize
和candidateCount
。
创建合并单元
if (forcePick || candidateCount > 1) {return createUnit(runs, maxLevel, candidateCount); }return null;
循环结束后,方法会判断是否需要创建合并任务 (
CompactUnit
)。- 如果
forcePick
为true
,或者最终选出的文件数量candidateCount
大于1(合并至少需要两个文件才有意义),就会调用createUnit
方法来创建一个包含所有选中文件的CompactUnit
。 - 否则,说明没有找到合适的合并机会,返回
null
。
- 如果
在 UniversalCompaction
类中,这个方法主要在以下几个场景被调用:
- 常规大小比例检查 (
pickForSizeRatio(maxLevel, runs)
): 这是最常见的用法,从第一个run
开始(candidateCount=1
),尝试寻找合适的合并机会。 - 文件数量触发 (
pick
方法中): 当总文件数超过阈值numRunCompactionTrigger
时,会触发合并。此时,初始candidateCount
会被设置为runs.size() - numRunCompactionTrigger + 1
,然后调用此方法来决定最终合并哪些文件。 - 强制L0层合并 (
forcePickL0
方法中): 在某些情况下(例如,为了降低查找延迟),需要强制合并L0层的所有文件。这时会调用此方法,并将forcePick
设置为true
,确保即使L0只有一个文件也会被打包成一个合并任务。
pickForSizeRatio
方法实现了一种智能的合并文件选择策略。它倾向于将大小相近的文件进行合并,避免了将一个很小的文件和一个巨大的文件进行合并所带来的高I/O开销,从而提高了合并效率。通过 sizeRatio
参数提供了灵活性,并通过 forcePick
参数支持了强制合并的场景。
createUnit()
方法:目标层级的决定者
createUnit
方法是真正计算 outputLevel
的地方,这是理解整个机制的关键。
// ... existing code ...@VisibleForTestingCompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {int outputLevel;if (runCount == runs.size()) {// 如果所有 run 都参与合并,目标就是最高层outputLevel = maxLevel;} else {// 否则,目标层级是下一个未参与合并的 run 的层级减 1// 这是最核心的逻辑outputLevel = Math.max(0, runs.get(runCount).level() - 1);}if (outputLevel == 0) {// 为了避免产生新的 level 0 文件,这里做了特殊处理// 如果计算出的目标是 level 0,会继续往后寻找,直到找到一个非 level 0 的 run// 并将它的层级作为 outputLevelfor (int i = runCount; i < runs.size(); i++) {LevelSortedRun next = runs.get(i);runCount++;if (next.level() != 0) {outputLevel = next.level();break;}}}if (runCount == runs.size()) {// 如果经过上述逻辑后,所有 run 都被选中了,那么还是合并到最高层updateLastOptimizedCompaction();outputLevel = maxLevel;}return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));}
// ... existing code ...
核心逻辑解读:
runCount
是被选中参与本次合并的 run 的数量。outputLevel = Math.max(0, runs.get(runCount).level() - 1);
这一行代码是关键。它说明,当不是所有文件都参与合并时,输出层级取决于第一个未被选中的 run (runs.get(runCount)
)。目标层级是这个 run 的层级减 1。
举个例子: 假设我们有以下 runs
: [L0, L0, L0, L2, L4]
pickForSizeRatio
方法可能决定合并前3个 L0 的文件。此时runCount
= 3。- 进入
createUnit
方法,runs.get(runCount)
就是runs.get(3)
,即那个 L2 的 run。 outputLevel
计算为L2.level() - 1
,也就是2 - 1 = 1
。- 最终,这3个 L0 文件会被合并成一个新的 L1 文件,而不是
maxLevel
。
辅助方法
pickForSizeRatio(...)
: 实现了上述的大小比例挑选逻辑。它会从最年轻的文件开始,向后“滚动”合并,直到下一个文件太大不适合合并为止。createUnit(...)
: 根据挑选出的文件(runCount
个),决定输出层级(outputLevel
)。逻辑通常是:如果合并了所有文件,则输出到最高层;否则,输出到下一个未被合并的文件的层级减一。它还会保证输出层级不为 0。forcePickL0(...)
: 一个特殊的方法,只收集 Level 0 的所有文件,并准备将它们合并。被ForceUpLevel0Compaction
和lookupCompactMaxInterval
逻辑所调用。
总结
UniversalCompaction
是一个设计精巧且全面的合并策略,它通过多维度、分优先级的规则来维护 LSM-Tree 的健康状态:
- 通过文件数量和空间放大来设定合并的“底线”,防止系统状态恶化。
- 通过大小比例来智能地挑选合并单元,实现高效合并。
- 通过周期性全量合并和针对 Lookup 的周期性 L0 合并,为不同的应用场景提供了额外的优化选项。
它与 ForceUpLevel0Compaction
的关系是:ForceUpLevel0Compaction
是一种更激进的策略,它完全复用了 UniversalCompaction
的所有逻辑,并在此基础上增加了一个最终的回退(fallback)逻辑:如果 UniversalCompaction
认为无需合并,它会强制检查并合并 L0,以满足 Lookup 场景对数据实时性的极致要求。
CompactUnit
Paimon 中执行一次具体压缩操作的对象是 CompactUnit
,它并不完全等同于一个 SortedRun
,而是从一个或多个 SortedRun
中选取出来的文件集合。定位哪些文件需要压缩,则是由 CompactStrategy
(压缩策略)来决encided。
一个压缩任务的基本工作单元是 CompactUnit
接口。从它的定义可以看出,一个 CompactUnit
主要包含两部分信息:
outputLevel()
: 压缩后生成的新文件要归属的层级(Level)。files()
: 需要参与本次压缩的所有数据文件(DataFileMeta
)的列表。
public interface CompactUnit {int outputLevel();List<DataFileMeta> files();static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {List<DataFileMeta> files = new ArrayList<>();for (LevelSortedRun run : runs) {files.addAll(run.run().files());}return fromFiles(outputLevel, files);}static CompactUnit fromFiles(int outputLevel, List<DataFileMeta> files) {return new CompactUnit() {@Overridepublic int outputLevel() {return outputLevel;}@Overridepublic List<DataFileMeta> files() {return files;}};}
}
所以,压缩对象是一组文件的集合(List<DataFileMeta>
),而不是单个 SortedRun
。一个 CompactUnit
可以包含来自不同 SortedRun
的文件。
SortedRun
是 LSM-Tree 里的一个逻辑概念,代表一个有序的数据文件集合。在 Paimon 中,Level 0 每个文件都是一个独立的 SortedRun
,而在其他 Level,同一层的所有文件构成一个 SortedRun
。
SortedRun
是挑选压缩文件时的重要依据,但不是压缩任务的直接执行对象。压缩策略会分析各个 SortedRun
的状态(如数量、大小、层级)来决定是否要发起一次压缩,以及挑选哪些 SortedRun
里的文件来组成 CompactUnit
。
如何定位和组织压缩对象?
这个过程主要分为两步:策略选择 和 任务执行。
第一步:CompactStrategy
策略选择
CompactStrategy
接口负责从当前所有 LevelSortedRun
(带有层级信息的 SortedRun
)中挑选出需要被压缩的 CompactUnit
。
// ... existing code ...
public interface CompactStrategy {/*** Pick compaction unit from runs.** <ul>* <li>compaction is runs-based, not file-based.* <li>level 0 is special, one run per file; all other levels are one run per level.* <li>compaction is sequential from small level to large level.* </ul>*/Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);/** Pick a compaction unit consisting of all existing files. */
// ... existing code ...
例如,UniversalCompaction
策略会检查是否有太多 SortedRun
,如果超过阈值,就会选择一些相邻的 SortedRun
合并。FullCompaction
策略则会选择所有的文件进行合并。
第二步:MergeTreeCompactTask
任务执行
一旦 CompactStrategy
挑选出了一个 CompactUnit
,就会创建一个 MergeTreeCompactTask
来执行具体的压缩逻辑。在你正在查看的 MergeTreeCompactTask.java
文件中,doCompact()
方法清晰地展示了如何处理这个 CompactUnit
里的文件。
- 分区(Partitioning):任务首先会根据文件的 key 范围将
CompactUnit
中的文件分成若干个互不重叠或连续重叠的组(List<List<SortedRun>> partitioned
)。 - 分类处理:
- 对于有多个
SortedRun
重叠的组,它们是必须被重写(rewrite)合并的,因此被加入candidate
列表。 - 对于没有重叠的组(只有一个
SortedRun
),会进一步判断:- 如果文件很小(小于
minFileSize
),它也会被加入candidate
列表,以便和其它小文件一起合并,避免产生过多小文件。 - 如果文件很大,通常会直接“升级”(upgrade),即只修改文件的元数据,将其层级提升到
outputLevel
,而无需重写文件内容,这样效率更高。但如果这个大文件包含需要过期的数据,或者需要被合并到最高层级但自身带有删除记录,它还是会被强制重写。
- 如果文件很小(小于
- 对于有多个
// ... existing code ...@Overrideprotected CompactResult doCompact() throws Exception {List<List<SortedRun>> candidate = new ArrayList<>();CompactResult result = new CompactResult();// Checking the order and compacting adjacent and contiguous files// Note: can't skip an intermediate file to compact, this will destroy the overall// orderlinessfor (List<SortedRun> section : partitioned) {if (section.size() > 1) {candidate.add(section);} else {SortedRun run = section.get(0);// No overlapping:// We can just upgrade the large file and just change the level instead of// rewriting it// But for small files, we will try to compact itfor (DataFileMeta file : run.files()) {if (file.fileSize() < minFileSize) {// Smaller files are rewritten along with the previous filescandidate.add(singletonList(SortedRun.fromSingle(file)));} else {// Large file appear, rewrite previous and upgrade itrewrite(candidate, result);upgrade(file, result);}}}}rewrite(candidate, result);result.setDeletionFile(compactDfSupplier.get());return result;}
// ... existing code ...
总结
- 压缩对象:一个
CompactUnit
实例,它内部封装了一组待压缩的文件列表 (List<DataFileMeta>
) 和目标层级。 - 与
SortedRun
的关系:SortedRun
是 LSM-Tree 的逻辑层,是压缩策略(CompactStrategy
)制定计划的输入和依据。策略根据SortedRun
的情况来决定挑选哪些文件组成CompactUnit
。 - 定位方式:通过
CompactStrategy.pick()
方法,根据预设的压缩策略(如 Universal, Full 等)分析所有SortedRun
,挑选出最需要被压缩的文件,打包成CompactUnit
,然后交由MergeTreeCompactTask
执行。
level层次数量的权衡
这是一个关于性能权衡(Trade-off)的经典问题,主要涉及 写放大(Write Amplification) 和 读放大(Read Amplification)。
降低写放大(The Main Benefit)是分更多层级的最主要原因。
场景A:只有两层(L0 和 L-max) 假设我们只有 L0 和 L1(作为最高层)。当 L0 的文件需要合并时,它们需要和 L1 中所有的数据进行合并,然后生成全新的 L1。如果 L1 非常大(比如几百GB),那么即使 L0 只有很小的一点新数据(比如几百MB),也需要重写整个 L1。这个过程消耗的 I/O 非常巨大,这就是“写放大”——为了写入少量逻辑数据,却需要进行大量的物理磁盘写入。
场景B:有多层(L0, L1, L2, ...) 在这种设计下,合并是逐步进行的。L0 的文件合并成 L1 的文件;当 L1 积累到一定程度,再和 L2 合并,以此类推。每次合并操作所涉及的数据量都相对较小。例如,将几个100MB的 L0 文件合并成一个 L1 文件,远比将它们与一个100GB的 L-max 文件合并要快得多。这大大降低了单次合并的成本,使得写入性能更平滑、更可预测。
平衡读写性能
更多层级:
- 优点:写放大低,写入平稳。
- 缺点:读放大高。因为一条数据可能存在于任何一层,查询时需要从 L0 到 L-max 逐层查找,直到找到最新的版本。层级越多,需要检查的地方就越多。
更少层级:
- 优点:读放大低。查询时只需要检查很少的几个层级。
- 缺点:写放大高。合并成本巨大,可能导致写入性能的剧烈抖动。
Paimon 采用的 Universal Compaction 策略,以及其多层级的结构,正是在这两种放大效应之间做权衡,目标是提供一个整体表现良好的方案,尤其是在高频写入的场景下,通过平滑合并操作来保证写入的稳定性。
总而言之,分更多层次的核心好处是显著降低写放大,以平滑的、低成本的增量合并,替代高成本的全量合并,从而获得更稳定、高效的写入性能。这是以牺牲一部分读取性能(增加读放大)为代价的,但对于许多数据仓库和数据湖场景来说,这是一个非常值得的权衡。
AbstractCompactRewriter
AbstractCompactRewriter
是 Apache Paimon 项目中用于合并树(Merge-Tree)压缩(Compaction)操作的一个核心抽象基类。它定义了压缩重写器(Compact Rewriter)的通用行为和辅助功能,具体的重写逻辑则由其子类实现。
该类位于 org.apache.paimon.mergetree.compact
包下,从其命名和 abstract
关键字可以看出,它是一个抽象类,不能被直接实例化。它实现了 CompactRewriter
接口。
/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.mergetree.compact;import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.mergetree.SortedRun;import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;/** Common implementation of {@link CompactRewriter}. */
public abstract class AbstractCompactRewriter implements CompactRewriter {@Overridepublic CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception {return new CompactResult(file, file.upgrade(outputLevel));}protected static List<DataFileMeta> extractFilesFromSections(List<List<SortedRun>> sections) {return sections.stream().flatMap(Collection::stream).map(SortedRun::files).flatMap(Collection::stream).collect(Collectors.toList());}@Overridepublic void close() throws IOException {}
}
它实现了 CompactRewriter
接口,我们先来看一下这个接口的定义:
// ... existing code ...
import java.io.Closeable;
import java.util.List;public interface CompactRewriter extends Closeable {/*** Rewrite sections to new level.* ...*/CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)throws Exception;/*** Upgrade file to new level, usually file data is not rewritten, only the metadata is updated.* ...*/CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception;
}
CompactRewriter
接口定义了两个核心方法 rewrite
和 upgrade
,并且继承了 Closeable
接口,意味着实现类可能持有需要关闭的资源。
AbstractCompactRewriter
提供了部分方法的默认实现,并留下一个抽象方法给子类。
upgrade(int outputLevel, DataFileMeta file)
这个方法提供了 upgrade
操作的默认实现。在 Paimon 的合并树中,upgrade
通常指将一个文件从较低的 level "提升" 到较高的 level,这个过程大多数情况下不需要重写文件内容,只需要更新文件的元数据(比如 level 信息)。 这里的实现正是如此:
- 它接收一个
DataFileMeta
对象(代表一个数据文件)和一个目标outputLevel
。 - 它调用
file.upgrade(outputLevel)
创建一个新的DataFileMeta
对象,新对象包含了更新后的 level 信息。 - 最后,它将原始文件和升级后的文件包装成一个
CompactResult
对象返回。CompactResult
用来封装压缩的结果,包含压缩前的文件列表和压缩后的文件列表。
extractFilesFromSections(List<List<SortedRun>> sections)
这是一个 protected static
的辅助方法,不属于接口方法。它的作用是从一个嵌套的 List<List<SortedRun>>
结构中提取出所有底层的 DataFileMeta
文件。
sections
代表了待压缩的多个数据段。- 每个
section
是一个List<SortedRun>
。 - 每个
SortedRun
包含一个或多个数据文件 (DataFileMeta
)。 这个方法通过 Java Stream API 将这个三层嵌套的结构展平,最终返回一个包含所有待压缩文件的List<DataFileMeta>
。这在rewrite
的实现中非常有用,可以方便地获取所有输入文件。
close()
该方法提供了 Closeable
接口的默认空实现。这意味着如果子类没有需要释放的资源(如文件句柄、网络连接等),就无需重写此方法。如果子类持有了需要关闭的资源,则必须重写此方法以确保资源被正确释放。
rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
AbstractCompactRewriter
没有实现 CompactRewriter
接口中的 rewrite
方法。这正是它被声明为 abstract
的原因。 rewrite
方法是压缩操作的核心,它定义了如何读取多个输入文件(sections
),通过合并、去重等逻辑,然后写入到一个或多个新的输出文件(在 outputLevel
)。这个过程是复杂且多变的,取决于具体的合并策略(例如,是否保留 changelog、使用哪种合并引擎等)。 因此,AbstractCompactRewriter
将这个最核心、最易变的逻辑交由具体的子类去实现。
实现与应用
在 Paimon 项目中,有多个 AbstractCompactRewriter
的子类,它们根据不同的场景提供了具体的 rewrite
实现。例如:
MergeTreeCompactRewriter
: 提供了通用的合并树压缩逻辑。ChangelogMergeTreeRewriter
: 在MergeTreeCompactRewriter
基础上,增加了生成 changelog 的能力。FullChangelogMergeTreeCompactRewriter
: 一种生成完整 changelog 的策略。LookupMergeTreeCompactRewriter
: 适用于lookup
模式的合并策略。
这些子类会重写 rewrite
方法,并在内部使用 extractFilesFromSections
辅助方法来获取输入文件列表。
例如,在一个测试类 MergeTreeTestBase
中,就有一个内部类 TestRewriter
继承了 AbstractCompactRewriter
并实现了 rewrite
方法,清晰地展示了其用法:
MergeTreeTestBase.java
Apply
// ... existing code ...private class TestRewriter extends AbstractCompactRewriter {@Overridepublic CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)throws Exception {// 1. 创建用于写入新文件的 writerRollingFileWriter<KeyValue, DataFileMeta> writer =writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);// 2. 创建用于读取和合并旧文件的 readerRecordReader<KeyValue> reader =MergeTreeReaders.readerForMergeTree(sections,// ... reader and merge function details ...);if (dropDelete) {reader = new DropDeleteReader(reader);}// 3. 执行读写操作writer.write(new RecordReaderIterator<>(reader));writer.close();// 4. 使用父类的辅助方法获取输入文件,并返回结果return new CompactResult(extractFilesFromSections(sections), writer.result());}}
// ... existing code ...
AbstractCompactRewriter
是一个典型的模板方法模式的应用。它定义了压缩重写算法的骨架:
- 提供了不变的部分:
upgrade
方法的通用实现和close
的空实现。 - 提供了可复用的辅助功能:
extractFilesFromSections
静态方法。 - 定义了易变的部分为抽象方法:将核心的
rewrite
逻辑延迟到子类中实现。
这种设计使得 Paimon 可以灵活地扩展和实现不同场景下的文件压缩策略,同时保证了代码的复用性和结构的清晰性。
MergeTreeCompactRewriter
MergeTreeCompactRewriter
是 AbstractCompactRewriter
的一个核心具体实现。在 Paimon 的合并树(Merge-Tree)结构中,当需要对数据文件进行合并(Compaction)时,这个类提供了默认的、最基础的重写(rewrite)逻辑。
// ... existing code ...
import java.util.Comparator;
import java.util.List;/** Default {@link CompactRewriter} for merge trees. */
public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
// ... existing code ...
从定义可以看出:
- 它是一个公开类,位于
org.apache.paimon.mergetree.compact
包下。 - 它继承自我们之前分析过的
AbstractCompactRewriter
。这意味着它自动获得了upgrade
方法的默认实现(仅更新元数据)和extractFilesFromSections
辅助方法。 - 它的核心职责是实现
AbstractCompactRewriter
中定义的抽象方法rewrite
。
核心成员变量与构造函数
MergeTreeCompactRewriter
的行为由其成员变量(在构造时注入)决定,这体现了依赖注入的设计思想,使得类更加灵活和可测试。
// ... existing code ...
public class MergeTreeCompactRewriter extends AbstractCompactRewriter {protected final FileReaderFactory<KeyValue> readerFactory;protected final KeyValueFileWriterFactory writerFactory;protected final Comparator<InternalRow> keyComparator;@Nullable protected final FieldsComparator userDefinedSeqComparator;protected final MergeFunctionFactory<KeyValue> mfFactory;protected final MergeSorter mergeSorter;public MergeTreeCompactRewriter(FileReaderFactory<KeyValue> readerFactory,KeyValueFileWriterFactory writerFactory,Comparator<InternalRow> keyComparator,@Nullable FieldsComparator userDefinedSeqComparator,MergeFunctionFactory<KeyValue> mfFactory,MergeSorter mergeSorter) {this.readerFactory = readerFactory;this.writerFactory = writerFactory;this.keyComparator = keyComparator;this.userDefinedSeqComparator = userDefinedSeqComparator;this.mfFactory = mfFactory;this.mergeSorter = mergeSorter;}
// ... existing code ...
readerFactory
: 文件读取器工厂,用于创建读取输入数据文件(SortedRun
中的文件)的RecordReader
。writerFactory
: 文件写入器工厂,用于创建RollingFileWriter
,将合并后的数据写入新的输出文件。keyComparator
: 键比较器,用于在合并过程中对KeyValue
的键(key)进行排序和比较。userDefinedSeqComparator
: 用户定义的序列号比较器(可选)。Paimon 支持用户自定义字段作为合并时的排序依据,这个比较器就用于此目的。mfFactory
: 合并函数工厂(MergeFunctionFactory
),用于创建MergeFunction
。MergeFunction
定义了当遇到相同键(key)的多条记录时,应如何合并它们。例如,可以是去重(Deduplicate)、部分更新(Partial Update)或聚合(Aggregation)等。mergeSorter
: 合并排序器,当待合并的数据量过大,无法完全在内存中进行时,MergeSorter
会利用外部排序(External Sort)来处理。
rewrite
方法
这是 CompactRewriter
接口的核心方法。MergeTreeCompactRewriter
的实现非常直接,它将调用转发给了 rewriteCompaction
方法。这种设计模式允许子类在不改变公共API的情况下,更容易地重写或扩展核心压缩逻辑。
// ... existing code ...@Overridepublic CompactResult rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {return rewriteCompaction(outputLevel, dropDelete, sections);}
// ... existing code ...
rewriteCompaction
方法
这是实际执行压缩重写逻辑的地方,是整个类的核心。
// ... existing code ...protected CompactResult rewriteCompaction(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) throws Exception {// 1. 创建一个滚动写入器,用于写入压缩后的新文件RollingFileWriter<KeyValue, DataFileMeta> writer =writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT);RecordReader<KeyValue> reader = null;Exception collectedExceptions = null;try {// 2. 创建一个合并读取器,它会读取所有输入文件,并应用合并逻辑reader =readerForMergeTree(sections, new ReducerMergeFunctionWrapper(mfFactory.create()));// 3. 如果需要,包装一个 DropDeleteReader 来过滤掉删除标记的记录if (dropDelete) {reader = new DropDeleteReader(reader);}// 4. 执行读写:从 reader 读取合并后的数据,写入 writerwriter.write(new RecordReaderIterator<>(reader));} catch (Exception e) {collectedExceptions = e;} finally {// 5. 确保资源被关闭try {IOUtils.closeAll(reader, writer);} catch (Exception e) {collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);}}if (null != collectedExceptions) {// 6. 如果发生异常,终止写入并抛出异常writer.abort();throw collectedExceptions;}// 7. 构造并返回 CompactResultList<DataFileMeta> before = extractFilesFromSections(sections);notifyRewriteCompactBefore(before);return new CompactResult(before, writer.result());}
// ... existing code ...
其执行流程可以分解为:
- 创建写入器:通过
writerFactory
创建一个RollingFileWriter
,准备将合并后的数据写入到指定outputLevel
的新文件中。 - 创建读取器:调用
readerForMergeTree
方法创建一个RecordReader
。这个 reader 是一个复合的 reader,它能够同时读取多个SortedRun
中的所有文件,并使用MergeFunction
在内存中或通过外部排序进行多路归并。 - 处理删除标记:如果
dropDelete
参数为true
(通常在对最高 level 进行 full compaction 时),则用DropDeleteReader
包装原始 reader。这个装饰器会过滤掉类型为DELETE
的记录。 - 执行重写:
writer.write(new RecordReaderIterator<>(reader))
是核心步骤。它驱动reader
读取、合并数据,并将结果逐条写入writer
。 - 资源管理与异常处理:使用
try-finally
确保reader
和writer
无论成功还是失败都能被关闭。如果过程中发生异常,会收集起来。 - 失败回滚:如果收集到异常,调用
writer.abort()
来删除可能已产生的临时文件,然后将异常抛出。 - 返回结果:如果成功,它会调用父类的
extractFilesFromSections
获取所有输入文件,并结合writer.result()
(所有新生成的输出文件),封装成一个CompactResult
对象返回。在返回前,会调用一个空的notifyRewriteCompactBefore
方法,这是一个为子类提供的扩展点。
readerForMergeTree
和 notifyRewriteCompactBefore
readerForMergeTree
: 这是一个辅助方法,它封装了创建合并树读取器的复杂性,将所有需要的组件(readerFactory
,keyComparator
等)传递给MergeTreeReaders.readerForMergeTree
来构造最终的RecordReader
。notifyRewriteCompactBefore
: 这是一个空的protected
方法,充当一个钩子(Hook)。子类可以重写此方法来在压缩完成、返回结果之前执行一些额外的逻辑。例如,LookupMergeTreeCompactRewriter
就重写了此方法来清理与被压缩文件相关的删除向量(Deletion Vectors)。
在系统中的位置和扩展
MergeTreeCompactRewriter
是 Paimon 压缩机制的基石。MergeTreeCompactManager
负责制定压缩计划,当它决定执行一个压缩任务(MergeTreeCompactTask
)时,就会使用一个CompactRewriter
的实例来执行实际的文件读写和合并。- 这个类是可扩展的。Paimon 中更复杂的 Rewriter,如
ChangelogMergeTreeRewriter
(用于生成 changelog)、LookupMergeTreeCompactRewriter
(用于 lookup 表)和FullChangelogMergeTreeCompactRewriter
,都直接或间接地继承自MergeTreeCompactRewriter
,并在其基础上增加或修改功能。
总结
MergeTreeCompactRewriter
是 Paimon 合并树 compaction 逻辑的一个标准、通用的实现。它清晰地展示了如何将多个已排序的文件(SortedRun
)通过多路归并、应用合并函数,最终重写为新的、更紧凑的文件。其设计利用了依赖注入和模板方法/钩子模式,具有良好的灵活性和扩展性,为更高级的压缩策略提供了坚实的基础。
为了更清晰地理解,梳理从顶层到底层的核心调用链:
MergeTreeCompactRewriter.readerForMergeTree
MergeTreeReaders.readerForMergeTree
- 调用 ->
ConcatRecordReader.create
将多个section
的 reader 串联起来。 - 为每个 section 调用 ->
MergeTreeReaders.readerForSection
- 调用 ->
MergeTreeReaders.readerForSection
- 调用 ->
MergeSorter.mergeSort
对一个section
内的所有SortedRun
进行合并排序。 - 为每个 SortedRun 调用 ->
MergeTreeReaders.readerForRun
- 调用 ->
MergeTreeReaders.readerForRun
- 调用 ->
ConcatRecordReader.create
将一个SortedRun
内的所有文件串联起来。
- 调用 ->
MergeSorter.mergeSort
- 调用 ->
SortMergeReader.createSortMergeReader
(当数据无需溢出到磁盘时)
- 调用 ->
SortMergeReader.createSortMergeReader
- 创建 ->
SortMergeReaderWithMinHeap
或SortMergeReaderWithLoserTree
(最终执行多路归并的 Reader)
- 创建 ->
SortMergeReaderWithMinHeap
和 SortMergeReaderWithLoserTree
是两种经典的多路归并排序算法的实现。它们接收多个已经排好序的 RecordReader
作为输入,通过内部的数据结构(最小堆或败者树)高效地从所有输入中找出当前最小的记录。当遇到主键相同的记录时,它们会调用 MergeFunctionWrapper
来执行用户定义的合并逻辑(如去重、更新等)。
IntervalPartition
IntervalPartition
是 Paimon 在执行 Merge-Tree 压缩(Compaction)时一个非常核心的工具类。它的主要目标是将一组给定的数据文件(DataFileMeta
)进行高效地分组,以便后续进行归并排序。这个分组算法非常关键,因为它直接影响了压缩任务的并行度和效率。
从类的注释中我们可以看到,它的目的是:
Algorithm to partition several data files into the minimum number of {@link SortedRun}s.
即:将多个数据文件划分为最少数目的 SortedRun
的算法。
这里的 SortedRun
代表一组按主键有序且键范围互不重叠的文件集合。将文件划分为最少的 SortedRun
意味着我们可以用最少的归并路数来完成排序,从而提高效率。
为了实现这个目标,IntervalPartition
采用了两层划分的策略:
第一层:Section(分段)
- 它首先将所有输入文件按照主键范围(Key Interval)划分为若干个 Section。
- 不同 Section 之间的主键范围是完全不重叠的。
- 这样做的好处是,不同 Section 之间的数据没有交集,因此可以独立、并行地进行处理,而不会相互影响。这极大地提高了压缩的并行能力。
第二层:SortedRun(有序运行)
- 在每个 Section 内部,文件的主键范围是可能相互重叠的。
- 算法的目标是在这个 Section 内,将这些可能重叠的文件,组合成最少数目的
SortedRun
。 - 每个
SortedRun
内部的文件,按主键有序排列,并且它们的主键范围不会重叠。
最终,IntervalPartition
的输出是一个二维列表 List<List<SortedRun>>
,外层列表代表 Section,内层列表代表该 Section 内划分出的所有 SortedRun
。
构造函数与初始化
// ... existing code ...public IntervalPartition(List<DataFileMeta> inputFiles, Comparator<InternalRow> keyComparator) {this.files = new ArrayList<>(inputFiles);this.files.sort((o1, o2) -> {int leftResult = keyComparator.compare(o1.minKey(), o2.minKey());return leftResult == 0? keyComparator.compare(o1.maxKey(), o2.maxKey()): leftResult;});this.keyComparator = keyComparator;}
// ... existing code ...
构造函数做了非常重要的一步预处理: 对所有输入的 DataFileMeta
文件,首先按照它们的 minKey
(最小主键)进行升序排序。如果 minKey
相同,则再按照 maxKey
(最大主键)进行升序排序。
这个排序是后续所有划分算法的基础,保证了文件是按照主键范围的起始位置被依次处理的。
partition()
方法:第一层划分(切分 Section)
这是该类的入口方法,负责将已排序的文件切分成多个 Section。
// ... existing code ...public List<List<SortedRun>> partition() {List<List<SortedRun>> result = new ArrayList<>();List<DataFileMeta> section = new ArrayList<>();BinaryRow bound = null;for (DataFileMeta meta : files) {if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {// larger than current right bound, conclude current section and create a new oneresult.add(partition(section));section.clear();bound = null;}section.add(meta);if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {// update right boundbound = meta.maxKey();}}if (!section.isEmpty()) {// conclude last sectionresult.add(partition(section));}return result;}
// ... existing code ...
其逻辑如下:
- 维护一个当前
section
的文件列表和一个当前section
的右边界bound
(即section
中所有文件的maxKey
的最大值)。 - 遍历所有(已按
minKey
排序的)文件:- 将当前文件
meta
加入section
。 - 更新
bound
为当前section
中所有文件maxKey
的最大值。 - 核心判断:在添加下一个文件之前,检查它的
minKey
是否大于当前section
的右边界bound
。 - 如果
meta.minKey() > bound
,说明这个新文件与当前section
内的所有文件都没有主键范围的重叠。这意味着一个 Section 到此结束了。 - 此时,就将当前
section
里的文件列表交给partition(section)
方法(第二层划分)处理,得到List<SortedRun>
,然后加入最终结果result
。 - 清空
section
和bound
,开始一个新的 Section。
- 将当前文件
- 循环结束后,处理最后一个
section
。
通过这种方式,它有效地将所有文件切分成了若干个主键范围互不重叠的 Section。
partition(List<DataFileMeta> metas)
方法:第二层划分(贪心算法生成 SortedRun)
这个私有方法是算法的精髓所在。它接收一个 Section 内的文件列表(这些文件的主键范围可能重叠),目标是将它们划分为最少的 SortedRun
。
这里使用了一个经典的贪心算法,并借助优先队列(最小堆) 来实现。
// ... existing code ...private List<SortedRun> partition(List<DataFileMeta> metas) {PriorityQueue<List<DataFileMeta>> queue =new PriorityQueue<>((o1, o2) ->// sort by max key of the last data filekeyComparator.compare(o1.get(o1.size() - 1).maxKey(),o2.get(o2.size() - 1).maxKey()));// create the initial partitionList<DataFileMeta> firstRun = new ArrayList<>();firstRun.add(metas.get(0));queue.add(firstRun);for (int i = 1; i < metas.size(); i++) {DataFileMeta meta = metas.get(i);// any file list whose max key < meta.minKey() is sufficient,// for convenience we pick the smallestList<DataFileMeta> top = queue.poll();if (keyComparator.compare(meta.minKey(), top.get(top.size() - 1).maxKey()) > 0) {// append current file to an existing partitiontop.add(meta);} else {// create a new partitionList<DataFileMeta> newRun = new ArrayList<>();newRun.add(meta);queue.add(newRun);}queue.add(top);}// order between partitions does not matterreturn queue.stream().map(SortedRun::fromSorted).collect(Collectors.toList());}
}
逻辑分解如下:
- 创建一个优先队列
queue
。队列中的元素是List<DataFileMeta>
,代表一个正在构建中的SortedRun
。 - 这个优先队列的排序规则是:按照每个
SortedRun
中最后一个文件的maxKey
进行升序排序。这意味着,队首的SortedRun
是当前所有SortedRun
中,右边界最小的那个。 - 将 Section 中的第一个文件放入一个新的
SortedRun
,并加入队列。 - 遍历 Section 中剩余的文件
meta
:- 从优先队列中取出队首元素
top
(即右边界最小的那个SortedRun
)。 - 核心判断:比较当前文件
meta
的minKey
和top
的maxKey
。 - 如果
meta.minKey()
大于top.get(top.size() - 1).maxKey()
,说明meta
文件可以安全地追加到top
这个SortedRun
的末尾,而不会破坏其内部的有序性(因为meta
的范围在top
之后)。于是,将meta
添加到top
中。 - 如果
meta.minKey()
不大于top
的maxKey
,说明meta
与top
这个SortedRun
的范围有重叠,不能追加。此时,必须为meta
创建一个新的SortedRun
,并将这个新的SortedRun
也加入到优先队列中。 - 最后,将被修改过(或者未修改)的
top
重新放回优先队列。
- 从优先队列中取出队首元素
- 遍历结束后,优先队列
queue
中剩下的所有List<DataFileMeta>
就是划分好的、数量最少的SortedRun
集合。
这个贪心策略的正确性在于,对于每一个新来的文件,我们总是尝试将它追加到最早可以结束的那个 SortedRun
后面。这样可以最大限度地复用现有的 SortedRun
,从而保证最终 SortedRun
的数量最少。这在算法上被称为“区间划分问题”的一种经典解法。
总结
IntervalPartition
通过一个两阶段的划分过程,巧妙地解决了如何高效组织待压缩文件的问题:
- 宏观上,通过
partition()
方法按主键范围切分出互不重叠的Section
,为并行处理创造了条件。 - 微观上,在每个
Section
内部,通过partition(List<DataFileMeta> metas)
方法中的贪心算法,将可能重叠的文件高效地组织成最少数目的SortedRun
,为后续的归并排序(Merge-Sort)提供了最优的输入,减少了归并的复杂度。
这个类的设计体现了在数据密集型系统中,通过精巧的算法设计来优化核心流程(如Compaction)的典型思路。
MergeTreeCompactManager
MergeTreeCompactManager
是 Paimon 中 merge-tree
核心写流程的心脏,它负责管理和调度数据文件的合并(Compaction)任务。下面我将从几个关键部分来解析这个类。
MergeTreeCompactManager
继承自 CompactFutureManager
,它的核心职责是:
- 管理数据文件:通过内部的
Levels
对象,维护所有数据文件(DataFileMeta
)在不同层级(Level)的分布情况。 - 触发合并任务:根据预设的合并策略(
CompactStrategy
),决定何时以及哪些文件需要进行合并。 - 提交和管理合并任务:将选出的文件打包成一个合并单元(
CompactUnit
),并提交给一个异步执行的MergeTreeCompactTask
任务。 - 处理合并结果:获取异步任务的执行结果(
CompactResult
),并用它来更新Levels
中文件的状态。 - 控制写入流速:通过判断当前文件堆积情况,决定是否需要阻塞上游的写入(
Write
)操作,防止因合并速度跟不上写入速度而导致系统不稳定。
核心成员变量 (Key Fields)
我们来看一下这个类中最重要的几个成员变量,它们定义了 MergeTreeCompactManager
的行为:
// ... existing code ...
public class MergeTreeCompactManager extends CompactFutureManager {private static final Logger LOG = LoggerFactory.getLogger(MergeTreeCompactManager.class);private final ExecutorService executor;private final Levels levels;private final CompactStrategy strategy;private final Comparator<InternalRow> keyComparator;private final long compactionFileSize;private final int numSortedRunStopTrigger;private final CompactRewriter rewriter;@Nullable private final CompactionMetrics.Reporter metricsReporter;@Nullable private final DeletionVectorsMaintainer dvMaintainer;private final boolean lazyGenDeletionFile;private final boolean needLookup;@Nullable private final RecordLevelExpire recordLevelExpire;
// ... existing code ...
executor
: 一个ExecutorService
线程池,用于异步执行合并任务。levels
:Levels
对象,是 LSM-Tree(Log-Structured Merge-Tree)分层结构的核心体现。它管理着所有的数据文件,并根据 Level 组织它们。strategy
:CompactStrategy
,合并策略。它定义了如何从Levels
中挑选文件进行合并。Paimon 提供了如UniversalCompaction
和LevelCompaction
等策略。keyComparator
: 主键的比较器,用于在合并过程中对数据进行排序。compactionFileSize
: 合并后生成的目标文件大小。numSortedRunStopTrigger
: 一个非常重要的阈值。当Levels
中的有序文件片段(Sorted Run)数量超过这个值时,会阻塞写入操作,等待合并完成。这是控制写入和合并速度平衡的关键。rewriter
:CompactRewriter
,合并重写器。它负责读取待合并的旧文件,使用MergeFunction
对数据进行合并,然后写入新文件。这是实际执行数据合并逻辑的组件。needLookup
: 一个布尔值,通常与changelog-producer
=lookup
配置相关。当为true
时,表示在合并时需要通过 lookup 方式生成 changelog。
triggerCompaction(boolean fullCompaction)
: 触发合并
这是发起合并的入口。
// ... existing code ...@Overridepublic void triggerCompaction(boolean fullCompaction) {Optional<CompactUnit> optionalUnit;List<LevelSortedRun> runs = levels.levelSortedRuns();if (fullCompaction) {// ... 处理强制全量合并 ...optionalUnit =CompactStrategy.pickFullCompaction(levels.numberOfLevels(), runs, recordLevelExpire);} else {if (taskFuture != null) {return;}// ...optionalUnit =strategy.pick(levels.numberOfLevels(), runs).filter(unit -> unit.files().size() > 0).filter(unit ->unit.files().size() > 1|| unit.files().get(0).level()!= unit.outputLevel());}optionalUnit.ifPresent(unit -> {// ...boolean dropDelete =unit.outputLevel() != 0&& (unit.outputLevel() >= levels.nonEmptyHighestLevel()|| dvMaintainer != null);// ...submitCompaction(unit, dropDelete);});}
// ... existing code ...
fullCompaction
参数: 如果为true
,会触发一次全量合并,通常是将所有文件合并成一个或少数几个大文件。这是一种比较重的操作,一般由用户手动触发。- 普通合并: 如果
fullCompaction
为false
,则会调用strategy.pick(...)
方法,让合并策略根据当前的Levels
状态来决定是否需要合并以及合并哪些文件。 - 提交任务: 如果策略选出了需要合并的文件(
CompactUnit
),就会调用submitCompaction
方法将任务提交到线程池。
submitCompaction(CompactUnit unit, boolean dropDelete)
: 提交合并任务
这个方法负责创建并提交一个真正的合并任务。
// ... existing code ...private void submitCompaction(CompactUnit unit, boolean dropDelete) {// ...MergeTreeCompactTask task =new MergeTreeCompactTask(keyComparator,compactionFileSize,rewriter,unit,dropDelete,levels.maxLevel(),metricsReporter,compactDfSupplier,recordLevelExpire);// ...taskFuture = executor.submit(task);// ...}
// ... existing code ...
它将所有需要的组件(如 rewriter
、keyComparator
等)和待合并的文件(unit
)打包成一个 MergeTreeCompactTask
对象,然后通过 executor.submit(task)
提交给线程池异步执行。
getCompactionResult(boolean blocking)
: 获取合并结果
当外部(通常是 MergeTreeWriter
)需要获取合并结果时,会调用此方法。
// ... existing code .../** Finish current task, and update result files to {@link Levels}. */@Overridepublic Optional<CompactResult> getCompactionResult(boolean blocking)throws ExecutionException, InterruptedException {Optional<CompactResult> result = innerGetCompactionResult(blocking);result.ifPresent(r -> {// ...levels.update(r.before(), r.after());MetricUtils.safeCall(this::reportMetrics, LOG);// ...});return result;}
// ... existing code ...
- 它会从
taskFuture
中获取任务结果CompactResult
。CompactResult
中包含了合并前的文件列表(before
)和合并后生成的新文件列表(after
)。 - 最关键的一步是
levels.update(r.before(), r.after())
,它用新生成的文件替换掉了旧的、已被合并的文件,从而更新了整个Levels
的状态。
shouldWaitForLatestCompaction()
和 compactNotCompleted()
: 控制写入流速
这两个方法是 Paimon 实现反压(Back-Pressure)机制的关键。
// ... existing code ...@Overridepublic boolean shouldWaitForLatestCompaction() {return levels.numberOfSortedRuns() > numSortedRunStopTrigger;}// ... existing code ...@Overridepublic boolean compactNotCompleted() {// If it is a lookup compaction, we should ensure that all level 0 files are consumed, so// here we need to make the outside think that we still need to do unfinished compact// workingreturn super.compactNotCompleted() || (needLookup && !levels().level0().isEmpty());}
// ... existing code ...
shouldWaitForLatestCompaction()
: 当Levels
中SortedRun
的数量超过了numSortedRunStopTrigger
阈值时,返回true
。MergeTreeWriter
会根据这个返回值决定是否要阻塞写入,等待合并完成,以释放一些SortedRun
。compactNotCompleted()
: 判断是否还有未完成的合并工作。特别地,如果配置了needLookup
,即使没有正在运行的合并任务,只要 Level 0 还存在文件,它也会返回true
,以确保 Level 0 的文件能被尽快合并掉,这对于lookup
生成 changelog 的正确性至关重要。
总结
MergeTreeCompactManager
是一个典型的生产者-消费者模型实现。MergeTreeWriter
是生产者,不断产生新的 L0 层文件;MergeTreeCompactManager
内部的 MergeTreeCompactTask
是消费者,不断地将多层级的文件合并成更少、更大的文件。
它通过 Levels
来感知系统状态,通过 CompactStrategy
来制定决策,通过 CompactRewriter
和 MergeTreeCompactTask
来执行任务,并通过 numSortedRunStopTrigger
等机制来协调与上游生产者的速度,最终保证了整个 merge-tree
写入模型的稳定和高效。
Paimon 进行 Compact 是不是就是保留最新的 Key?
Paimon 的 Compact 行为取决于在表上配置的 merge-engine
(合并引擎)。保留最新的 Key 只是其中一种最常见的默认行为。
Paimon 支持多种合并引擎,来满足不同的业务场景:
deduplicate
(默认值): 这是你提到的“保留最新的 Key”的行为。当多条记录有相同的主键时,在合并(Compaction)过程中,Paimon 只会保留序列号(sequence number)最大的一条记录,即最新的记录。partial-update
: 部分更新引擎。它允许你只更新一行的某些字段。合并时,它会将同一主键的多条记录的字段进行合并,而不是简单地保留最新的一整行。对于未更新的字段,会保留其旧值。aggregation
: 聚合引擎。它可以在数据写入过程中,根据预设的聚合函数(如SUM
,MAX
,MIN
,COUNT
等)对同一主键的记录进行实时聚合。first-row
: 保留第一行的引擎。当有相同主键的多条数据时,它会保留遇到的第一条数据,后续的数据会被忽略。这在需要按主键去重的流式数据入湖场景中很有用。
这些不同的合并逻辑由不同的 MergeFunction
实现类来处理,例如 DeduplicateMergeFunction
、PartialUpdateMergeFunction
和 AggregateMergeFunction
。
// ... existing code ...
public class AggregateMergeFunction implements MergeFunction<KeyValue> {private final InternalRow.FieldGetter[] getters;private final FieldAggregator[] aggregators;
// ... existing code ...@Overridepublic void add(KeyValue kv) {
// ... existing code ...for (int i = 0; i < getters.length; i++) {aggregators[i].agg(row, getters[i].getFieldOrNull(value));}}@Overridepublic KeyValue getResult() {
// ... existing code ...}
// ... existing code ...
}
从上面的 AggregateMergeFunction
可以看到,它的 add
方法是在对字段进行聚合(agg
),而不是简单地替换。
MergeFunction
定义了具体的数据 合并逻辑(比如是去重、聚合还是部分更新),而 MergeFunctionWrapper
则是在 MergeFunction
的基础上增加了一层 “包装”,用于处理更复杂的场景,主要是为了 优化性能 和 生成 Changelog。
不同的 MergeFunctionWrapper
实现是为了支持 Paimon 不同的 Changelog 生成策略 (changelog-producer
) 和合并场景:
ReducerMergeFunctionWrapper
是一个基础的、带有优化的包装器。从它的文档和实现可以看出,当一个 Key 只有一条记录需要合并时,它会直接返回这条记录,避免了调用内部MergeFunction
的开销。它主要用于不需要生成 Changelog 或采用简单合并策略的场景,只返回合并后的最终结果KeyValue
。/** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements. See the NOTICE file* distributed with this work for additional information* regarding copyright ownership. The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.apache.paimon.mergetree.compact;import org.apache.paimon.KeyValue;/*** Wrapper for {@link MergeFunction}s which works like a reducer.** <p>A reducer is a type of function. If there is only one input the result is equal to that input;* Otherwise the result is calculated by merging all the inputs in some way.** <p>This wrapper optimize the wrapped {@link MergeFunction}. If there is only one input, the input* will be stored and the inner merge function will not be called, thus saving some computing time.*/ public class ReducerMergeFunctionWrapper implements MergeFunctionWrapper<KeyValue> {private final MergeFunction<KeyValue> mergeFunction;private KeyValue initialKv;private boolean isInitialized;public ReducerMergeFunctionWrapper(MergeFunction<KeyValue> mergeFunction) {this.mergeFunction = mergeFunction;}/** Resets the {@link MergeFunction} helper to its default state. */@Overridepublic void reset() {initialKv = null;mergeFunction.reset();isInitialized = false;}/** Adds the given {@link KeyValue} to the {@link MergeFunction} helper. */@Overridepublic void add(KeyValue kv) {if (initialKv == null) {initialKv = kv;} else {if (!isInitialized) {merge(initialKv);isInitialized = true;}merge(kv);}}private void merge(KeyValue kv) {mergeFunction.add(kv);}/** Get current value of the {@link MergeFunction} helper. */@Overridepublic KeyValue getResult() {return isInitialized ? mergeFunction.getResult() : initialKv;} }
FullChangelogMergeFunctionWrapper
: 用于changelog-producer
=full-compaction
模式。在这种模式下,Paimon 会在最高层(maxLevel)的文件中查找旧值,并与当前合并的结果进行比较,从而生成INSERT
,UPDATE_BEFORE
,UPDATE_AFTER
,DELETE
这样的完整 Changelog。这个 Wrapper 的职责就是协调这个过程。LookupChangelogMergeFunctionWrapper
: 用于changelog-producer
=lookup
模式。它会在合并前通过lookup
的方式去查找旧的记录,从而生成 Changelog。相比full-compaction
,它可能更高效,因为它不需要总是访问最高层的文件。FirstRowMergeFunctionWrapper
: 这是一个专为first-row
合并引擎设计的包装器,用于处理其独特的 Changelog 生成逻辑。
总结一下:
MergeFunction
决定了数据 如何合并(What)。MergeFunctionWrapper
决定了合并过程 如何被执行和协调(How),特别是如何与不同的 Changelog 生成策略相结合,并进行性能优化。