配置(Maintenance)
系统表
表特定系统表
表特定系统表包含关于每个表的元数据和信息,例如创建的快照以及正在使用的选项。用户可以通过批量查询来访问系统表。
目前,Flink、Spark、Trino 和 StarRocks 支持查询系统表。
在某些情况下,表名需要用反引号括起来,以避免语法解析冲突,例如三元访问模式:
SELECT * FROM my_catalog.my_db.`my_table$snapshots`;
快照表
你可以通过快照表查询表的快照历史信息,包括快照中出现的记录数。
SELECT * FROM my_table$snapshots;
/*
+--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+------------------------------- +--------------------------------+---------------------+---------------------+-------------------------+----------------+
| snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | base_manifest_list | delta_manifest_list | changelog_manifest_list | total_record_count | delta_record_count | changelog_record_count | watermark |
+--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+------------------------------- +--------------------------------+---------------------+---------------------+-------------------------+----------------+
| 2 | 0 | 7ca4cd28-98e... | 2 | APPEND | 2022-10-26 11:44:15.600 | manifest-list-31323d5f-76e6... | manifest-list-31323d5f-76e6... | manifest-list-31323d5f-76e6... | 2 | 2 | 0 | 1666755855600 |
| 1 | 0 | 870062aa-3e9... | 1 | APPEND | 2022-10-26 11:44:15.148 | manifest-list-31593d5f-76e6... | manifest-list-31593d5f-76e6... | manifest-list-31593d5f-76e6... | 1 | 1 | 0 | 1666755855148 |
+--------------+------------+-----------------+-------------------+--------------+-------------------------+--------------------------------+------------------------------- +--------------------------------+---------------------+---------------------+-------------------------+----------------+
2 rows in set
*/
通过查询快照表,你可以了解该表的提交和过期信息,并对数据进行时间回溯。
模式表
你可以通过模式表查询表的历史模式。
SELECT * FROM my_table$schemas;
/*
+-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
| schema_id | fields | partition_keys | primary_keys | options | comment | update_time |
+-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
| 0 | [{"id":0,"name":"word","typ... | [] | ["word"] | {} | | 2022-10-28 11:44:20.600 |
| 1 | [{"id":0,"name":"word","typ... | [] | ["word"] | {} | | 2022-10-27 11:44:15.600 |
| 2 | [{"id":0,"name":"word","typ... | [] | ["word"] | {} | | 2022-10-26 11:44:10.600 |
+-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
3 rows in set
*/
你可以将快照表和模式表连接起来,以获取给定快照的字段。
SELECT s.snapshot_id, t.schema_id, t.fields FROM my_table$snapshots s JOIN my_table$schemas t ON s.schema_id = t.schema_id where s.snapshot_id = 100;
选项表
你可以通过选项表查询从DDL指定的表的选项信息。未显示的选项将采用默认值。你可以参考配置。
SELECT * FROM my_table$options;/*
+------------------------+--------------------+
| key | value |
+------------------------+--------------------+
| snapshot.time-retained | 5 h |
+------------------------+--------------------+
1 rows in set
*/
审计日志表
如果你需要审计表的变更日志,可以使用 audit_log
系统表。通过 audit_log
表,在获取表的增量数据时,你可以得到 rowkind
列。你可以使用该列进行过滤等操作以完成审计。
rowkind
有四个值: -+I
:插入操作。 --U
:更新操作,包含更新前行的先前内容。 -+U
:更新操作,包含更新行的新内容。 --D
:删除操作。
SELECT * FROM my_table$audit_log;
/*
+------------------+-----------------+-----------------+
| rowkind | column_0 | column_1 |
+------------------+-----------------+-----------------+
| +I | ... | ... |
+------------------+-----------------+-----------------+
| -U | ... | ... |
+------------------+-----------------+-----------------+
| +U | ... | ... |
+------------------+-----------------+-----------------+
3 rows in set
*/
读优化表
如果你需要极高的读取性能,并且能够接受读取略微陈旧的数据,可以使用 ro
(读优化)系统表。读优化系统表通过仅扫描无需合并的文件来提高读取性能。
对于主键表,ro
系统表仅扫描最顶层的文件。也就是说,ro
系统表仅生成最新完全合并的结果。
不同的桶可能在不同时间进行完全合并,因此不同键的值可能来自不同的快照。
对于追加表,由于所有文件无需合并即可读取,ro
系统表的行为类似于普通追加表。
SELECT * FROM my_table$ro;
文件表
你可以查询具有特定快照的表的文件。
-- 查询最新快照的文件
SELECT * FROM my_table$files;
/*
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
| partition | bucket | file_path | file_format | schema_id | level | record_count | file_size_in_bytes | min_key | max_key | null_value_counts | min_value_stats | max_value_stats | min_sequence_number | max_sequence_number | creation_time |
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
| [3] | 0 | data-8f64af95-29cc-4342-adc... | orc | 0 | 0 | 1 | 593 | [c] | [c] | {cnt = 0, val = 0, word = 0} | {cnt = 3, val = 33, word = c} | {cnt = 3, val = 33, word = c} | 1691551246234 | 1691551246637 |2023-02-24T16:06:21.166 |
| [2] | 0 | data-8b369068-0d37-4011-aa5... | orc | 0 | 0 | 1 | 593 | [b] | [b] | {cnt = 0, val = 0, word = 0} | {cnt = 2, val = 22, word = b} | {cnt = 2, val = 22, word = b} | 1691551246233 | 1691551246732 |2023-02-24T16:06:21.166 |
| [2] | 0 | data-83aa7973-060b-40b6-8c8... | orc | 0 | 0 | 1 | 605 | [d] | [d] | {cnt = 0, val = 0, word = 0} | {cnt = 2, val = 32, word = d} | {cnt = 2, val = 32, word = d} | 1691551246267 | 1691551246798 |2023-02-24T16:06:21.166 |
| [5] | 0 | data-3d304f4a-bcea-44dc-a13... | orc | 0 | 0 | 1 | 593 | [c] | [c] | {cnt = 0, val = 0, word = 0} | {cnt = 5, val = 51, word = c} | {cnt = 5, val = 51, word = c} | 1691551246788 | 1691551246152 |2023-02-24T16:06:21.166 |
| [1] | 0 | data-10abb5bc-0170-43ae-b6a... | orc | 0 | 0 | 1 | 595 | [a] | [a] | {cnt = 0, val = 0, word = 0} | {cnt = 1, val = 11, word = a} | {cnt = 1, val = 11, word = a} | 1691551246722 | 1691551246273 |2023-02-24T16:06:21.166 |
| [4] | 0 | data-2c9b7095-65b7-4013-a7a... | orc | 0 | 0 | 1 | 593 | [a] | [a] | {cnt = 0, val = 0, word = 0} | {cnt = 4, val = 12, word = a} | {cnt = 4, val = 12, word = a} | 1691551246321 | 1691551246109 |2023-02-24T16:06:21.166 |
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
6 rows in set
*/-- 你也可以查询特定快照的文件
SELECT * FROM my_table$files /*+ OPTIONS('scan.snapshot-id'='1') */;
/*
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
| partition | bucket | file_path | file_format | schema_id | level | record_count | file_size_in_bytes | min_key | max_key | null_value_counts | min_value_stats | max_value_stats | min_sequence_number | max_sequence_number | creation_time |
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
| [3] | 0 | data-8f64af95-29cc-4342-adc... | orc | 0 | 0 | 1 | 593 | [c] | [c] | {cnt = 0, val = 0, word = 0} | {cnt = 3, val = 33, word = c} | {cnt = 3, val = 33, word = c} | 1691551246234 | 1691551246637 |2023-02-24T16:06:21.166 |
| [2] | 0 | data-8b369068-0d37-4011-aa5... | orc | 0 | 0 | 1 | 593 | [b] | [b] | {cnt = 0, val = 0, word = 0} | {cnt = 2, val = 22, word = b} | {cnt = 2, val = 22, word = b} | 1691551246233 | 1691551246732 |2023-02-24T16:06:21.166 |
| [1] | 0 | data-10abb5bc-0170-43ae-b6a... | orc | 0 | 0 | 1 | 595 | [a] | [a] | {cnt = 0, val = 0, word = 0} | {cnt = 1, val = 11, word = a} | {cnt = 1, val = 11, word = a} | 1691551246267 | 1691551246798 |2023-02-24T16:06:21.166 |
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+------------------------+-------------------------+-------------------------+---------------------+---------------------+-----------------------+
3 rows in set
*/
标签表
你可以通过标签表查询表的标签历史信息,包括标签基于哪些快照以及这些快照的一些历史信息。你还可以获取所有标签名称,并通过名称进行时间回溯到特定标签的数据。
SELECT * FROM my_table$tags;
/*
+----------+-------------+-----------+-------------------------+--------------+--------------+
| tag_name | snapshot_id | schema_id | commit_time | record_count | branches |
+----------+-------------+-----------+-------------------------+--------------+--------------+
| tag1 | 1 | 0 | 2023-06-28 14:55:29.344 | 3 | [] |
| tag3 | 3 | 0 | 2023-06-28 14:58:24.691 | 7 | [branch-1] |
+----------+-------------+-----------+-------------------------+--------------+--------------+
2 rows in set
*/
分支表
你可以查询表的分支。
SELECT * FROM my_table$branches;
/*
+----------------------+---------------------------+--------------------------+-------------------------+
| branch_name | created_from_tag | created_from_snapshot | create_time |
+----------------------+---------------------------+--------------------------+-------------------------+
| branch1 | tag1 | 2 | 2024-07-18 20:31:39.084 |
| branch2 | tag2 | 5 | 2024-07-18 21:11:14.373 |
+----------------------+---------------------------+--------------------------+-------------------------+
2 rows in set
*/
消费者表
你可以查询所有包含下一个快照的消费者。
SELECT
* FROM my_table$consumers;
/*
+-------------+------------------+
| consumer_id | next_snapshot_id |
+-------------+------------------+
| id1 | 1 |
| id2 | 3 |
+-------------+------------------+
2 rows in set
*/
清单表
你可以查询当前表最新快照或指定快照中包含的所有清单文件。
-- 查询最新快照的清单
SELECT * FROM my_table$manifests;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id |
+--------------------------------+-------------+------------------+-------------------+---------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 |
| manifest-f4dcab43-ef6b-4713... | 1648 | 1 | 0 | 0 |
+--------------------------------+-------------+------------------+-------------------+---------------+
2 rows in set
*/-- 你也可以查询指定快照的清单
SELECT * FROM my_table$manifests /*+ OPTIONS('scan.snapshot-id'='1') */;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id |
+--------------------------------+-------------+------------------+-------------------+---------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 |
+--------------------------------+-------------+------------------+-------------------+---------------+
1 rows in set
*/
聚合字段表
你可以通过聚合字段表查询表的历史聚合信息。
SELECT * FROM my_table$aggregation_fields;
/*
+------------+-----------------+--------------+--------------------------------+---------+
| field_name | field_type | function | function_options | comment |
+------------+-----------------+--------------+--------------------------------+---------+
| product_id | BIGINT NOT NULL | [] | [] | |
| price | INT | [true,count] | [fields.price.ignore-retrac... | |
| sales | BIGINT | [sum] | [fields.sales.aggregate-fun... | |
+------------+-----------------+--------------+--------------------------------+---------+
3 rows in set
*/
分区表
你可以查询表的分区文件。
SELECT * FROM my_table$partitions;
/*
+---------------+----------------+--------------------+--------------------+------------------------+
| partition | record_count | file_size_in_bytes| file_count| last_update_time|
+---------------+----------------+--------------------+--------------------+------------------------+
| [1] | 1 | 645 | 1 | 2024-06-24 10:25:57.400|
+---------------+----------------+--------------------+--------------------+------------------------+
*/
全局系统表
全局系统表包含Paimon中所有表的统计信息。为了方便查询,我们创建了一个名为 sys
的参考系统数据库。我们可以在Flink中通过SQL显示所有全局系统表:
USE sys;
SHOW TABLES;
所有选项表
此表与选项表类似,但它显示所有数据库中所有表的选项。
SELECT * FROM sys.all_table_options;
/*
+---------------+--------------------------------+--------------------------------+------------------+
| database_name | table_name | key | value |
+---------------+--------------------------------+--------------------------------+------------------+
| my_db | Orders_orc | bucket | -1 |
| my_db | Orders2 | bucket | -1 |
| my_db | Orders2 | sink.parallelism | 7 |
| my_db2| OrdersSum | bucket | 1 |
+---------------+--------------------------------+--------------------------------+------------------+
7 rows in set
*/
目录选项表
你可以通过目录选项表查询目录的选项信息。未显示的选项将采用默认值。你可以参考配置。
SELECT * FROM sys.catalog_options;
/*
+-----------+---------------------------+
| key | value |
+-----------+---------------------------+
| warehouse | hdfs:///path/to/warehouse |
+-----------+---------------------------+
1 rows in set
*/
拓展:
系统表在数据管理与分析中的应用:这些系统表为数据管理人员和分析师提供了丰富的信息。例如,通过快照表和模式表,数据分析师可以了解数据的历史变更,从而更好地理解数据的演变过程,有助于调试数据处理流程或进行数据质量追溯。选项表则让管理员能够清晰知晓表的配置选项,方便进行性能调优或故障排查。审计日志表对于合规性要求较高的场景非常关键,它详细记录了数据的变更操作,满足审计和监管需求。
读优化表与大数据查询性能优化:读优化表(
ro
系统表)针对大数据场景下的读取性能进行了优化。在数据量庞大且对数据实时性要求不是极高的情况下,通过仅扫描无需合并的文件,大大减少了查询时的数据扫描范围,从而显著提升读取速度。对于主键表,这种优化方式确保了查询能够快速获取最新完全合并后的结果,避免了扫描大量冗余或过时的数据文件。而对于追加表,ro
系统表的行为与普通追加表一致,保持了数据读取的一致性和稳定性。全局系统表与多表管理:全局系统表为管理多个表提供了统一的视角。
sys
数据库中的这些表,如所有选项表和目录选项表,使得管理员可以在一个地方查看和管理所有表及目录的相关信息。这在大规模数据仓库环境中尤为重要,方便进行整体的配置管理、性能监控以及资源优化。例如,通过查看所有选项表,管理员可以快速发现不同表在配置上的差异,及时调整以确保整个数据存储和处理系统的一致性和高效性。
写入性能
Paimon的写入性能与检查点密切相关,因此如果你需要更高的写入吞吐量,可以采取以下措施:
-
Flink配置(在'flink-conf.yaml'中或通过SQL的SET语句):增加检查点间隔('execution.checkpointing.interval'),将最大并发检查点增加到3('execution.checkpointing.max-concurrent-checkpoints'),或者直接使用批处理模式。
-
增加 write-buffer-size 写入缓冲区大小。
-
启用write-buffer-spillable可溢出的写入缓冲区。
-
如果你正在使用固定桶模式,可以重新调整桶的数量。
选项'changelog-producer' = 'lookup' 或 'full-compaction',以及选项'full-compaction.delta-commits' 对写入性能有很大影响。如果处于快照/全量同步阶段,你可以取消设置这些选项,然后在增量阶段再次启用它们。
如果你发现作业的输入在出现背压的情况下呈现锯齿状模式,这可能是工作节点负载不均衡。你可以考虑开启异步合并,观察吞吐量是否提高。
并行度
建议sink的并行度小于或等于桶的数量,最好相等。你可以使用sink.parallelism表属性来控制sink的并行度。
选项 | 是否必需 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
sink.parallelism | 否 | (无) | 整数 | 定义sink算子的并行度。默认情况下,并行度由框架根据上游链式算子的并行度来确定。 |
本地合并
如果你的作业受到主键数据倾斜的影响(例如,你想统计网站中每个页面的浏览量,而某些特定页面非常受用户欢迎),你可以设置'local-merge-buffer-size',以便在按桶进行洗牌并写入sink之前,对输入记录进行缓冲和合并。当相同主键在快照之间频繁更新时,这特别有用。
缓冲区满时将被刷新。当你面临数据倾斜但不知道从何处开始调整缓冲区大小时,我们建议从64MB开始尝试。
(目前,本地合并不适用于CDC摄入)
文件格式
如果你想实现极致的合并性能,可以考虑使用行存储文件格式AVRO。
-
优点:可以实现高写入吞吐量和合并性能。
-
缺点:分析查询会很慢,行存储最大的问题是它没有查询投影。例如,如果表有100列,但只查询几列,行存储的IO开销就不容忽视。此外,合并效率会降低,存储成本会增加。这是一种权衡。
通过以下选项启用行存储: -file.format = avro -metadata.stats-mode = none
收集行存储的统计信息成本有点高,所以我也建议关闭统计信息。
如果你不想将所有文件都修改为Avro格式,至少可以考虑将前几层的文件修改为Avro格式。你可以使用'file.format.per.level' = '0:avro,1:avro'来指定前两层的文件为Avro格式。
文件压缩
默认情况下,Paimon使用zstd合并算法且级别为1,你可以修改合并算法相关设置:
-
'file.compression.zstd-level':默认的zstd级别是1。为了获得更高的合并率,可以将其配置为9,但读写速度会显著下降。
稳定性
如果桶的数量或资源过少,完全合并可能会导致检查点超时,Flink的默认检查点超时时间是10分钟。
如果你期望即使在这种情况下也能保持稳定性,可以提高检查点超时时间,例如: -execution.checkpointing.timeout = 60 min
写入初始化
在写入初始化阶段,桶的写入器需要读取所有历史文件。如果这里出现瓶颈(例如,同时写入大量分区),你可以使用write-manifest-cache来缓存读取的清单数据,以加速初始化过程。
写入内存
Paimon写入器主要在以下三个地方占用内存:
-
写入器的内存缓冲区:由单个任务的所有写入器共享并可被抢占。此内存值可通过write-buffer-size表属性进行调整。
-
合并多个排序运行进行合并时消耗的内存:可以通过num-sorted-run.compaction-trigger选项来调整要合并的排序运行数量,从而调整该部分内存占用。
-
如果行数据非常大:在进行合并时,一次读取过多行数据会消耗大量内存。减少read.batch-size选项的值可以缓解这种情况的影响。
-
写入列式ORC文件时消耗的内存:减少orc.write.batch-size选项的值可以降低ORC格式的内存消耗。
-
如果在写入任务中文件自动合并:某些大列的字典在合并过程中会显著消耗内存。
-
要在Parquet格式中禁用所有字段的字典编码,设置'parquet.enable.dictionary' = 'false'。
-
要在ORC格式中禁用所有字段的字典编码,设置orc.dictionary.key.threshold = '0'。此外,设置orc.column.encoding.direct = 'field1,field2' 可以禁用特定列的字典编码。
-
如果你的Flink作业不依赖状态,请避免使用托管内存,你可以通过以下Flink参数进行控制:
-
taskmanager.memory.managed.size = 1m
或者你可以将Flink托管内存用于写入缓冲区以避免OOM,设置表属性:
-
sink.use-managed-memory-allocator = true
提交内存
如果写入表的数据量特别大,提交节点可能会使用大量内存,如果内存过小则可能发生OOM(内存溢出)。在这种情况下,你需要增加提交器的堆内存,但你可能不想统一增加Flink的TaskManager的内存,因为这可能会导致内存浪费。
你可以使用Flink的细粒度资源管理来仅增加提交器的堆内存:
-
配置Flink参数cluster.fine-grained-resource-management.enabled: true。(Flink 1.18之后默认为true)
-
配置Paimon表选项:sink.committer-memory,例如300MB,具体取决于你的TaskManager。(也支持sink.committer-cpu选项)
拓展:
检查点与写入性能关系的深入理解:检查点在Flink中是为了保证作业的容错性,但它会对写入性能产生影响。增加检查点间隔意味着两次检查点之间有更多时间进行写入操作,从而提高写入吞吐量。而最大并发检查点数量的调整,可以避免检查点操作过于频繁而阻塞写入。例如,在一个实时数据写入任务中,默认的检查点设置可能导致频繁的写入中断以进行检查点操作,通过适当增加这两个参数,可以减少中断次数,提升整体写入效率。
数据倾斜与本地合并优化原理:数据倾斜是大数据处理中常见的问题,在Paimon中通过本地合并机制来缓解。当主键数据倾斜时,大量相同主键的数据会集中在某些桶中,导致写入性能下降。本地合并缓冲区(local-merge-buffer)会先将输入记录缓冲起来并进行合并,减少了相同主键数据在桶中的重复写入,从而提高写入性能。例如,在电商销售数据统计中,某些热门商品的销售记录可能会导致数据倾斜,本地合并缓冲区可以先对这些热门商品的销售记录进行合并,再写入相应的桶,避免了重复数据的频繁写入。
文件格式选择对性能的综合影响:选择不同的文件格式(如AVRO)涉及到写入性能、查询性能和存储成本之间的权衡。AVRO的行存储方式虽然在写入和合并时表现出色,但由于缺乏查询投影,在查询时会读取更多不必要的数据,增加IO开销,导致查询性能下降。在实际应用中,需要根据业务场景来决定。如果是写入密集型业务,对查询实时性要求不高,AVRO可能是一个不错的选择;但如果是查询频繁且对响应时间要求严格的业务,则需要谨慎考虑。例如,在日志数据的收集和存储场景中,由于日志数据主要用于后续分析,对写入性能要求较高,AVRO格式可以快速将日志数据写入存储,而在查询时虽然速度较慢,但可以通过批量分析的方式来弥补。
内存管理对写入稳定性的关键作用:Paimon写入过程中的内存管理非常关键。从写入缓冲区到合并过程中的内存使用,再到文件格式相关的内存消耗,每个环节都可能影响写入的稳定性。合理调整这些内存参数,可以避免内存溢出问题,保证作业的稳定运行。例如,对于写入缓冲区,根据数据量大小调整write-buffer-size可以确保缓冲区既能有效缓存数据,又不会占用过多内存。而在处理大列数据时,禁用字典编码等操作,可以减少合并过程中的内存消耗,避免OOM情况的发生,确保数据写入的连续性和稳定性。
专用合并(Dedicated Compaction)
Paimon的快照管理支持多个写入器同时写入。
对于类似S3的对象存储,其“重命名”操作不具备原子语义。我们需要配置Hive元存储,并为目录启用“lock.enabled”选项。
默认情况下,Paimon支持并发写入不同分区。一种推荐的模式是,流处理作业将记录写入Paimon的最新分区;同时,批处理作业(覆盖写入)将记录写入历史分区。
到目前为止,这些操作都运行良好,但是如果你需要多个写入器将记录写入同一分区,情况就会稍微复杂一些。例如,你不想使用UNION ALL,而是有多个流处理作业将记录写入一个“部分更新”表。请参考下面的“专用合并作业”。
专用合并作业
默认情况下,Paimon写入器在写入记录时会根据需要执行合并操作。这在大多数用例中已经足够。
合并会将一些数据文件标记为“已删除”(并非真正删除,更多信息请参阅过期快照相关内容)。如果多个写入器标记同一个文件,在提交更改时就会发生冲突。Paimon会自动解决冲突,但这可能会导致作业重启。
为避免这些问题,用户也可以选择在写入器中跳过合并操作,而是运行一个专门用于合并的作业。由于合并仅由专用作业执行,写入器可以持续写入记录而无需暂停,并且不会发生冲突。
要在写入器中跳过合并操作,将以下表属性设置为true。
选项 | 是否必需 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
write - only | 否 | false | 布尔值 | 如果设置为true,将跳过合并和快照过期操作。此选项与专用合并作业配合使用。 |
要运行专用合并作业,请遵循以下说明。
Flink SQL
运行以下SQL语句:
-- 压缩表
CALL sys.compact(`table` =]]> 'default.T');-- 带选项压缩表
CALL sys.compact(`table` =]]> 'default.T', `options` =]]>'sink.parallelism=4');-- 压缩表分区
CALL sys.compact(`table` =]]> 'default.T', `partitions` =]]> 'p=0');-- 带筛选条件压缩表分区
CALL sys.compact(`table` =]]> 'default.T', `where` =]]> 'dt>10 and h<20');
同样,默认情况下是同步合并,这可能会导致检查点超时。你可以配置table_conf
以使用异步合并。
数据库合并作业
你可以运行以下命令为多个数据库提交合并作业。
Flink SQL
运行以下SQL语句:
CALL sys.compact_database('includingDatabases')CALL sys.compact_database('includingDatabases','mode')CALL sys.compact_database('includingDatabases','mode', 'includingTables')CALL sys.compact_database('includingDatabases','mode', 'includingTables', 'excludingTables')CALL sys.compact_database('includingDatabases','mode', 'includingTables', 'excludingTables', 'tableOptions')-- 示例
CALL sys.compact_database('db1|db2', 'combined', 'table_.*', 'ignore','sink.parallelism=4')
排序合并
如果你的表配置为动态桶主键表或追加表,你可以触发按指定列排序的合并操作以加速查询。
Flink SQL
运行以下SQL语句:
-- 排序压缩表
CALL sys.compact(`table` =]]> 'default.T', order_strategy =]]> 'zorder', order_by =]]> 'a,b')
历史分区合并
你可以运行以下命令为一段时间内未接收任何新数据的分区提交合并作业。这些分区中的小文件将进行完全合并。
此功能目前仅在批处理模式下可用。
针对单个表
这是针对单个表的操作。
Flink SQL
运行以下SQL语句:
-- 历史分区压缩表
CALL sys.compact(`table` =]]> 'default.T', 'partition_idle_time' =]]> '1 d')
针对多个数据库
这是针对不同数据库中的多个表的操作。
Flink SQL
运行以下SQL语句:
-- 历史分区压缩表
CALL sys.compact_database('includingDatabases','mode', 'includingTables', 'excludingTables', 'tableOptions', 'partition_idle_time')-- 示例:压缩数据库中表的历史分区
CALL sys.compact_database('test_db', 'combined', '', '', '', '1 d')
拓展:
专用合并作业在复杂写入场景中的意义:在大数据处理场景中,多个写入器同时写入同一分区可能会引发数据一致性和性能问题。Paimon通过引入专用合并作业,将合并操作从常规写入流程中分离出来,有效避免了因多个写入器同时标记文件删除而产生的冲突,保证了写入过程的连续性,减少作业重启次数。这对于需要高可靠性和持续写入性能的应用场景,如实时数据采集和处理系统,尤为重要。
不同合并方式的适用场景:
数据库合并作业:适用于需要对多个数据库中的表进行批量合并的场景。例如,在数据仓库环境中,可能有多个数据库存储不同主题的数据,通过数据库合并作业可以一次性对这些数据库中的相关表进行合并,提高整体存储效率和查询性能。
排序合并:对于动态桶主键表或追加表,排序合并能够根据指定列进行排序后合并,从而在查询时利用排序优势加速数据检索。比如在时间序列数据存储中,按照时间列进行排序合并后,查询特定时间范围的数据时可以更快定位到相关文件,提升查询速度。
历史分区合并:主要用于处理长时间未更新的分区,通过对这些分区中的小文件进行完全合并,可以释放存储资源,提高存储利用率,同时也能提升对这些历史数据的查询性能。例如,在日志数据存储中,随着时间推移,旧的日志分区数据不再更新,对这些分区进行历史分区合并,可以优化存储结构,方便后续的数据分析和审计。
并发写入与合并的协调机制:Paimon在支持并发写入的同时,通过合理的合并机制保证数据的一致性和性能。在多个写入器同时工作时,默认的合并方式可能会引发冲突,但通过设置
write - only
属性并结合专用合并作业,可以实现写入与合并的解耦,确保并发写入的高效和稳定。此外,在不同的合并场景中,如同步与异步合并的选择,也需要根据实际的业务需求和系统资源情况进行权衡。例如,在对实时性要求较高的场景中,异步合并可以避免合并操作对写入过程的阻塞,保证数据的快速写入;而在对数据一致性要求极高且系统资源充足的情况下,同步合并可能更合适,因为它能即时完成合并操作,确保数据状态的及时更新。
管理快照(Manage Snapshots)
本节将介绍与快照相关的管理操作和行为。
过期快照
Paimon写入器每次提交会生成一个或两个快照。每个快照可能会添加一些新的数据文件,或者将一些旧数据文件标记为已删除。然而,被标记的数据文件并不会真正被删除,因为Paimon还支持回溯到更早的快照。只有当快照过期时,这些文件才会被删除。
目前,在提交新更改时,Paimon写入器会自动执行过期操作。通过使旧快照过期,可以删除不再使用的旧数据文件和元数据文件,从而释放磁盘空间。
快照过期由以下表属性控制:
选项 | 是否必需 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
snapshot.time-retained | 否 | 1小时 | 时长 | 已完成快照保留的最长时间。 |
snapshot.num-retained.min | 否 | 10 | 整数 | 已完成快照保留的最小数量。应大于或等于1。 |
snapshot.num-retained.max | 否 | Integer.MAX_VALUE(Java中整型最大值) | 整数 | 已完成快照保留的最大数量。应大于或等于最小数量。 |
snapshot.expire.execution-mode | 否 | sync(同步) | 枚举值 | 指定过期操作的执行模式。 |
snapshot.expire.limit | 否 | 10 | 整数 | 一次允许过期的最大快照数量。 |
当快照数量小于snapshot.num-retained.min
时,不会有快照过期(即使满足snapshot.time-retained
条件)。此后,snapshot.num-retained.max
和snapshot.time-retained
将用于控制快照过期,直到剩余的快照满足条件。
以下示例展示更多细节(snapshot.num-retained.min
为2,snapshot.time-retained
为1小时,snapshot.num-retained.max
为5):
使用元组(snapshotId,对应时间)描述快照项
新快照 | 过期检查后的所有快照 | 解释 |
---|---|---|
(snapshots-1, 2023-07-06 10:00) | (snapshots-1, 2023-07-06 10:00) | 无快照过期 |
(snapshots-2, 2023-07-06 10:20) | (snapshots-1, 2023-07-06 10:00) (snapshots-2, 2023-07-06 10:20) | 无快照过期 |
(snapshots-3, 2023-07-06 10:40) | (snapshots-1, 2023-07-06 10:00) (snapshots-2, 2023-07-06 10:20) (snapshots-3, 2023-07-06 10:40) | 无快照过期 |
(snapshots-4, 2023-07-06 11:00) | (snapshots-1, 2023-07-06 10:00) (snapshots-2, 2023-07-06 10:20) (snapshots-3, 2023-07-06 10:40) (snapshots-4, 2023-07-06 11:00) | 无快照过期 |
(snapshots-5, 2023-07-06 11:20) | (snapshots-2, 2023-07-06 10:20) (snapshots-3, 2023-07-06 10:40) (snapshots-4, 2023-07-06 11:00) (snapshots-5, 2023-07-06 11:20) | snapshot-1 过期,因为不满足 |
(snapshots-6, 2023-07-06 11:30) | (snapshots-3, 2023-07-06 10:40) (snapshots-4, 2023-07-06 11:00) (snapshots-5, 2023-07-06 11:20) (snapshots-6, 2023-07-06 11:30) | snapshot-2 过期,因为不满足 |
(snapshots-7, 2023-07-06 11:35) | (snapshots-3, 2023-07-06 10:40) (snapshots-4, 2023-07-06 11:00) (snapshots-5, 2023-07-06 11:20) (snapshots-6, 2023-07-06 11:30) (snapshots-7, 2023-07-06 11:35) | 无快照过期 |
(snapshots-8, 2023-07-06 11:36) | (snapshots-4, 2023-07-06 11:00) (snapshots-5, 2023-07-06 11:20) (snapshots-6, 2023-07-06 11:30) (snapshots-7, 2023-07-06 11:35) (snapshots-8, 2023-07-06 11:36) | snapshot-3 过期,因为不满足 |
请注意,保留时间过短或保留数量过小可能会导致: -批处理查询找不到文件。例如,表比较大,批处理查询需要10分钟读取数据,但10分钟前的快照过期了,此时批处理查询将读取到已删除的快照。 -基于表文件的流读取作业重启失败。当作业重启时,它记录的快照可能已过期。(在快照过期保留时间较短的情况下,你可以使用消费者ID来保护流读取。)
默认情况下,Paimon会同步删除过期快照。当需要删除的文件过多时,可能无法快速删除,从而对上游操作造成压力。为避免这种情况,用户可以通过将snapshot.expire.execution-mode
设置为async
(异步)来使用异步过期模式。
回滚到快照
将表回滚到特定的快照ID。
Flink
运行以下命令:
/bin/flink run \/path/to/paimon-flink-action-0.9.0.jar \rollback_to \--warehouse \--database \ --table \--version \[--catalog_conf [--catalog_conf ...]]
删除孤立文件
Paimon文件仅在快照过期时才会被物理删除。然而,在删除文件时可能会发生一些意外错误,导致存在未被Paimon快照使用的文件(即所谓的“孤立文件”)。你可以提交一个remove_orphan_files
作业来清理它们:
Spark SQL/Flink SQL
CALL sys.remove_orphan_files(table =]]> "my_db.my_table", [older_than =]]> "2023-10-31 12:00:00"])CALL sys.remove_orphan_files(table =]]> "my_db.*", [older_than =]]> "2023-10-31 12:00:00"])
拓展:
快照过期机制的优化考量:快照过期机制是平衡存储资源利用和数据可访问性的关键。保留时间和保留数量的设置需要根据具体业务场景进行精细调整。例如,在实时数据分析场景中,如果数据变化频繁且对历史数据的依赖较低,可以适当缩短保留时间和减少保留数量,以更快地释放磁盘空间。但对于需要长期保留历史数据用于审计或趋势分析的场景,就需要设置较长的保留时间和较多的保留数量。此外,异步过期模式在处理大量过期文件时,能够有效避免对上游操作的压力,提高系统整体的稳定性和性能。
回滚到快照的应用场景:回滚到特定快照在数据处理过程中非常有用。例如,当发现最近的数据处理操作导致数据错误时,可以通过回滚到之前正确的快照来恢复数据状态,避免重新处理大量数据的成本。在进行数据更新或迁移操作时,也可以先创建快照,若操作出现问题,能够迅速回滚到操作前的状态。这一功能为数据管理提供了一种可靠的“撤销”机制,增强了系统的容错能力。
删除孤立文件的重要性:孤立文件的存在不仅占用宝贵的存储资源,还可能导致数据管理的混乱。定期清理孤立文件有助于维护存储系统的整洁和高效。通过
remove_orphan_files
作业,可以根据特定条件(如文件的创建时间)来删除不再被使用的文件。在大规模数据存储环境中,这种清理操作对于优化存储结构、提高文件系统性能至关重要。 例如,在一个长期运行的数据仓库中,频繁的写入和删除操作可能会产生大量孤立文件,定期执行该作业可以有效避免存储碎片化问题,确保数据存储和访问的高效性。
重新调整桶数量(Rescale Bucket)
由于桶的总数会极大地影响性能,Paimon允许用户通过ALTER TABLE
命令调整桶的数量,并通过INSERT OVERWRITE
在不重新创建表/分区的情况下重新组织数据布局。在执行覆盖作业时,框架会自动扫描旧桶数量的数据,并根据当前桶数量对记录进行哈希处理。
重新调整覆盖(Rescale Overwrite)
-- 重新调整桶的总数
ALTER TABLE table_identifier SET ('bucket' = '...');-- 重新组织表/分区的数据布局
INSERT OVERWRITE table_identifier [PARTITION (part_spec)]
SELECT ...
FROM table_identifier
[WHERE part_spec];
请注意:
-
ALTER TABLE
仅修改表的元数据,不会重新组织或重新格式化现有数据。重新组织现有数据必须通过INSERT OVERWRITE
来实现。 -
重新调整桶数量不会影响正在运行的读取和写入作业。
-
一旦桶数量发生变化,任何新调度的
INSERT INTO
作业,如果写入未重新组织的现有表/分区,将抛出TableException
异常,提示信息类似:“尝试以新的桶数量...写入表/分区...,但之前的桶数量是...。请切换到批处理模式,并首先执行INSERT OVERWRITE
以重新调整当前数据布局。” -
对于分区表,不同分区可以有不同的桶数量。例如:
ALTER TABLE my_table SET ('bucket' = '4');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
SELECT * FROM ...;ALTER TABLE my_table SET ('bucket' = '8');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02')
SELECT * FROM ...;
-
在覆盖期间,确保没有其他作业写入相同的表/分区。
注意:对于启用了日志系统(如Kafka)的表,请同时重新调整主题的分区以保持一致性。
用例(Use Case)
重新调整桶数量有助于应对吞吐量的突然激增。假设有一个每日流ETL任务用于同步交易数据。表的DDL和管道如下所示:
-- 表DDL
CREATE TABLE verified_orders (trade_order_id BIGINT,item_id BIGINT,item_price DOUBLE,dt STRING,PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED
) PARTITIONED BY (dt)
WITH ('bucket' = '16'
);-- 类似于从Kafka表
CREATE temporary TABLE raw_orders(trade_order_id BIGINT,item_id BIGINT,item_price BIGINT,gmt_create STRING,order_status STRING
) WITH ('connector' = 'kafka','topic' = '...','properties.bootstrap.servers' = '...','format' = 'csv'...
);-- 以桶数量为16进行流式插入
INSERT INTO verified_orders
SELECT trade_order_id,item_id,item_price,DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS dt
FROM raw_orders
WHERE order_status ='verified';
在过去几周,该管道运行良好。然而,最近数据量快速增长,作业延迟持续增加。为提高数据的时效性,用户可以:
1. 使用保存点暂停流作业(请参阅“暂停状态和优雅停止作业并创建最终保存点”):
$ ./bin/flink stop \--savepointPath /tmp/flink-savepoints \$JOB_ID
2. 增加桶的数量:
-- 扩展
ALTER TABLE verified_orders SET ('bucket' = '32');
3. 切换到批处理模式并覆盖流作业正在写入的当前分区:
SET 'execution.runtime-mode' = 'batch';
-- 假设今天是2022-06-22
-- 情况1:没有延迟事件更新历史分区,因此覆盖今天的分区就足够了
INSERT OVERWRITE verified_orders PARTITION (dt = '2022-06-22')
SELECT trade_order_id,item_id,item_price
FROM verified_orders
WHERE dt = '2022-06-22';-- 情况2:有延迟事件更新历史分区,但范围不超过3天
INSERT OVERWRITE verified_orders
SELECT trade_order_id,item_id,item_price,dt
FROM verified_orders
WHERE dt IN ('2022-06-20', '2022-06-21', '2022-06-22');
4. 覆盖作业完成后,切换回流式模式。现在,可以随着桶数量的增加提高并行度,并从保存点恢复流作业(请参阅“从保存点启动SQL作业”):
SET 'execution.runtime-mode' ='streaming';
SET 'execution.savepoint.path' = ;INSERT INTO verified_orders
SELECT trade_order_id,item_id,item_price,DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS dt
FROM raw_orders
WHERE order_status ='verified';
拓展:
桶数量对性能的影响原理:在大数据处理中,桶(bucket)的数量影响数据的分布和并行处理能力。更多的桶意味着数据可以更均匀地分布在存储和计算资源上,从而提高并行处理的效率。例如,在一个大规模的分布式数据存储系统中,每个桶可以分配到不同的存储节点和计算资源,当桶数量过少时,可能会导致某些桶的数据量过大,形成数据热点,影响整体性能。而通过合理增加桶的数量,可以将数据更均衡地分配,减少数据热点,提升系统的吞吐量和响应速度。
重新调整桶数量操作流程的关键要点:重新调整桶数量的过程中,
ALTER TABLE
命令只是更新表的元数据,告知系统新的桶数量设置。真正对数据进行重新组织以适应新桶数量的操作是通过INSERT OVERWRITE
完成的。这是因为INSERT OVERWRITE
会重新扫描数据并根据新的桶数量进行哈希分布,确保数据在新的桶布局下得到正确存储。在操作过程中,要特别注意避免在覆盖数据时其他作业对相同表或分区的写入操作,以免造成数据冲突和不一致。另外,对于依赖日志系统(如Kafka)的表,同步调整日志主题的分区与表的桶数量一致,是保证数据一致性的关键步骤。例如,如果Kafka主题分区数量与表的桶数量不匹配,可能会导致数据写入或读取的混乱。用例中的性能优化思路:在实际应用中,数据量的动态变化是常见的情况。如上述用例中,随着数据量的快速增长,原有的桶数量无法满足处理需求,导致作业延迟增加。通过暂停流作业、增加桶数量、在批处理模式下覆盖数据,再恢复流作业的流程,实现了系统性能的优化。这种方法在不影响现有数据的情况下,对数据布局进行了调整,提高了系统处理能力。在覆盖数据时,根据是否有延迟事件更新历史分区来决定覆盖的范围,体现了操作的灵活性和对实际业务场景的适应性。例如,如果有延迟事件更新历史分区,且范围较大,就需要覆盖更多历史分区的数据,以确保数据的完整性和准确性。
管理标签
Paimon的快照提供了一种便捷的历史数据查询方式。但在大多数场景下,一个作业会生成大量快照,并且表会根据配置使旧快照过期。快照过期会删除旧数据文件,过期快照的历史数据将无法再查询。
为解决此问题,你可以基于某个快照创建标签。标签会维护该快照的清单和数据文件。一种典型用法是每天创建标签,这样就能保留每天的历史数据用于批量读取。
自动创建
Paimon支持在写入作业中自动创建标签。
步骤1:选择创建模式
你可以通过表选项tag.automatic-creation
设置创建模式。支持的值有:
-
process-time
:基于机器时间创建标签。 -
watermark
:基于Sink输入的水印创建标签。 -
batch
:在批处理场景中,当前任务完成后生成一个标签。
如果你选择Watermark
,可能需要指定水印的时区。如果水印不在协调世界时(UTC)时区,请配置sink.watermark-time-zone
。
步骤2:选择创建周期
即使用何种频率生成标签。对于tag.creation-period
,你可以选择daily
(每天)、hourly
(每小时)和two-hours
(每两小时)。
如果你需要等待延迟数据,可以配置延迟时间:tag.creation-delay
。
步骤3:标签的自动删除
你可以配置tag.num-retained-max
或tag.default-time-retained
来自动删除标签。
例如,配置表每天0点10分创建一个标签,最大保留时间为3个月:
-- Flink SQL
CREATE TABLE t (k INT PRIMARY KEY NOT ENFORCED,f0 INT,...
) WITH ('tag.automatic-creation' = 'process-time','tag.creation-period' = 'daily','tag.creation-delay' = '10 m','tag.num-retained-max' = '90'
);INSERT INTO t SELECT...;-- Spark SQL-- 读取最新快照
SELECT * FROM t;-- 读取标签快照
SELECT * FROM t VERSION AS OF '2023-07-26';-- 读取标签间的增量数据
SELECT * FROM paimon_incremental_query('t', '2023-07-25', '2023-07-26');
更多关于Spark的查询,请参阅“查询表”。
创建标签
你可以使用给定的名称和快照ID创建标签。
Flink:
运行以下命令:
/bin/flink run \/path/to/paimon-flink-action-0.9.0.jar \create_tag \--warehouse \--database \ --table \--tag_name \[--snapshot ] \[--time_retained ] \[--catalog_conf [--catalog_conf ...]]
如果未设置snapshot
,snapshot_id
默认为最新的快照ID。
删除标签
你可以按名称删除标签。
Flink:
运行以下命令:
/bin/flink run \/path/to/paimon-flink-action-0.9.0.jar \delete_tag \--warehouse \--database \ --table \--tag_name \[--catalog_conf [--catalog_conf ...]]
回滚到标签
将表回滚到特定标签。所有快照ID大于该标签的快照和标签都将被删除(数据也会被删除)。
Flink:
运行以下命令:
/bin/flink run \/path/to/paimon-flink-action-0.9.0.jar \rollback_to \--warehouse \--database \ --table \--version \[--catalog_conf [--catalog_conf ...]]
拓展:
标签管理在数据历史版本维护中的意义:在大数据处理流程中,数据不断更新,快照虽然提供了历史数据查询的途径,但由于过期机制会删除旧数据文件,导致长期历史数据难以保留。通过标签管理,特别是自动创建标签功能,可以按特定周期(如每天)保留关键时间点的数据状态,这对于需要长期保存历史数据用于分析、审计或数据回溯的场景至关重要。例如,在电商数据分析中,每天创建标签可以保留每天的交易数据状态,方便后续分析每日业务趋势、对比不同日期的销售情况等。
自动创建标签模式的应用场景差异:不同的自动创建标签模式适用于不同的业务场景。
process-time
模式基于机器时间创建标签,简单直接,适用于对时间精度要求不高,主要按固定时间间隔(如每天、每小时)保留数据的场景。watermark
模式基于Sink输入的水印创建标签,更适合处理流数据且需要根据数据处理进度来标记数据的场景。例如,在实时流数据处理中,水印可以反映数据的完整性,基于水印创建标签能确保在数据完整性达到一定程度时标记数据,适用于对数据准确性和实时性要求较高的场景。batch
模式则适用于批处理作业,在任务完成后创建标签,适合定期批量处理数据并保留处理结果的场景,如每月的财务报表数据处理。标签操作与数据一致性:在进行标签的创建、删除和回滚操作时,需要注意数据一致性。例如,删除标签时,如果有其他作业正在依赖该标签的数据,可能会导致数据不一致或作业失败。回滚到标签操作不仅会删除后续的快照和标签,还会删除相关数据,这在生产环境中需要谨慎操作,确保不会误删重要数据。在实际应用中,应结合业务需求和数据依赖关系,制定合理的标签管理策略,以保证数据的一致性和可用性。例如,在删除标签前,可以先检查是否有作业正在使用该标签数据;在回滚操作前,进行数据备份或通知相关业务方,避免对业务造成影响。
Paimon指标(Metrics)
Paimon构建了一个指标系统,用于衡量读写行为,例如在上次规划中扫描了多少清单文件、上次提交操作花费了多长时间、上次合并操作删除了多少文件。
在Paimon的指标系统中,指标以表为粒度进行更新和报告。
Paimon指标系统提供三种类型的指标:仪表盘指标(Gauge)、计数器指标(Counter)、直方图指标(Histogram)。
-
仪表盘指标(Gauge):在某个时间点提供任意类型的值。
-
计数器指标(Counter):通过递增和递减来统计数值。
-
直方图指标(Histogram):衡量一组值的统计分布,包括最小值、最大值、平均值、标准差和百分位数。
Paimon支持内置指标来衡量提交、扫描、写入和合并操作,这些指标可以桥接到任何支持的计算引擎,如Flink、Spark等。
指标列表
以下是Paimon内置指标列表。它们分为扫描指标、提交指标、写入指标、写入缓冲区指标和合并指标几类。
扫描指标
指标名称 | 类型 | 描述 |
---|---|---|
lastScanDuration | 仪表盘指标(Gauge) | 完成上次扫描所花费的时间。 |
scanDuration | 直方图指标(Histogram) | 最近几次扫描所花费时间的分布情况。 |
lastScannedManifests | 仪表盘指标(Gauge) | 上次扫描中扫描的清单文件数量。 |
lastSkippedByPartitionAndStats | 仪表盘指标(Gauge) | 上次扫描中通过分区过滤和值/键统计信息跳过的表文件数量。 |
lastSkippedByBucketAndLevelFilter | 仪表盘指标(Gauge) | 上次扫描中通过桶、桶键和层级过滤跳过的表文件数量。 |
lastSkippedByWholeBucketFilesFilter | 仪表盘指标(Gauge) | 上次扫描中通过桶层级值过滤(仅主键表)跳过的表文件数量。 |
lastScanSkippedTableFiles | 仪表盘指标(Gauge) | 上次扫描中总共跳过的表文件数量。 |
lastScanResultedTableFiles | 仪表盘指标(Gauge) | 上次扫描中得到的表文件数量。 |
提交指标
指标名称 | 类型 | 描述 |
---|---|---|
lastCommitDuration | 仪表盘指标(Gauge) | 完成上次提交所花费的时间。 |
commitDuration | 直方图指标(Histogram) | 最近几次提交所花费时间的分布情况。 |
lastCommitAttempts | 仪表盘指标(Gauge) | 上次提交尝试的次数。 |
lastTableFilesAdded | 仪表盘指标(Gauge) | 上次提交中添加的表文件数量,包括新创建的数据文件和之后合并的文件。 |
lastTableFilesDeleted | 仪表盘指标(Gauge) | 上次提交中删除的表文件数量,这些文件来自之前的合并。 |
lastTableFilesAppended | 仪表盘指标(Gauge) | 上次提交中追加的表文件数量,即新创建的数据文件。 |
lastTableFilesCommitCompacted | 仪表盘指标(Gauge) | 上次提交中合并的表文件数量,包括之前和之后合并的文件。 |
lastChangelogFilesAppended | 仪表盘指标(Gauge) | 上次提交中追加的变更日志文件数量。 |
lastChangelogFileCommitCompacted | 仪表盘指标(Gauge) | 上次提交中合并的变更日志文件数量。 |
lastGeneratedSnapshots | 仪表盘指标(Gauge) | 上次提交中生成的快照文件数量,可能是1个或2个快照。 |
lastDeltaRecordsAppended | 仪表盘指标(Gauge) | 上次提交中以APPEND提交类型的增量记录数。 |
lastChangelogRecordsAppended | 仪表盘指标(Gauge) | 上次提交中以APPEND提交类型的变更日志记录数。 |
lastDeltaRecordsCommitCompacted | 仪表盘指标(Gauge) | 上次提交中以COMPACT提交类型的增量记录数。 |
lastChangelogRecordsCommitCompacted | 仪表盘指标(Gauge) | 上次提交中以COMPACT提交类型的变更日志记录数。 |
lastPartitionsWritten | 仪表盘指标(Gauge) | 上次提交中写入的分区数量。 |
lastBucketsWritten | 仪表盘指标(Gauge) | 上次提交中写入的桶数量。 |
写入缓冲区指标
指标名称 | 类型 | 描述 |
---|---|---|
numWriters | 仪表盘指标(Gauge) | 此并行度中的写入器数量。 |
bufferPreemptCount | 仪表盘指标(Gauge) | 被抢占的内存总数。 |
usedWriteBufferSizeByte | 仪表盘指标(Gauge) | 当前使用的写入缓冲区大小(以字节为单位)。 |
totalWriteBufferSizeByte | 仪表盘指标(Gauge) | 配置的总写入缓冲区大小(以字节为单位)。 |
合并指标
指标名称 | 类型 | 描述 |
---|---|---|
maxLevel0FileCount | 仪表盘指标(Gauge) | 此写入器当前处理的0级文件的最大数量。如果异步合并不能及时完成,该值会变大。 |
avgLevel0FileCount | 仪表盘指标(Gauge) | 此写入器当前处理的0级文件的平均数量。如果异步合并不能及时完成,该值会变大。 |
compactionThreadBusy | 仪表盘指标(Gauge) | 此并行度中合并线程的最大繁忙程度。目前,每个并行度只有一个合并线程,因此繁忙程度值范围从0(空闲)到100(一直运行合并)。 |
avgCompactionTime | 仪表盘指标(Gauge) | 合并线程的平均运行时间,基于记录的合并时间数据以毫秒为单位计算。该值表示合并操作的平均持续时间。值越高表明平均合并时间越长,可能意味着需要进行性能优化。 |
桥接至Flink
Paimon已实现将指标桥接到Flink的指标系统,这些指标可由Flink报告,并且指标组的生命周期由Flink管理。
使用Flink访问Paimon时,请结合<scope>.<infix>.<metric_name>
以获取完整的指标标识符,metric_name
可从指标列表中获取。
例如,在名为insert_word_count
的Flink作业中,表word_count
的lastPartitionsWritten
指标的标识符为:
localhost.taskmanager.localhost:60340-775a20.insert_word_count.Global Committer : word_count.0.paimon.table.word_count.commit.lastPartitionsWritten
。
从Flink Web界面中,进入提交器操作符的指标,显示为:
0.Global_Committer___word_count.paimon.table.word_count.commit.lastPartitionsWritten
。
请参考系统范围以了解Flink范围。扫描指标仅在Flink版本 >= 1.18时支持。
范围 | 中缀 |
---|---|
扫描指标 |
|
提交指标 |
|
写入指标 |
|
写入缓冲区指标 |
|
合并指标 |
|
Flink源指标 |
|
Flink sink指标 |
|
Flink连接器标准指标
当使用Flink进行读写时,Paimon实现了一些关键的标准Flink连接器指标来衡量源延迟和sink输出,详见FLIP-33:标准化连接器指标。此处列出已实现的Flink源/sink指标。
源指标(Flink)
指标名称 | 级别 | 类型 | 描述 |
---|---|---|---|
currentEmitEventTimeLag | Flink源操作符 | 仪表盘指标(Gauge) | 从源发送记录与文件创建之间的时间差。 |
currentFetchEventTimeLag | Flink源操作符 | 仪表盘指标(Gauge) | 读取数据文件与文件创建之间的时间差。 |
请注意,如果在流查询中指定了consumer-id
,源指标的级别应变为读取器操作符,该操作符在监控操作符之后。
sink指标(Flink)
指标名称 | 级别 | 类型 | 描述 |
---|---|---|---|
numBytesOut | 表 | 计数器指标(Counter) | 输出的总字节数。 |
numBytesOutPerSecond | 表 | 计量器(Meter) | 每秒输出的字节数。 |
numRecordsOut | 表 | 计数器指标(Counter) | 输出的总记录数。 |
numRecordsOutPerSecond | 表 | 计量器(Meter) | 每秒输出的记录数。 |
拓展:
指标系统对Paimon性能优化的作用:Paimon的指标系统为性能优化提供了详细的数据支持。通过扫描指标,可以了解扫描操作的效率,例如
lastScanDuration
能直观反映每次扫描的耗时,scanDuration
的直方图分布有助于发现扫描时间的波动情况,从而判断是否存在性能瓶颈。提交指标能帮助定位提交操作中的问题,如lastCommitDuration
可查看提交操作的时长,lastCommitAttempts
能发现提交是否频繁重试,进而分析原因进行优化。写入缓冲区指标和合并指标也分别从不同角度反映了写入和合并过程中的资源使用和性能状况,有助于调整相关参数以提升整体性能。桥接至Flink指标系统的优势:将Paimon指标桥接到Flink指标系统,使得在Flink环境中使用Paimon时,用户可以在统一的Flink指标体系下全面监控Paimon的读写等操作。Flink强大的指标管理和展示功能(如Flink Web界面)能更直观地呈现Paimon的运行状态,方便用户及时发现性能问题。同时,这种集成也便于与Flink自身的作业指标相结合,进行综合分析,例如将Paimon的写入指标与Flink Sink指标关联,更深入地理解数据从Flink作业流入Paimon的整个过程中的性能表现。
Flink连接器标准指标在数据流转监控中的意义:Flink连接器标准指标为监控数据在Flink与Paimon之间的流转提供了关键信息。源指标中的
currentEmitEventTimeLag
和currentFetchEventTimeLag
可以帮助用户了解数据从源端读取和发送的延迟情况,及时发现数据读取或发送过程中的异常。Sink指标中的numBytesOut
、numBytesOutPerSecond
、numRecordsOut
和numRecordsOutPerSecond
则直观地反映了数据写入Paimon的量和速率,对于评估写入性能、预测资源需求以及发现潜在的写入瓶颈非常重要。这些指标的结合使用,能全方位监控数据在Flink与Paimon之间的流转过程,保障数据处理流程的稳定和高效运行。
权限管理(Manage Privileges)
Paimon在目录层面提供了一个权限系统。权限决定了哪些用户可以对哪些对象执行哪些操作,这样你就能够以细粒度的方式管理表的访问权限。
目前,Paimon采用基于身份的访问控制(IBAC)权限模型。也就是说,权限直接分配给用户。
这个权限系统仅防止不被期望的用户通过目录访问表。它不会阻止通过临时表(通过在文件系统上指定表路径)进行的访问,也无法防止用户直接在文件系统上修改数据文件。如果你需要更严格的保护,应使用具有访问管理功能的文件系统。
基本概念
我们现在介绍权限系统的基本概念。
对象
对象是可以被授予访问权限的实体。除非通过授权允许,否则访问将被拒绝。
目前,Paimon的权限系统中有三种类型的对象:目录(CATALOG)、数据库(DATABASE)和表(TABLE)。对象具有逻辑层次结构,这与它们所代表的概念相关。例如:
-
如果用户被授予目录的某项权限,那么他也将对该目录中的所有数据库和所有表拥有此权限。
-
如果用户被授予数据库的某项权限,那么他也将对该数据库中的所有表拥有此权限。
-
如果用户的目录权限被撤销,那么他在该目录中的所有数据库和所有表上的此权限也将丧失。
-
如果用户的数据库权限被撤销,那么他在该数据库中的所有表上的此权限也将丧失。
权限
权限是对对象定义的访问级别。可以使用多种权限来控制授予对象的访问粒度。权限是特定于对象的。不同的对象可能有不同的权限。
目前,我们支持以下权限:
权限 | 描述 | 可授予的对象 |
---|---|---|
SELECT | 查询表中的数据。 | 表、数据库、目录 |
INSERT | 在表中插入、更新或删除数据。在表中创建或删除标签和分支。 | 表、数据库、目录 |
ALTER_TABLE | 修改表的元数据,包括表名、列名、表选项等。 | 表、数据库、目录 |
DROP_TABLE | 删除表。 | 表、数据库、目录 |
CREATE_TABLE | 在数据库中创建表。 | 数据库、目录 |
DROP_DATABASE | 删除数据库。 | 数据库、目录 |
CREATE_DATABASE | 在目录中创建数据库。 | 目录 |
ADMIN | 在目录中创建或删除特权用户,授予或撤销用户权限。 | 目录 |
用户
用户是可以被授予权限的实体。用户通过密码进行身份验证。
当权限系统启用时,会自动创建两个特殊用户:
-
根用户,在启用权限系统时通过提供的根密码进行标识。此用户始终在目录中拥有所有权限。
-
匿名用户。如果在创建目录时未提供用户名和密码,这将是默认用户。
启用权限
Paimon目前仅支持基于文件的权限系统。只有metastore
= 'filesystem'(默认值)或metastore
= 'hive'的目录支持这种权限系统。
要在文件系统/Hive目录上启用权限系统,请执行以下步骤: Flink 1.18+: 运行以下Flink SQL:
-- 使用你要启用权限系统的目录
USE CATALOG `my-catalog`;-- 通过提供根密码初始化权限系统
-- 将'root-password'更改为你想要的密码
CALL sys.init_file_based_privilege('root-password');
权限系统启用后,请重新创建目录并以根用户身份进行身份验证,以创建其他用户并授予他们权限。
权限系统不会影响现有的目录。也就是说,这些目录仍然可以自由访问和修改表。如果你想在这些目录中使用权限系统,请删除并使用所需的仓库路径重新创建所有目录。
访问受权限控制的目录
要访问受权限控制的目录并以用户身份进行身份验证,你需要在创建目录时定义用户和密码目录选项。例如,以下SQL创建一个目录,同时尝试以密码为mypassword
的根用户身份进行身份验证:
-- Flink
CREATE CATALOG `my-catalog` WITH ('type' = 'paimon',--...'user' = 'root','password' ='mypassword'
);
创建用户
你必须以具有ADMIN权限的用户(例如根用户)身份进行身份验证才能执行此操作。
要在权限系统中创建用户,请执行以下步骤: Flink 1.18+: 运行以下Flink SQL:
-- 使用你要创建用户的目录
-- 你必须在该目录中以具有ADMIN权限的用户身份进行身份验证
USE CATALOG `my-catalog`;-- 创建一个通过指定密码进行身份验证的用户
-- 将'user'和'password'更改为你想要的用户名和密码
CALL sys.create_privileged_user('user', 'password');
删除用户
你必须以具有ADMIN权限的用户(例如根用户)身份进行身份验证才能执行此操作。
要在权限系统中删除用户,请执行以下步骤: Flink 1.18+: 运行以下Flink SQL:
-- 使用你要删除用户的目录
-- 你必须在该目录中以具有ADMIN权限的用户身份进行身份验证
USE CATALOG `my-catalog`;-- 将'user'更改为你要删除的用户名
CALL sys.drop_privileged_user('user');
授予用户权限
你必须以具有ADMIN权限的用户(例如根用户)身份进行身份验证才能执行此操作。
要在权限系统中授予用户权限,请执行以下步骤: Flink 1.18+: 运行以下Flink SQL:
-- 使用你要授予权限的目录
-- 你必须在该目录中以具有ADMIN权限的用户身份进行身份验证
USE CATALOG `my-catalog`;-- 你可以将'user'更改为你想要的用户名,并将'SELECT'更改为你想要的其他权限
-- 授予'user'对整个目录的'SELECT'权限
CALL sys.grant_privilege_to_user('user', 'SELECT');
-- 授予'user'对数据库my_db的'SELECT'权限
CALL sys.grant_privilege_to_user('user', 'SELECT','my_db');
-- 授予'user'对表my_db.my_tbl的'SELECT'权限
CALL sys.grant_privilege_to_user('user', 'SELECT','my_db','my_tbl');
撤销用户权限
你必须以具有ADMIN权限的用户(例如根用户)身份进行身份验证才能执行此操作。
要在权限系统中撤销用户的权限,请执行以下步骤: Flink 1.18+: 运行以下Flink SQL:
-- 使用你要撤销权限的目录
-- 你必须在该目录中以具有ADMIN权限的用户身份进行身份验证
USE CATALOG `my-catalog`;-- 你可以将'user'更改为你想要的用户名,并将'SELECT'更改为你想要的其他权限
-- 撤销'user'对整个目录的'SELECT'权限
CALL sys.revoke_privilege_from_user('user', 'SELECT');
-- 撤销'user'对数据库my_db的'SELECT'权限
CALL sys.revoke_privilege_from_user('user', 'SELECT','my_db');
-- 撤销'user'对表my_db.my_tbl的'SELECT'权限
CALL sys.revoke_privilege_from_user('user', 'SELECT','my_db','my_tbl');
拓展:
权限系统在数据安全管理中的重要性:在大数据环境中,数据的安全性至关重要。Paimon的权限系统通过细粒度的权限控制,确保只有授权用户能够对特定对象执行相应操作,有效保护了数据的隐私和完整性。例如,在企业数据仓库中,不同部门的用户可能只需要对特定的表或数据库进行查询操作,通过授予SELECT权限,可以满足他们的数据需求,同时防止未经授权的修改。而对于管理员等具有更高权限的用户,可以授予ADMIN权限,以便进行用户管理和系统配置等操作。这种分层的权限管理模式,有助于构建一个安全可靠的数据访问环境。
基于身份的访问控制(IBAC)模型的特点:IBAC模型直接将权限分配给用户,使得权限管理相对简单直接。每个用户的权限明确,易于跟踪和管理。与其他访问控制模型(如基于角色的访问控制RBAC)相比,IBAC更适用于用户数量相对较少且权限分配较为个性化的场景。在Paimon中,由于数据对象(目录、数据库、表)的层次结构清晰,IBAC模型能够很好地与之结合,实现权限的级联授予和撤销,保证了权限管理的一致性和高效性。例如,当一个用户在目录级别被授予某种权限时,其在该目录下的所有数据库和表上自动拥有相同权限,减少了重复的权限设置工作。
权限系统与其他安全机制的配合:虽然Paimon的权限系统能够在目录层面控制用户对表的访问,但它存在一定的局限性,如无法阻止通过临时表或直接在文件系统上对数据文件的访问。因此,在实际应用中,需要与其他安全机制配合使用。例如,结合具有访问管理功能的文件系统(如HDFS的权限管理),可以进一步增强数据的安全性。同时,还可以与认证系统(如Kerberos)集成,提供更强大的用户身份验证功能,确保只有合法用户能够访问系统。这种多层次的安全防护体系,能够全方位保护大数据环境中的数据安全。
管理分支(Manage Branches)
在流数据处理中,修正数据比较困难,因为这可能会影响现有数据,而且用户会看到流处理的临时结果,这并非预期情况。
我们假设现有工作流程正在处理的分支是“main”分支。通过创建自定义数据分支,有助于在现有表上对新作业进行实验测试和数据验证,无需停止现有的读/写工作流程,也无需从主分支复制数据。
通过合并或替换分支操作,用户可以完成数据的修正。
创建分支
Paimon支持从特定标签或快照创建分支,或者创建一个空分支,这意味着创建的分支的初始状态类似于一个空表。
Flink
运行以下SQL:
-- 从标签'tag1'创建名为'branch1'的分支
CALL sys.create_branch('default.T', 'branch1', 'tag1');-- 创建名为'branch1'的空分支
CALL sys.create_branch('default.T', 'branch1');
删除分支
你可以按名称删除分支。
Flink
运行以下SQL:
CALL sys.delete_branch('default.T', 'branch1');
基于分支的读/写操作
你可以按如下方式基于分支进行读或写操作。
Flink
-- 从'branch1'分支读取数据
SELECT * FROM `t$branch_branch1`;
SELECT * FROM `t$branch_branch1` /*+ OPTIONS('consumer-id' ='myid') */;-- 向'branch1'分支写入数据
INSERT INTO `t$branch_branch1` SELECT...
快速推进
将自定义分支快速推进到主分支,会删除主分支中在该分支初始标签之后创建的所有快照、标签和模式。并将分支中的快照、标签和模式复制到主分支。
Flink
CALL sys.fast_forward('default.T', 'branch1');
从回退分支进行批量读取
你可以设置表选项scan.fallback-branch
,这样当批处理作业从当前分支读取数据时,如果某个分区不存在,读取器将尝试从回退分支读取该分区。对于流读取作业,目前不支持此功能,流读取作业只会从当前分支生成结果。
这个功能的用例是什么呢?假设你创建了一个按日期分区的Paimon表。你有一个长期运行的流作业,将记录插入到Paimon中,以便可以及时查询当天的数据。你还有一个批处理作业,每晚运行,将前一天修正后的记录插入到Paimon中,以保证数据的准确性。
当你从这个Paimon表查询数据时,你希望首先从批处理作业的结果中读取数据。但是如果某个分区(例如,当天的分区)在其结果中不存在,那么你希望从流作业的结果中读取数据。在这种情况下,你可以为流作业创建一个分支,并将scan.fallback-branch
设置为这个流分支。
让我们看一个示例。
Flink
-- 创建Paimon表
CREATE TABLE T (dt STRING NOT NULL,name STRING NOT NULL,amount BIGINT
) PARTITIONED BY (dt);-- 为流作业创建一个分支
CALL sys.create_branch('default.T', 'test');-- 为分支设置主键和桶数量
ALTER TABLE `T$branch_test` SET ('primary-key' = 'dt,name','bucket' = '2','changelog-producer' = 'lookup'
);-- 设置回退分支
ALTER TABLE T SET ('scan.fallback-branch' = 'test'
);-- 向流分支写入记录
INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6);-- 向默认分支写入记录
INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7);SELECT * FROM T;
/*
+------------------+------------------+--------+
| dt | name | amount |
+------------------+------------------+--------+
| 20240725 | apple | 5 |
| 20240725 | banana | 7 |
| 20240726 | cherry | 3 |
| 20240726 | pear | 6 |
+------------------+------------------+--------+
*/-- 重置回退分支
ALTER TABLE T RESET ('scan.fallback-branch');-- 现在它只从默认分支读取数据
SELECT * FROM T;
/*
+------------------+------------------+--------+
| dt | name | amount |
+------------------+------------------+--------+
| 20240725 | apple | 5 |
| 20240725 | banana | 7 |
+------------------+------------------+--------+
*/
拓展:
分支管理在流数据处理中的优势:在流数据处理场景下,数据持续流动且处理过程实时性强,传统的数据修正方式可能会干扰正在运行的流程或影响现有数据的完整性。Paimon的分支管理功能为解决这些问题提供了有效的途径。通过创建分支,数据工程师可以在不影响主工作流程的前提下,对新的处理逻辑或数据修正操作进行试验。例如,在实时数据分析平台中,为了验证新的算法对数据处理结果的影响,可以在分支上进行测试,确保无误后再通过合并操作应用到主分支,从而保证了主工作流程的稳定性和数据的一致性。
快速推进操作的应用场景:快速推进操作在数据版本管理和更新方面具有重要意义。当在分支上完成了数据的验证、修正或新功能的开发后,通过快速推进将分支的状态同步到主分支,能够高效地将这些变更集成到主数据流程中。例如,在一个基于Paimon构建的数据仓库中,数据分析师在分支上对历史数据进行了重新计算和修正,完成后使用快速推进操作,可以将修正后的数据快速应用到主分支,使得后续的查询和分析能够基于准确的数据进行,同时避免了手动逐个复制和更新相关数据对象(如快照、标签和模式)的繁琐过程,提高了数据管理的效率。
回退分支在数据查询中的应用:设置回退分支为数据查询提供了更大的灵活性和数据完整性保障。在实际业务中,不同的数据处理作业(如批处理和流处理)可能在不同的时间点生成数据,并且数据的准确性和时效性要求也不同。通过设置回退分支,查询操作可以根据数据的可用性和需求,优先从批处理作业结果中获取更准确的数据,如果特定分区数据缺失,则从流处理作业结果中获取最新数据。这种机制在数据查询场景中非常实用,例如在电商销售数据统计中,批处理作业每天晚上对前一天的销售数据进行修正和汇总,流处理作业实时处理当天的销售数据。通过回退分支设置,查询可以在保证数据准确性的同时,获取到最新的销售数据,为业务决策提供更全面的支持。
配置(Configuration)
核心选项(CoreOptions)
Paimon的核心配置选项。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
async-file-write | true | 布尔值 | 写入文件时是否启用异步IO写入。 |
auto-create | false | 布尔值 | 读写表时是否自动创建底层存储。 |
branch | "main" | 字符串 | 指定分支名称。 |
bucket | -1 | 整数 | 文件存储的桶数量。它要么等于 -1(动态桶模式),要么必须大于0(固定桶模式)。 |
bucket-key | (none) | 字符串 | 指定Paimon的数据分布策略。数据根据bucket-key的哈希值分配到各个桶中。如果指定多个字段,用逗号分隔。如果未指定,则使用主键;如果没有主键,则使用整行数据。 |
cache-page-size | 64 kb | 内存大小 | 缓存的内存页大小。 |
changelog-producer | none | 枚举值 | 是否双写到变更日志文件。此变更日志文件记录数据更改的详细信息,在流读取时可直接读取。这适用于有主键的表。可能的值: "none":无变更日志文件。 "input":刷新内存表时双写到变更日志文件,变更日志来自输入。 "full-compaction":每次全量合并时生成变更日志文件。 "lookup":在提交数据写入之前通过“lookup”生成变更日志文件。 |
changelog-producer.row-deduplicate | false | 布尔值 | 对于相同记录是否生成 -U、+U变更日志。此配置仅在changelog-producer为lookup或full-compaction时有效。 |
changelog.num-retained.max | (none) | 整数 | 保留的已完成变更日志的最大数量。应大于或等于最小数量。 |
changelog.num-retained.min | (none) | 整数 | 保留的已完成变更日志的最小数量。应大于或等于1。 |
changelog.time-retained | (none) | 持续时间 | 保留已完成变更日志的最长时间。 |
commit.callback.#.param | (none) | 字符串 | 类#构造函数的参数字符串。回调类应自行解析参数。 |
commit.callbacks | (none) | 字符串 | 提交成功后要调用的提交回调类列表。类名用逗号连接(例如:com.test.CallbackA,com.sample.CallbackB)。 |
commit.force-compact | false | 布尔值 | 提交前是否强制进行合并。 |
commit.force-create-snapshot | false | 布尔值 | 提交时是否强制创建快照。 |
commit.user-prefix | (none) | 字符串 | 指定提交用户前缀。 |
compaction.max-size-amplification-percent | 200 | 整数 | 大小放大定义为在变更日志模式表的合并树中存储单个字节数据所需的额外存储量(以百分比表示)。 |
compaction.max.file-num | (none) | 整数 | 对于文件集[f_0,...,f_N],即使sum(size(f_i)) < targetFileSize,追加表触发合并的最大文件数。此值可避免过多小文件堆积。追加表的默认值为'50'。分桶追加表的默认值为'5'。 |
compaction.min.file-num | 5 | 整数 | 对于文件集[f_0,...,f_N],满足sum(size(f_i)) >= targetFileSize以触发追加表合并的最小文件数。此值可避免几乎满的文件被合并,因为这并不划算。 |
compaction.optimization-interval | (none) | 持续时间 | 表示执行优化合并的频率,此配置用于确保读优化系统表的查询及时性。 |
compaction.size-ratio | 1 | 整数 | 变更日志模式表中比较排序运行大小的百分比灵活性。如果候选排序运行(s)的大小比下一个排序运行的大小小1%,则将下一个排序运行包含到该候选集中。 |
consumer-id | (none) | 字符串 | 用于记录存储中消费偏移量的消费者ID。 |
consumer.expiration-time | (none) | 持续时间 | 消费者文件的过期间隔。如果自上次修改后消费者文件的生存时间超过此值,则该文件将过期。 |
consumer.ignore-progress | false | 布尔值 | 新启动的作业是否忽略消费者进度。 |
consumer.mode | exactly-once | 枚举值 | 指定表的消费者一致性模式。可能的值: "exactly-once":读取器以快照粒度消费数据,并严格确保消费者中记录的快照ID是所有读取器都准确消费的快照ID + 1。 "at-least-once":每个读取器以不同速率消费快照,所有读取器中消费进度最慢的快照将记录在消费者中。 |
continuous.discovery-interval | 10 s | 持续时间 | 持续读取的发现间隔。 |
cross-partition-upsert.bootstrap-parallelism | 10 | 整数 | 跨分区更新中单个任务引导的并行度。 |
cross-partition-upsert.index-ttl | (none) | 持续时间 | 跨分区更新(主键不包含所有分区字段)在RocksDB索引中的TTL,这可以避免维护过多索引导致性能越来越差,但请注意这也可能导致数据重复。 |
delete-file.thread-num | (none) | 整数 | 并发删除文件的最大数量。默认值为Java虚拟机可用的处理器数量。 |
delete.force-produce-changelog | false | 布尔值 | 在删除SQL中强制生成变更日志,或者你可以使用'streaming-read-overwrite'从覆盖提交中读取变更日志。 |
deletion-vector.index-file.target-size | 2 mb | 内存大小 | 删除向量索引文件的目标大小。 |
deletion-vectors.enabled | false | 布尔值 | 是否启用删除向量模式。在此模式下,写入数据时会生成包含删除向量的索引文件,标记要删除的数据。在读取操作中,通过应用这些索引文件,可以避免合并。 |
dynamic-bucket.assigner-parallelism | (none) | 整数 | 动态桶模式下分配器操作符的并行度,它也与初始化桶的数量相关,过小会导致分配器处理速度不足。 |
dynamic-bucket.initial-buckets | (none) | 整数 | 动态桶模式下分配器操作符中一个分区的初始桶数。 |
dynamic-bucket.target-row-num | 2000000 | 长整数 | 如果桶数为 -1,对于主键表,即动态桶模式,此选项控制一个桶的目标行数。 |
dynamic-partition-overwrite | true | 布尔值 | 覆盖有动态分区列的分区表时是否仅覆盖动态分区。仅在表有分区键时有效。 |
end-input.check-partition-expire | false | 布尔值 | 批处理模式或有界流情况下可选的endInput检查分区过期。 |
fields.default-aggregate-function | (none) | 字符串 | 部分更新和聚合合并函数中所有字段的默认聚合函数。 |
file-index.in-manifest-threshold | 500 bytes | 内存大小 | 在清单中存储文件索引字节的阈值。 |
file-index.read.enabled | true | 布尔值 | 是否启用读取文件索引。 |
file-reader-async-threshold | 10 mb | 内存大小 | 异步读取文件的阈值。 |
file.block-size | (none) | 内存大小 | 格式的文件块大小,orc条带的默认值为64MB,parquet行组的默认值为128MB。 |
file.compression | "zstd" | 字符串 | 默认文件合并方式。为了更快的读写速度,建议使用zstd。 |
file.compression.per.level | Map | 定义不同层级的不同合并策略,你可以这样添加配置:'file.compression.per.level' = '0:lz4,1:zstd'。 | |
file.compression.zstd-level | 1 | 整数 | 默认文件合并zstd级别。如需更高合并率,可配置为9,但读写速度会显著下降。 |
file.format | "parquet" | 字符串 | 指定数据文件的消息格式,目前支持orc、parquet和avro。 |
file.format.per.level | Map | 定义不同层级的不同文件格式,你可以这样添加配置:'file.format.per.level' = '0:avro,3:parquet',如果未提供某个层级的文件格式,则使用 | |
force-lookup | false | 布尔值 | 是否强制在合并时使用lookup。 |
full-compaction.delta-commits | (none) | 整数 | 增量提交后将不断触发全量合并。 |
ignore-delete | false | 布尔值 | 是否忽略删除记录。 |
incremental-between | (none) | 字符串 | 读取起始快照(不包含)和结束快照之间的增量更改,例如,'5,10'表示快照5和快照10之间的更改。 |
incremental-between-scan-mode | auto | 枚举值 | 读取起始快照(不包含)和结束快照之间的增量更改时的扫描类型。可能的值: "auto":对于生成变更日志文件的表,扫描变更日志文件。否则,扫描新更改的文件。 "delta":扫描快照之间新更改的文件。 "changelog":扫描快照之间的变更日志文件。 |
incremental-between-timestamp | (none) | 字符串 | 读取起始时间戳(不包含)和结束时间戳之间的增量更改,例如,'t1,t2'表示时间戳t1和时间戳t2之间的更改。 |
local-merge-buffer-size | (none) | 内存大小 | 本地合并将在输入记录按桶进行洗牌并写入接收器之前对其进行缓冲和合并。缓冲区满时将被刷新。主要用于解决主键上的数据倾斜问题。尝试此功能时,建议从64mb开始。 |
local-sort.max-num-file-handles | 128 | 整数 | 外部合并排序的最大扇入数。它限制了文件句柄的数量。如果太小,可能会导致中间合并。但如果太大,会导致同时打开太多文件,消耗内存并导致随机读取。 |
lookup-wait | true | 布尔值 | 需要lookup时,提交是否等待通过lookup进行合并。 |
lookup.cache-file-retention | 1 h | 持续时间 | lookup缓存文件的保留时间。文件过期后,如果需要访问,将从DFS重新读取以在本地磁盘上构建索引。 |
lookup.cache-max-disk-size | infinite | 内存大小 | lookup缓存的最大磁盘大小,你可以使用此选项限制本地磁盘的使用。 |
lookup.cache-max-memory-size | 256 mb | 内存大小 | lookup缓存的最大内存大小。 |
lookup.cache-spill-compression | "zstd" | 字符串 | lookup缓存的溢出合并方式,目前支持zstd、none、lz4和lzo。 |
lookup.cache.bloom.filter.enabled | true | 布尔值 | 是否为lookup缓存启用布隆过滤器。 |
lookup.cache.bloom.filter.fpp | 0.05 | 双精度浮点数 | 定义lookup缓存布隆过滤器的默认误判率。 |
lookup.hash-load-factor | 0.75 | 单精度浮点数 | lookup的索引负载因子。 |
lookup.local-file-type | hash | 枚举值 | lookup的本地文件类型。可能的值: "sort":构建用于lookup的排序文件。 "hash":构建用于lookup的哈希文件。 |
manifest.compression | "zstd" | 字符串 | 清单文件的默认文件合并方式。 |
manifest.format | "avro" | 字符串 | 指定清单文件的消息格式。 |
manifest.full-compaction-threshold-size | 16 mb | 内存大小 | 触发清单全量合并的大小阈值。 |
manifest.merge-min-count | 30 | 整数 | 为避免频繁的清单合并,此参数指定合并的最小ManifestFileMeta数量。 |
manifest.target-file-size | 8 mb | 内存大小 | 建议的清单文件大小。 |
merge-engine | deduplicate | 枚举值 | 为有主键的表指定合并引擎。可能的值: "deduplicate":去重并保留最后一行。 "partial-update":部分更新非空字段。 "aggregation":聚合具有相同主键的字段。 "first-row":去重并保留第一行。 |
metadata.iceberg-compatible | false | 布尔值 | 设置为true时,提交快照后生成Iceberg元数据,以便Iceberg读取器可以读取Paimon的原始文件。 |
metadata.stats-mode | "truncate(16)" | 字符串 | 元数据统计信息收集模式。可用值为none、counts、truncate(16)、full。 "none":表示禁用元数据统计信息收集。 "counts":表示仅收集空值计数。 "full":表示收集空值计数、最小值/最大值。 "truncate(16)":表示收集空值计数、截断长度为16的最小值/最大值。字段级统计模式可通过fields.{field_name}.stats-mode指定。 |
metastore.partitioned-table | false | 布尔值 | 是否在元存储中将此表创建为分区表。例如,如果你想在Hive中列出Paimon表的所有分区,你需要在Hive元存储中将此表创建为分区表。此配置选项不影响默认的文件系统元存储。 |
metastore.tag-to-partition | (none) | 字符串 | 是否在元存储中将此表创建为分区表,用于映射非分区表标签。这允许Hive引擎以分区表视图查看此表,并使用分区字段读取特定分区(特定标签)。 |
metastore.tag-to-partition.preview | none | 枚举值 | 是否在元存储中预览生成快照的标签。这允许Hive引擎在创建之前查询特定标签。可能的值: "none":无自动创建的标签。 "process-time":基于机器时间,处理时间经过周期时间加上延迟后创建标签。 "watermark":基于输入的水印,水印经过周期时间加上延迟后创建标签。 "batch":在批处理场景中,任务完成后生成与当前快照对应的标签。 |
num-levels | (none) | 整数 | 总层级数,例如,有3个层级,包括0、1、2层级。 |
num-sorted-run.compaction-trigger | 5 | 整数 | 触发合并的排序运行数。包括0级文件(一个文件一个排序运行)和高级运行(一个层级一个排序运行)。 |
num-sorted-run.stop-trigger | (none) | 整数 | 触发停止写入的排序运行数,默认值为'num-sorted-run.compaction-trigger' + 3。 |
page-size | 64 kb | 内存大小 | 内存页大小。 |
parquet.enable.dictionary | (none) | 整数 | 关闭parquet中所有字段的字典编码。 |
partial-update.remove-record-on-delete | false | 布尔值 | 在部分更新引擎中收到 -D记录时是否删除整行。 |
partition | (none) | 字符串 | 通过表选项定义分区,不能同时在DDL和表选项中定义分区。 |
partition.default-name | "DEFAULT_PARTITION" | 字符串 | 动态分区列值为空/空字符串时的默认分区名称。 |
partition.expiration-check-interval | 1 h | 持续时间 | 分区过期检查间隔。 |
partition.expiration-strategy | values-time | 枚举值 | 确定如何提取分区时间并将其与当前时间进行比较的策略。可能的值: "values-time":此策略将从分区值中提取的时间与当前时间进行比较。 "update-time":此策略将分区的最后更新时间与当前时间进行比较。 |
partition.expiration-time | (none) | 持续时间 | 分区的过期间隔。如果分区的生存时间超过此值,则分区将过期。分区时间从分区值中提取。 |
partition.mark-done-action | "success-file" | 字符串 | 标记分区完成的操作是通知下游应用程序该分区已完成写入,分区已准备好读取。1.'success-file':在目录中添加'_success'文件。2.'done-partition':在元存储中添加'xxx.done'分区。3.'mark-event':在元存储中标记分区事件。可以同时配置多个:'done-partition,success-file,mark-event'。 |
partition.timestamp-formatter | (none) | 字符串 | 用于从字符串格式化时间戳的格式化器。它可以与'partition.timestamp-pattern'一起使用,使用指定值创建格式化器。默认格式化器为'yyyy-MM-dd HH:mm:ss'和'yyyy-MM-dd'。支持多个分区字段,如'𝑦𝑒𝑎𝑟−year−month-$day $hour:00:00'。时间戳格式化器与Java的DateTimeFormatter兼容。 |
partition.timestamp-pattern | (none) | 字符串 | 你可以指定一个模式从分区中获取时间戳。格式化器模式由'partition.timestamp-formatter |
primary-key | (none) | 字符串 | 通过表选项定义主键,不能同时在DDL和表选项中定义主键。 |
read.batch-size | 1024 | 整数 | orc和parquet的读取批大小。 |
record-level.expire-time | (none) | 持续时间 | 主键表的记录级过期时间,过期在合并时发生,不能保证及时过期记录。你还必须指定'record-level.time-field'。 |
record-level.time-field | (none) | 字符串 | 记录级过期的时间字段。 |
record-level.time-field-type | seconds-int | 枚举值 | 记录级过期的时间字段类型,可以是seconds-int或millis-long。可能的值: "seconds-int":以秒为单位的时间戳应为INT类型。 "millis-long":以毫秒为单位的时间戳应为BIGINT类型。 |
rowkind.field | (none) | 字符串 | 为主键表生成行类型的字段,行类型决定哪些数据是 '+I'、'-U'、'+U' 或 '-D'。 |
scan.bounded.watermark | (none) | 长整数 | 有界流模式的结束条件“水印”。当遇到更大水印的快照时,流读取将结束。 |
scan.fallback-branch | (none) | 字符串 | 当批处理作业从表中查询时,如果当前分支中不存在某个分区,读取器将尝试从这个回退分支获取该分区。 |
scan.file-creation-time-millis | (none) | 长整数 | 配置此时间后,仅读取在此时间之后创建的数据文件。它独立于快照,但这是一种不精确的过滤(取决于是否发生合并)。 |
scan.manifest.parallelism | (none) | 整数 | 扫描清单文件的并行度,默认值为CPU处理器数量。注意:增大此参数会在扫描清单文件时增加内存使用。当扫描时遇到内存不足异常时,我们可以考虑减小它。 |
scan.max-splits-per-task | 10 | 整数 | 扫描时一个任务应缓存的最大分割大小。如果枚举器中缓存的分割大小大于任务数量乘以此值,扫描器将暂停扫描。 |
scan.mode | default | 枚举值 | 指定源的扫描行为。可能的值: "default":根据其他表属性确定实际启动模式。如果设置了 "scan.timestamp-millis",实际启动模式将是 "from-timestamp";如果设置了 "scan.snapshot-id" 或 "scan.tag-name",实际启动模式将是 "from-snapshot"。否则,实际启动模式将是 "latest-full"。 "latest-full":对于流源,首次启动时生成表上的最新快照,并继续读取最新更改。对于批源,仅生成最新快照但不读取新更改。 "full":已弃用。与 "latest-full" 相同。 "latest":对于流源,持续读取最新更改,开始时不生成快照。对于批源,行为与 "latest-full" 启动模式相同。 "compacted-full":对于流源,首次启动时在表上的最新合并后生成快照,并继续读取最新更改。对于批源,仅在最新合并后生成快照但不读取新更改。启用计划全量合并时,选择全量合并的快照。 "from-timestamp":对于流源,从 "scan.timestamp-millis" 指定的时间戳开始持续读取更改,开始时不生成快照。对于批源,在 "scan.timestamp-millis" 指定的时间戳生成快照但不读取新更改。 "from-file-creation-time":对于流和批源,生成快照并按创建时间过滤数据文件。对于流源,首次启动时,并继续读取最新更改。 "from-snapshot":对于流源,从 "scan.snapshot-id" 指定的快照开始持续读取更改,开始时不生成快照。对于批源,生成 "scan.snapshot-id" 或 "scan.tag-name" 指定的快照但不读取新更改。 "from-snapshot-full":对于流源,首次启动时从表上 "scan.snapshot-id" 指定的快照生成,并持续读取更改。对于批源,生成 "scan.snapshot-id" 指定的快照但不读取新更改。 "incremental":读取起始和结束快照或时间戳之间的增量更改。 |
scan.plan-sort-partition | false | 布尔值 | 是否按分区字段对计划文件进行排序,这允许你按分区顺序读取,即使你的分区写入无序。建议对“追加仅”表的流读取使用此选项。默认情况下,流读取将首先读取完整快照。为了避免分区的无序读取,你可以打开此选项。 |
scan.snapshot-id | (none) | 长整数 | “from-snapshot” 或 “from-snapshot-full” 扫描模式下可选的快照ID。 |
scan.tag-name | (none) | 字符串 | “from-snapshot” 扫描模式下可选的标签名称。 |
scan.timestamp | (none) | 字符串 | “from-timestamp” 扫描模式下可选的时间戳,它将自动转换为 Unix 毫秒时间戳,使用本地时区。 |
scan.timestamp-millis | (none) | 长整数 | “from-timestamp” 扫描模式下可选的时间戳。如果没有早于此时间的快照,将选择最早的快照。 |
scan.watermark | (none) | 长整数 | “from-snapshot” 扫描模式下可选的水印。如果没有晚于此水印的快照,将抛出异常。 |
sequence.field | (none) | 字符串 | 为主键表生成序列号的字段,序列号决定哪些数据是最新的。 |
sink.watermark-time-zone | "UTC" | 字符串 | 将长整型水印值解析为TIMESTAMP值的时区。默认值为'UTC',这意味着水印定义在TIMESTAMP列上或未定义。如果水印定义在TIMESTAMP_LTZ列上,水印的时区是用户配置的时区,该值应为用户配置的本地时区。选项值可以是完整名称,如'America/Los_Angeles',也可以是自定义时区ID,如'GMT-08:00'。 |
snapshot.clean-empty-directories | false | 布尔值 | 过期快照时是否尝试清理空目录,如果启用,请注意: hdfs:可能会在NameNode中打印异常。 oss/s3:可能会导致性能问题。 |
snapshot.expire.execution-mode | sync | 枚举值 | 指定过期的执行模式。可能的值: "sync":同步执行过期。如果文件太多,可能需要很长时间并阻塞流处理。 "async":异步执行过期。如果快照的生成大于删除,将有文件积压。 |
snapshot.expire.limit | 10 | 整数 | 一次允许过期的最大快照数量。 |
snapshot.num-retained.max | infinite | 整数 | 保留的已完成快照的最大数量。应大于或等于最小数量。 |
snapshot.num-retained.min | 10 | 整数 | 保留的已完成快照的最小数量。应大于或等于1。 |
snapshot.time-retained | 1 h | 持续时间 | 保留已完成快照的最长时间。 |
snapshot.watermark-idle-timeout | (none) | 持续时间 | 在水印处理中,如果源在指定的超时持续时间内保持空闲,则会触发快照推进并有助于标签创建。 |
sort-compaction.local-sample.magnification | 1000 | 整数 | 排序合并的本地样本放大倍数。本地样本的大小为sink并行度 * 放大倍数。 |
sort-compaction.range-strategy | QUANTITY | 枚举值 | 排序合并的范围策略,默认值为quantity。如果分配给排序任务的数据大小不均匀,这可能会导致性能瓶颈,可将此配置设置为size。可能的值: "SIZE" "QUANTITY" |
sort-engine | loser-tree | 枚举值 | 为有主键的表指定排序引擎。可能的值: "min-heap":使用最小堆进行多路排序。 "loser-tree":使用失败者树进行多路排序。与堆排序相比,失败者树的比较次数更少,效率更高。 |
sort-spill-buffer-size | 64 mb | 内存大小 | 溢出排序中溢出记录到磁盘的数据量。 |
sort-spill-threshold | (none) | 整数 | 如果排序读取器的最大数量超过此值,将尝试溢出。这可以防止太多读取器消耗过多内存并导致OOM。 |
source.split.open-file-cost | 4 mb | 内存大小 | 源文件的打开文件成本。它用于避免源分割读取太多文件,这可能会非常慢。 |
source.split.target-size | 128 mb | 内存大小 | 扫描桶时源分割的目标大小。 |
spill-compression | "zstd" | 字符串 | 溢出合并方式,目前支持zstd、lzo和zstd。 |
spill-compression.zstd-level | 1 | 整数 | 默认溢出合并zstd级别。如需更高合并率,可配置为9,但读写速度会显著下降。 |
streaming-read-mode | (none) | 枚举值 | 流读取模式,指定从表文件或日志中读取数据。可能的值: "log":从表日志存储的数据中读取。 "file":从表文件存储的数据中读取。 |
streaming-read-overwrite | false | 布尔值 | 是否在流模式下从覆盖中读取更改。当changelog producer为full-compaction或lookup时,不能设置为true,因为这会读取重复的更改。 |
streaming.read.snapshot.delay | (none) | 持续时间 | 扫描增量快照时流读取的延迟持续时间。 |
tag.automatic-completion | false | 布尔值 | 是否自动完成缺失的标签。 |
tag.automatic-creation | none | 枚举值 | 是否自动创建标签以及如何生成标签。可能的值: "none":无自动创建的标签。 "process-time":基于机器时间,处理时间经过周期时间加上延迟后创建标签。 "watermark":基于输入的水印,水印经过周期时间加上延迟后创建标签。 "batch":在批处理场景中,任务完成后生成与当前快照对应的标签。 |
tag.callback.#.param | (none) | 字符串 | 类#构造函数的参数字符串。回调类应自行解析参数。 |
tag.callbacks | (none) | 字符串 | 成功创建标签后要调用的提交回调类列表。类名用逗号连接(例如:com.test.CallbackA,com.sample.CallbackB)。 |
tag.creation-delay | 0 ms | 持续时间 | 周期结束后创建标签的延迟时间。这可以允许一些延迟数据进入标签。 |
tag.creation-period | daily | 枚举值 | 用于生成标签的频率。可能的值: "daily":每天生成一个标签。 "hourly":每小时生成一个标签。 "two-hours":每两小时生成一个标签。 |
tag.default-time-retained | (none) | 持续时间 | 新创建标签的默认最长保留时间。它会影响自动创建的标签和手动(通过过程)创建的标签。 |
tag.num-retained.max | (none) | 整数 | 保留的最大标签数量。它仅影响自动创建的标签。 |
tag.period-formatter | with_dashes | 枚举值 | 标签周期的日期格式。可能的值: "with_dashes":带破折号的日期和小时,例如,'yyyy-MM-dd HH' "without_dashes":不带破折号的日期和小时,例如,'yyyyMMdd HH' |
target-file-size | (none) | 内存大小 | 文件的目标大小。 主键表:默认值为128MB。 追加表:默认值为256MB。 |
write-buffer-for-append | false | 布尔值 | 此选项仅适用于追加表。写入是否使用写缓冲区以避免内存不足错误。 |
write-buffer-size | 256 mb | 内存大小 | 在转换为排序的磁盘文件之前,在内存中累积的数据量。 |
write-buffer-spill.max-disk-size | infinite | 内存大小 | 写缓冲区溢出使用的最大磁盘空间。仅在启用写缓冲区溢出时有效。 |
write-buffer-spillable | (none) | 布尔值 | 写缓冲区是否可溢出。使用对象存储时默认启用。 |
write-manifest-cache | 0 bytes | 内存大小 | 写入初始化时读取清单文件的缓存大小。 |
write-max-writers-to-spill | 10 | 整数 | 在批处理追加插入中,如果写入器数量大于此选项,我们将打开缓冲区缓存和溢出功能以避免内存不足。 |
write-only | false | 布尔值 | 如果设置为true,将跳过合并和快照过期。此选项与专用合并作业一起使用。 |
zorder.var-length-contribution | 8 | 整数 | 类型(CHAR、VARCHAR、BINARY、VARBINARY)对zorder排序贡献的字节数。 |
目录选项(CatalogOptions)
Paimon目录的选项。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
allow-upper-case | (none) | 布尔值 | 指示此目录是否允许大写,其默认值取决于特定目录的实现。 |
cache-enabled | true | 布尔值 | 控制目录是否缓存数据库、表和清单。 |
cache.expiration-interval | 1 min | 持续时间 | 控制目录中数据库和表的缓存持续时间。 |
cache.manifest.max-memory | (none) | 内存大小 | 控制缓存清单内容的最大内存。 |
cache.manifest.small-file-memory | 128 mb | 内存大小 | 控制缓存小清单文件的缓存内存。 |
cache.manifest.small-file-threshold | 1 mb | 内存大小 | 控制小清单文件的阈值。 |
client-pool-size | 2 | 整数 | 配置连接池的大小。 |
fs.allow-hadoop-fallback | true | 布尔值 | 当找不到针对该方案的文件IO时,是否允许回退到Hadoop文件IO。 |
lineage-meta | (none) | 字符串 | 用于存储表和数据血缘信息的血缘元数据。可能的值: "jdbc":使用标准jdbc存储表和数据血缘信息。 "custom":你可以实现LineageMetaFactory和LineageMeta,将血缘信息存储在自定义存储中。 |
lock-acquire-timeout | 8 min | 持续时间 | 获取锁的最大等待时间。 |
lock-check-max-sleep | 8 s | 持续时间 | 重试检查锁时的最大睡眠时间。 |
lock.enabled | (none) | 布尔值 | 启用目录锁。 |
lock.type | (none) | 字符串 | 目录的锁类型,如'hive'、'zookeeper'。 |
metastore | "filesystem" | 字符串 | Paimon目录的元存储,支持filesystem、hive和jdbc。 |
sync-all-properties | false | 布尔值 | 是否将所有表属性同步到Hive元存储。 |
table.type | managed | 枚举值 | 表的类型。可能的值: "managed":Paimon拥有的表,表数据的整个生命周期由Paimon管理。 "external":Paimon与存储在外部位置的数据松散耦合的表。 |
uri | (none) | 字符串 | 元存储服务器的URI。 |
warehouse | (none) | 字符串 | 目录的仓库根路径。 |
文件系统目录选项(FilesystemCatalogOptions)
文件系统目录的选项。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
case-sensitive | true | 布尔值 | 是否区分大小写。如果不区分大小写,你需要将此选项设置为false,并且表名和字段将转换为小写。 |
Hive目录选项(HiveCatalogOptions)
Hive目录的选项。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
client-pool-cache.eviction-interval-ms | 300000 | 长整数 | 设置客户端池缓存的逐出间隔(毫秒)。 |
client-pool-cache.keys | (none) | 字符串 | 指定客户端缓存键,多个元素用逗号分隔。 "ugi":表示使用缓存的当前用户的Hadoop UserGroupInformation实例。 “user_name”类似于UGI,但仅包括由UserGroupInformation#getUserName确定的用户名。 “conf”:任意配置的名称。该配置的值将从目录属性中提取并添加到缓存键中。一个conf元素应以“conf:”前缀开头,后跟配置名称。例如,指定“conf:a.b.c”将把“a.b.c”添加到键中,这样具有不同默认目录的配置就不会共享相同的客户端池。可以指定多个conf元素。 |
format-table.enabled | false | 布尔值 | 是否支持格式表,格式表对应于常规Hive表,允许进行读写操作。但是,在这些过程中,它不会连接到元存储;因此,新添加的分区不会反映在元存储中,需要作为单独的分区操作手动添加。 |
hadoop-conf-dir | (none) | 字符串 | core-site.xml、hdfs-site.xml、yarn-site.xml、mapred-site.xml的文件目录。目前仅支持本地文件系统路径。如果未配置,尝试从“HADOOP_CONF_DIR”或“HADOOP_HOME”系统环境加载。配置优先级:1. 从“hadoop-conf-dir”;2. 从HADOOP_CONF_DIR;3. 从HADOOP_HOME/conf;4. HADOOP_HOME/etc/hadoop。 |
hive-conf-dir | (none) | 字符串 | hive-site.xml的文件目录,用于创建HiveMetastoreClient和安全认证,如Kerberos、LDAP、Ranger等。如果未配置,尝试从“HIVE_CONF_DIR”环境变量加载。 |
location-in-properties | false | 布尔值 | 设置Hive表/数据库属性中的位置。如果在使用对象存储(如s3、oss)时不想通过Hive的文件系统访问位置,可以将此选项设置为true。 |
Jdbc目录选项(JdbcCatalogOptions)
Jdbc目录的选项。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
catalog-key | "jdbc" | 字符串 | 自定义jdbc目录存储键。 |
lock-key-max-length | 255 | 整数 | 设置锁键的最大长度。“lock-key”由“catalog-key”、“database”和“table”三个字段连接而成。 |
Flink目录选项(FlinkCatalogOptions)
Paimon的Flink目录选项。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
default-database | "default" | 字符串 | 默认数据库名称。 |
disable-create-table-in-default-db | false | 布尔值 | 如果为true,则不允许在默认数据库中创建表。默认为false。 |
Flink连接器选项(FlinkConnectorOptions)
Paimon的Flink连接器选项。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
end-input.watermark | (none) | 长整数 | 批处理模式或有界流情况下可选的endInput水印。 |
lookup.async | false | 布尔值 | 是否启用异步查找连接。 |
lookup.async-thread-number | 16 | 整数 | 查找异步的线程数。 |
lookup.bootstrap-parallelism | 4 | 整数 | 查找连接中单个任务引导的并行度。 |
lookup.cache | AUTO | 枚举值 | 查找连接的缓存模式。可能的值: "AUTO" "FULL" |
lookup.dynamic-partition | (none) | 字符串 | 查找的特定动态分区,目前仅支持'max_pt()'。 |
lookup.dynamic-partition.refresh-interval | 1 h | 持续时间 | 查找的特定动态分区刷新间隔,扫描所有分区并获取相应分区。 |
lookup.refresh.async | false | 布尔值 | 是否在异步线程中刷新查找表。 |
lookup.refresh.async.pending-snapshot-count | 5 | 整数 | 如果挂起的快照计数超过阈值,查找操作符将同步刷新表。 |
partition.end-input-to-done | false | 布尔值 | 当结束输入时,是否标记完成状态以指示数据已准备好。 |
partition.idle-time-to-done | (none) | 持续时间 | 设置一个分区在没有新数据后的时间间隔,经过此时间间隔后,标记完成状态以指示数据已准备好。 |
partition.time-interval | (none) | 持续时间 | 你可以为分区指定时间间隔,例如,每日分区为“1 d”,每小时分区为“1 h”。 |
scan.infer-parallelism | true | 布尔值 | 如果为false,源的并行度由全局并行度设置。否则,源并行度从分割数(批处理模式)或桶数(流模式)推断。 |
scan.infer-parallelism.max | 1024 | 整数 | 如果scan.infer-parallelism为true,通过此选项限制源的并行度。 |
scan.parallelism | (none) | 整数 | 为扫描源定义自定义并行度。默认情况下,如果未定义此选项,规划器将通过考虑全局配置为每个语句单独推导并行度。如果用户启用scan.infer-parallelism,规划器将通过推断的并行度推导并行度。 |
scan.push-down | true | 布尔值 | 如果为true,Flink将把投影、过滤器、限制下推到源。代价是在作业中难以重用源。在Flink 1.18或更高版本中,即使进行投影下推,也可以重用源。 |
scan.remove-normalize | false | 布尔值 | 流读取时是否强制删除规范化节点。注意:这很危险,如果下游用于计算聚合且输入不是完整的变更日志,很可能导致数据错误。 |
scan.split-enumerator.batch-size | 10 | 整数 | 在StaticFileStoreSplitEnumerator中,每批应分配给子任务多少个分割,以避免超过 |
scan.split-enumerator.mode | fair | 枚举值 | StaticFileStoreSplitEnumerator分配分割使用的模式。可能的值: "fair":批处理读取时均匀分配分割,以防止少数任务读取所有数据。 "preemptive":根据任务的消费速度抢占式分配分割。 |
scan.watermark.alignment.group | (none) | 字符串 | 用于对齐水印的一组源。 |
scan.watermark.alignment.max-drift | (none) | 持续时间 | 对齐水印的最大漂移,在暂停从源/任务/分区消费之前。 |
scan.watermark.alignment.update-interval | 1 s | 持续时间 | 任务应多久通知协调器当前水印,以及协调器应多久宣布最大对齐水印。 |
scan.watermark.emit.strategy | on-event | 枚举值 | 水印生成的发射策略。可能的值: "on-periodic":定期发射水印,间隔由Flink的'pipeline.auto-watermark-interval'控制。 "on-event":每条记录发射水印。 |
scan.watermark.idle-timeout | (none) | 持续时间 | 如果在该时间段内流的某个分区没有记录流动,则该分区被视为“空闲”,不会阻碍下游操作符中水印的进度。 |
sink.clustering.by-columns | (none) | 字符串 | 指定范围分区期间用于比较的列名,格式为'columnName1,columnName2'。如果未设置或设置为空字符串,则表示未启用范围分区功能。此选项仅对无主键且为批处理执行模式的无桶感知表有效。 |
sink.clustering.sample-factor | 100 | 整数 | 指定采样因子。设S为总样本数,F为采样因子,P为sink并行度,则S = F×P。允许的最小采样因子为20。 |
sink.clustering.sort-in-cluster | true | 布尔值 | 表示范围分区后是否对属于每个sink任务的数据进一步排序。 |
sink.clustering.strategy | "auto" | 字符串 | 指定范围分区使用的比较算法,包括'zorder'、'hilbert'和'order',分别对应z-order曲线算法、hilbert曲线算法和基本类型比较算法。未配置时,将根据'sink.clustering.by-columns'中的列数自动确定算法。1列时使用'order',小于5列时使用'zorder',5列或更多列时使用'hilbert'。 |
sink.committer-cpu | 1.0 | 双精度浮点数 | Sink提交器的CPU,用于控制全局提交器的CPU核心数。 |
sink.committer-memory | (none) | 内存大小 | Sink提交器的内存,用于控制全局提交器的堆内存。 |
sink.committer-operator-chaining | true | 布尔值 | 允许sink提交器和写入器操作符链接在一起。 |
sink.cross-partition.managed-memory | 256 mb | 内存大小 | 跨分区更新中RocksDB的托管内存权重,Flink将根据该权重计算内存大小,实际使用的内存取决于运行环境。 |
sink.managed.writer-buffer-memory | 256 mb | 内存大小 | 托管内存中写入器缓冲区的权重,Flink将根据该权重为写入器计算内存大小,实际使用的内存取决于运行环境。 |
sink.parallelism | (none) | 整数 | 为sink定义自定义并行度。默认情况下,如果未定义此选项,规划器将通过考虑全局配置为每个语句单独推导并行度。 |
sink.savepoint.auto-tag | false | 布尔值 | 如果为true,将为Flink保存点创建的快照自动创建一个标签。 |
sink.use-managed-memory-allocator | false | 布尔值 | 如果为true,Flink sink将为合并树使用托管内存;否则,它将创建一个独立的内存分配器。 |
source.checkpoint-align.enabled | false | 布尔值 | 是否将Flink检查点与Paimon表的快照对齐,如果为true,仅在消耗快照时才进行检查点。 |
source.checkpoint-align.timeout | 30 s | 持续时间 | 如果检查点开始触发时新快照尚未生成,枚举器将阻塞检查点并等待新快照。设置最大等待时间以避免无限等待,如果超时,检查点将失败。请注意,它应设置为小于检查点超时时间。 |
streaming-read.shuffle-bucket-with-partition | true | 布尔值 | 流读取时是否按分区和桶进行洗牌。 |
unaware-bucket.compaction.parallelism | (none) | 整数 | 为无桶感知表合并作业定义自定义并行度。默认情况下,如果未定义此选项,规划器将通过考虑全局配置为每个语句单独推导并行度。 |
Spark目录选项(SparkCatalogOptions)
Paimon的Spark目录选项。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
catalog.create-underlying-session-catalog | false | 布尔值 | 如果为true,在使用SparkGenericCatalog时,创建并使用底层会话目录而不是默认会话目录。 |
defaultDatabase | "default" | 字符串 | 默认数据库名称。 |
Spark连接器选项(SparkConnectorOptions)
Paimon的Spark连接器选项。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
read.changelog | false | 布尔值 | 是否以变更日志的形式读取行(在行中添加rowkind列以表示其更改类型)。 |
read.stream.maxBytesPerTrigger | (none) | 长整数 | 单个批次中返回的最大字节数。 |
read.stream.maxFilesPerTrigger | (none) | 整数 | 单个批次中返回的最大文件数。 |
read.stream.maxRowsPerTrigger | (none) | 长整数 | 单个批次中返回的最大行数。 |
read.stream.maxTriggerDelayMs | (none) | 长整数 | 两个相邻批次之间的最大延迟,与read.stream.minRowsPerTrigger一起用于创建MinRowsReadLimit。 |
read.stream.minRowsPerTrigger | (none) | 长整数 | 单个批次中返回的最小行数,与read.stream.maxTriggerDelayMs一起用于创建MinRowsReadLimit。 |
write.merge-schema | false | 布尔值 | 如果为true,在写入数据之前自动合并数据模式和表模式。 |
write.merge-schema.explicit-cast | false | 布尔值 | 如果为true,当两种数据类型满足显式转换规则时,允许合并数据类型。 |
ORC选项
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
orc.column.encoding.direct | (none) | 整数 | 在orc中要跳过字典编码的字段的逗号分隔列表。 |
orc.dictionary.key.threshold | 0.8 | 双精度浮点数 | 如果字典中不同键的数量大于非空行总数的此分数,则在orc中关闭字典编码。使用0始终禁用字典编码。使用1始终使用字典编码。 |
orc.write.batch-size | 1024 | 整数 | orc的写入批大小。 |
RocksDB选项
以下选项允许用户精细调整RocksDB以获得更好的性能。你可以在表属性或动态表提示中指定它们。
键 | 默认值 | 类型 | 描述 |
---|---|---|---|
lookup.cache-rows | 10000 | 长整数 | 缓存中存储的最大行数。 |
lookup.continuous.discovery-interval | (none) | 持续时间 | lookup持续读取的发现间隔。这用作SQL提示。如果未配置,lookup函数将回退到'continuous.discovery-interval'。 |
rocksdb.block.blocksize | 4 kb | 内存大小 | 每个块中包装的用户数据的近似大小(以字节为单位)。默认块大小为“4KB”。 |
rocksdb.block.cache-size | 128 mb | 内存大小 | RocksDB中数据块的缓存量。 |
rocksdb.block.metadata-blocksize | 4 kb | 内存大小 | 每个块中分区元数据的近似大小。当前在启用分区索引/过滤器选项时应用于索引块。默认块大小为“4KB”。 |
rocksdb.bloom-filter.bits-per-key | 10.0 | 双精度浮点数 | 布隆过滤器将使用的每个键的位数,仅在使用布隆过滤器时生效。默认值为10.0。 |
rocksdb.bloom-filter.block-based-mode | false | 布尔值 | 如果为true,RocksDB将使用基于块的过滤器而不是完整过滤器,仅在使用布隆过滤器时生效。默认值为“false”。 |
rocksdb.compaction.level.max-size-level-base | 256 mb | 内存大小 | 级别基础文件总大小的上限(以字节为单位)。默认值为“256MB”。 |
rocksdb.compaction.level.target-file-size-base | 64 mb | 内存大小 | 合并的目标文件大小,它决定了1级文件的大小。默认值为“64MB”。 |
rocksdb.compaction.level.use-dynamic-size | false | 布尔值 | 如果为true,RocksDB将动态选择每个级别的目标大小。从空数据库开始,RocksDB将使最后一级成为基础级别,这意味着将L0数据合并到最后一级,直到它超过max_bytes_for_level_base。然后对倒数第二级重复此过程,依此类推。默认值为“false”。有关更多信息,请参阅RocksDB文档。 |
rocksdb.compaction.style | LEVEL | 枚举值 | 数据库的指定合并样式。候选合并样式为LEVEL、FIFO、UNIVERSAL或NONE,Flink选择“LEVEL”作为默认样式。可能的值: "LEVEL" "UNIVERSAL" "FIFO" "NONE" |
rocksdb.compression.type | LZ4_COMPRESSION | 枚举值 | 合并类型。可能的值: "NO_COMPRESSION" "SNAPPY_COMPRESSION" "ZLIB_COMPRESSION" "BZLIB2_COMPRESSION" "LZ4_COMPRESSION" "LZ4HC_COMPRESSION" "XPRESS_COMPRESSION" "ZSTD_COMPRESSION" "DISABLE_COMPRESSION_OPTION" |
rocksdb.files.open | -1 | 整数 | 数据库可使用的打开文件的最大数量(每个有状态操作符),“-1”表示无限制。默认值为“-1”。 |
rocksdb.thread.num | 2 | 整数 | 并发后台刷新和合并作业的最大数量(每个有状态操作符)。默认值为“2”。 |
rocksdb.use-bloom-filter | false | 布尔值 | 如果为true,每个新创建的SST文件将包含一个布隆过滤器。默认情况下禁用。 |
rocksdb.writebuffer.count | 2 | 整数 | 在内存中建立的写缓冲区的最大数量。默认值为“2”。 |
rocksdb.writebuffer.number-to-merge | 1 | 整数 | 在写入存储之前将合并在一起的最小写缓冲区数量。默认值为“1”。 |
rocksdb.writebuffer.size | 64 mb | 内存大小 | 在转换为排序的磁盘文件之前,在内存中累积的数据量(由磁盘上的未排序列表支持)。默认写缓冲区大小为“64MB”。 |
拓展:
CoreOptions相关拓展: async-file-write:异步文件写入在大数据量写入场景下能显著提升性能。当开启异步写入时,数据可以在后台线程进行写入操作,而主线程可以继续处理其他任务,避免了写入操作成为整个数据处理流程的瓶颈。例如在高并发的日志数据写入场景中,异步写入能够让系统在不阻塞其他数据处理逻辑的前提下,高效地将日志数据持久化到存储中。
changelog-producer:不同的变更日志生成策略适用于不同的业务需求。例如,“lookup”策略适合对数据一致性和实时性要求较高的场景,因为它在提交数据写入之前就生成变更日志,能保证数据更改的详细记录与写入操作紧密关联。在金融交易数据处理中,每一笔交易的变更都需要准确记录,“lookup”策略就可以确保变更日志的及时性和准确性,便于后续的审计和分析。
scan.mode:不同的扫描模式为数据读取提供了极大的灵活性。“from-timestamp”模式适用于需要从特定时间点开始获取数据变更的场景,如在监控系统中,可能需要从某个故障发生时间点开始读取相关数据的变更,以分析故障原因。“compacted-full”模式则在需要获取经过合并后的最新数据状态时非常有用,比如在数据仓库场景中,定期的全量合并可以清理过期数据和合并小文件,“compacted-full”模式能确保读取到合并后的最新、最优化的数据快照。
CatalogOptions相关拓展:
lock.type:不同的锁类型适用于不同的部署环境和数据一致性要求。例如,“hive”锁类型在与Hive集成的环境中能很好地利用Hive已有的锁机制来保证数据的一致性,适用于需要与Hive生态紧密结合的数据管理场景。而“zookeeper”锁类型则利用Zookeeper的分布式协调能力,提供了更强大的分布式锁功能,适用于大规模分布式数据存储和处理环境,确保在多节点并发访问时的数据一致性和完整性。
metastore:选择不同的元存储方式会影响数据的管理和访问方式。“filesystem”元存储简单直接,适用于小规模、对部署复杂度要求较低的场景,它将元数据存储在文件系统中,易于理解和维护。“hive”元存储则可以与Hive的生态系统无缝集成,方便在Hive环境中对Paimon表进行管理和查询,例如可以利用Hive的SQL接口对Paimon表进行操作,同时共享Hive的元数据管理功能。“jdbc”元存储则提供了更灵活的元数据存储方式,可以将元数据存储在关系型数据库中,便于进行复杂的元数据查询和管理,适用于对元数据管理有较高要求的企业级应用。
FlinkConnectorOptions相关拓展:
lookup.async:启用异步查找连接在大数据量关联查询场景下能有效提升性能。在实时数据分析中,常常需要将实时流数据与一个较大的维度表进行关联,若采用同步查找连接,可能会因为等待维度表的查询结果而导致流处理延迟。而异步查找连接允许在等待维度表查询结果的同时,继续处理其他流数据,当查询结果返回后再进行相应的处理,从而提高了整个流处理系统的吞吐量和响应速度。
sink.clustering.strategy:不同的范围分区策略适用于不同的数据分布和查询需求。“zorder”策略在处理多维数据且需要快速定位特定区域数据的场景中表现出色,例如在地理信息系统(GIS)数据处理中,通过Z-order曲线可以将二维或多维的地理空间数据进行有效的分区和排序,使得查询特定区域的数据变得更加高效。“hilbert”策略则在处理高维数据时能更好地保持数据的局部性,适用于对数据局部性要求较高的数据分析场景。“order”策略简单直接,适用于单维度数据的分区和排序,在数据分布较为均匀的单维度数据处理中能发挥较好的性能。
SparkConnectorOptions相关拓展:
read.changelog:以变更日志形式读取数据在数据版本管理和数据审计场景中非常有用。例如在数据仓库的缓慢变化维度(SCD)处理中,通过读取变更日志可以准确记录每一次数据的变化,包括插入、更新和删除操作,便于跟踪数据的演变历史。在数据质量监控中,变更日志可以帮助分析数据异常变化的原因,通过对比不同时间点的变更日志,可以发现数据质量问题的根源。
write.merge-schema:自动合并数据模式和表模式在数据集成场景中能大大简化数据写入操作。当从多个数据源获取数据并写入Paimon表时,不同数据源的数据模式可能存在差异。启用此选项后,系统可以自动将数据模式与目标表模式进行合并,减少了手动进行模式转换和验证的工作量,提高了数据集成的效率和准确性。同时,结合“write.merge-schema.explicit-cast”选项,可以在满足显式转换规则的情况下,更加灵活地处理不同数据类型之间的合并,避免了因数据类型不匹配而导致的数据写入失败问题。
RocksDB Options相关拓展:
rocksdb.compaction.style:不同的合并样式对RocksDB的性能和存储布局有显著影响。“LEVEL”合并样式是一种常用的策略,它通过将数据分层存储,使得不同层次的数据具有不同的合并比和访问频率,适用于大多数通用场景,能够在性能和存储利用率之间取得较好的平衡。“UNIVERSAL”合并样式则更注重减少存储文件的数量,适用于存储资源有限的场景,但可能会在合并过程中消耗更多的CPU资源。“FIFO”合并样式按照先进先出的原则进行合并,适用于对数据新鲜度要求较高的场景,确保较新的数据能够及时得到合并和整理。“NONE”合并样式则完全禁用合并,适用于对写入性能要求极高且存储资源充足的场景,因为它避免了合并过程带来的开销,但会占用更多的存储空间。
rocksdb.use-bloom-filter:布隆过滤器在RocksDB中用于快速判断数据是否存在于数据库中。当启用布隆过滤器时,它可以显著减少磁盘I/O操作,提高查询性能。在大规模数据存储中,数据量可能非常庞大,直接在磁盘上查询数据是否存在会带来较高的I/O开销。布隆过滤器通过在内存中维护一个紧凑的位图结构,能够快速判断某个键是否可能存在于数据库中。虽然布隆过滤器存在一定的误判率,但在大多数情况下,它可以有效地过滤掉大量不存在的数据查询请求,从而减少不必要的磁盘I/O,提高系统的整体性能。同时,通过调整“rocksdb.bloom-filter.bits-per-key”和“rocksdb.bloom-filter.block-based-mode”等参数,可以进一步优化布隆过滤器的性能和误判率,以适应不同的应用场景需求。