随着越来越多的用户使用 DolphinDB,各种不同的应用的场景也对 DolphinDB 的数据接入提出了不同的要求。部分用户需要将 PostgreSQL 的数据实时同步到 DolphinDB 中来,以满足在 DolphinDB 中使用数据的实时性需求。本篇教程将介绍使用 Debezium 来实时捕获和发布 PostgreSQL 的数据库更改事件,并完成 PostgreSQL 到 DolphinDB 的实时数据同步的完整的解决方案。
1. Debezium 同步方案概述
Debezium 是一个开源的分布式平台,用于实时捕获和发布数据库更改事件。它可以将关系型数据库(如MySQL、PostgreSQL、Oracle 等)的变更事件转化为可观察的流数据,以供其他应用程序实时消费和处理。本教程中将采用 Debezium 与 Kafka 组合的方式来实现从 PosgreSQL12 到 DolphinDB 的数据同步。
Kafka +Debezium 的数据同步方案需要部署 4 个服务,如下所示:
- ZooKeeper:kafka 的依赖部署。
- Kafka:数据存储。
- Kafka-Connect:用于接入数据插件 source-connetor, sink-connector 的框架,可以提供高可用,也可以部署单实例版本。
- Schema-Registry :提供实时同步的数据的元数据注册功能 ,支持数据序列化。
基于 Debezium 同步 PostgreSQL 数据到 DolphinDB 的架构图如下:
图 1-1 同步架构图
接下来,本教程将逐一介绍这些服务的下载、安装,以及配置数据同步任务。
2. 部署 Kafka 单实例实时数据存储
Kafka 单实例实时数据存储部署方案在 MySQL 和 Oracle 数据同步教程中均有有介绍,可以查看:Debezium+Kafka 实时同步 Oracle 11g 数据到 DolphinDB 教程,部署好 ZooKeeper,Kafka,Kafka-Connect , Schema-Registry 的数据同步、存储运行环境。
3. 从 PostgreSQL 到 Kafka 的数据同步:部署程序与配置任务
在部署好 Kafka 和 Kafka Connect 环境后,即可配置 PostgreSQL 的实时数据同步任务。
注意:PostgreSQL 的数据实时同步仅支持 UTF-8 字符集。
本节涉及的操作,若未作特殊说明,均使用操作系统用户 kafka
执行。kafka 用户的权限等配置,详见章节 2 部署 Kafka 单实例实时数据存储 部分。
3.1 配置 PostgreSQL 数据库
对于 Source 数据库是 PostgresSQL,本教程采用 PostgreSQL 自带的逻辑复制流插件 pgoutput 插件来捕获实时数据变化。pgoutput 插件是 PostgreSQL 在版本 PostgreSQL 10+ 推出并内置的插件,无须额外配置。所以本教程介绍的 PostgresSQL 到 DolphinDB 的实时数据同步方案 需要 PostgreSQL 10+ 以上版本。
本教程中使用的数据库是配置于 CentOS 7 下的 PostgresSQL 12 版本。
第一步: 配置逻辑解码和数据库访问权限
PostgreSQL 的实时变更数据捕获是通过配置 replication slots ,开启逻辑复制流实现。配置以下参数,可以管理当前数据库的逻辑解码进程。
通过 vim 命令打开 /var/lib/pgsql/12/data/postgresql.conf 文件,添加或者调整以下参数。wal_level 参数必须设置。
# REPLICATION
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4 # max number of walsender processes (change requires restart)
#wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s # in milliseconds; 0 disables
max_replication_slots = 4 # max number of replication slots (change requires restart)
PostgreSQL 中的数据库访问权限需要在 /var/lib/pgsql/12/data/pg_hba.conf 中进行配置。可以如下配置 IP4 的所有主机的 md5 加密访问权限。
host all all 0.0.0.0/0 md5
修改配置文件需要重启数据库生效。
第二步: 创建数据库和用户
在使用数据库时,通常会为业务数据创建专门的用户和数据库进行数据管理。这里我们创建业务数据的存储数据库和业务用户,再创建一个专门进行逻辑复制的用户。
使用超级用户 postgres ,创建业务数据用户和数据库,并授予权限。
CREATE USER factoruser WITH PASSWORD '111111';
CREATE DATABASE factordb;
GRANT ALL PRIVILEGES ON DATABASE factordb TO factoruser ;
创建逻辑复制用户,并授予该用户逻辑复制权限和连接业务数据库权限。
CREATE USER datasyn WITH PASSWORD '111111';
ALTER USER datasyn WITH REPLICATION;
GRANT CONNECT ON DATABASE factordb TO datasyn;
接下来我们创建业务数据库的 schema 并授予逻辑复制用户对业务数据库的数据访问权限,这需要登录数据库,并切换到业务数据库(database)后执行。切换 database,使用 psql 连接可以按 图 3-1 操作。
图 3-1 PostgreSQL 切换 database 操作
创建业务数据 schema,需要当前数据库是 factordb。
create schema factorsch authorization factoruser;
授予 datasyn 用户对 业务数据库 factordb 的 factorsch 的具体使用权限。
GRANT USAGE ON SCHEMA factorsch TO datasyn;
GRANT SELECT ON ALL TABLES IN SCHEMA factorsch TO datasyn;
配置逻辑复制用户的逻辑复制访问权限,修改文件 /var/lib/pgsql/12/data/pg_hba.conf。
host replication datasyn 0.0.0.0/0 md5
重启数据库,并测试逻辑解码插件。
--创建复制槽
SELECT * FROM pg_create_logical_replication_slot('pgoutput_demo', 'pgoutput');
--查看相应复制槽是否存在
SELECT * FROM pg_replication_slots;
--删除复制槽
SELECT * FROM pg_drop_replication_slot('pgoutput_demo');
3.2 安装 Debezium-PostgreSQL 连接器插件
PostgreSQL 的 Debezium 数据同步插件需要安装在 Kafka Connect 程序部署文件路径下。
配置启动 Debezium-PostgreSQL 连接器,需要以下两步:
-
下载 Debezium-PostgreSQL-Connector 插件,将插件解压并放到 Kafka Connect 的插件路径下
-
重新启动 Kafka Connect 程序,以加载插件
第一步: 下载安装 Debezium-PostgreSQL 插件
前往官方网站 Debezium,选择 2.5.1.Final 版本进行下载,程序名为 debezium-connector-postgres-2.5.1.Final-plugin.tar.gz,本教程测试时采用的此版本,也可以使用 2.5 系列的最新版本。如果想使用 Debezium 的最新稳定版本,需要仔细阅读对应的版本需求,并进行详细测试。
图 3-2 Debezium 官网下载链接
在 confluent 的安装路径下创建插件路径,在此路径下解压 Debezium 的 PostgreSQL 插件包,请确保 kafka 用户对此路径具有读权限,如果下面代码中的下载链接失效,请按 图 3-2 官网示例位置下载。
sudo mkdir -p /opt/confluent/share/java/plugin
cd /opt/confluent/share/java/plugin
sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.5.1.Final/debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
sudo tar -xvf ./debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
sudo rm ./debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
第二步: 配置 Kafka Connect 加载插件
修改 Kafka Connect 的配置文件,添加插件路径配置。若已配置插件路径,则跳过该步骤。
cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties
添加或修改参数 plugin.path 如下。
plugin.path=/opt/confluent/share/java/plugin
重新启动 Kafka Connect。
sudo systemctl restart kafka-connect
查看 kafka connect 的日志输出,能查询到信息则说明插件加载成功。
cat /KFDATA/kafka-connect/logs/connect.log | grep PostgresConnector
图 3-3 Debezium-PostgreSQL 插件加载成功信息
3.3 配置 PostgreSQL 数据同步连接任务
数据库基础配置和 PostgreSQL 同步插件安装好,我们就可以开始配置 PostgreSQL 的数据同步任务。配置同步任务及检查的很多命令都要带上 url 等参数。为了操作快捷,本教程封装了一些加载配置文件的操作脚本在 kafka-tools.tar 包中,详情参见附录。下载当前包,解压缩到 /KFDATA 目录下。后续的很多操作,包括检查 Kafka 的 topic、查看数据和配置同步任务等都会使用 kafka-tools.tar 包中的脚本。包中的脚本在无参数运行时会输出 help 文档。
cd /KFDATA
sudo tar -xvf kafka-tools.tar
sudo chown kafka:kafka kafka-tools
rm ./kafka-tools.tar
修改 /KFDATA/kafka-tools/config/config.properties 配置参数。
按照本机的路径、IP 等对应修改 Kafka 和 Kafka Connect 的启动 IP 地址,以及安装目录。
示例如下:
#kafka parameters
kafka_home=/opt/kafka
confluent_home=/opt/confluent
bootstrap_server=192.168.189.130:9092#kafka-connect parameters
connect_rest_url=192.168.1.178:8083
#rest_pd means restful request password,This is not necessary
#rest_pd=appsdba:passwd
schema_ip=192.168.189.130
schema_port=8081
第一步:准备 PostgreSQL 数据库表
本教程的同步方案支持 DolphinDB 的 TSDB 和 OLAP 两种存储引擎的数据同步。其中 TSDB 引擎对于单字段主键和多字段复合主键有不同的处理方式。所以这里我们创建三张表来展示不同的情况的配置操作。
-
单主键同步到 TSDB 引擎 : factorsch.stock_example。
-
复合主键同步到 TSDB 引擎 : factorsch.index_example_tsdb。
-
复合主键同步到 OLAP 引擎 : factorsch.index_example_olap。
创建业务数据库表、数据,均使用 factoruser ,登录 factordb。
创建表 factorsch.stock_example。
create table factorsch.stock_example (id bigint,ts_code varchar(20),symbol_id varchar(20),name varchar(20),area varchar(20),industry varchar(20),list_date date,primary key (id)
);
插入数据。
insert into factorsch.stock_example(id,ts_code,symbol_id,name,area,industry,list_date)
values (1,'000001.SZ','000001','平安银行','深圳','银行','1991-04-03'),
(2,'000002.SZ','000002','万科A','深圳','地产','1991-01-29'),
(3,'000004.SZ','000004','ST国华','深圳','软件服务','1991-01-14');
创建表 factorsch.index_example_tsdb。
create table factorsch.index_example_tsdb (trade_date date,stock_code varchar(20),effDate timestamp,indexShortName varchar(20),indexCode varchar(20),secShortName varchar(50),exchangeCD varchar(10),weight decimal(26,6),tm_stamp timestamp,flag integer,primary key (trade_date, stock_code, indexCode, flag)
);
插入数据。
insert into factorsch.index_example_tsdb
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'中证500', '000905', '中百集团', 'XSHE', 0.0044, to_date('2018-06-30 05:43:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 04:47:05', 'YYYY-MM-DD HH24:MI:SS'),
'中证500', '000906', '中百集团', 'XSHE', 0.0011, to_date('2018-06-30 05:48:06', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values(to_date('2006-11-30','YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'上证180', '000010', '三一重工', 'XSHG', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);
创建表 factorsch.index_example_olap。
create table factorsch.index_example_olap (trade_date date,stock_code varchar(20),effDate timestamp,indexShortName varchar(20),indexCode varchar(20),secShortName varchar(50),exchangeCD varchar(10),weight decimal(26,6),tm_stamp timestamp,flag integer,primary key (trade_date, stock_code, indexCode, flag)
);
插入数据。
insert into factorsch.index_example_olap
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'中证500', '000905', '中百集团', 'XSHE', 0.0044, to_date('2018-06-30 05:43:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_olap
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 04:47:05', 'YYYY-MM-DD HH24:MI:SS'),
'中证500', '000906', '中百集团', 'XSHE', 0.0011, to_date('2018-06-30 05:48:06', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_olap
values(to_date('2006-11-30','YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'上证180', '000010', '三一重工', 'XSHG', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);
第二步:配置订阅发布
使用 PostgreSQL 的 pgoutput 插件进行逻辑复制,需要配置 publication 来定义哪些表的数据变更需要被逻辑复制。配置 publication 在 PostgreSQL 的不同版本支持的脚本写法不尽相同,本教程以 PostgreSQL 12 版本为例介绍通用的两种配置方式。
方式一:普通用户配置(推荐)
factoruser 是普通的业务数据用户,不具备超级用户权限。配置发布时需要逐一配置每张表,也可以对单一指定表进行取消配置,当有新增表时需要进行添加。这里我们将上文中创建的三张表全部配置。
创建一个发布,先发布两张表 factorsch.index_example_tsdb 和 factorsch.stock_example。
CREATE PUBLICATION factordb_publication FOR TABLE factorsch.index_example_tsdb,factorsch.stock_example
查看发布和发布的具体表。
select * from pg_publication
select * from pg_publication_tables
图 3-4 PostgreSQL 中发布列表查询
图 3-5 PostgreSQL 中的发布表明细查询
对指定 publication ,添加一张表 factorsch.index_example_olap。
ALTER PUBLICATION factordb_publication ADD TABLE factorsch.index_example_olap;
图 3-6 查看增加的发布表
对指定 publication ,删除一张表。
ALTER PUBLICATION factordb_publication DROP TABLE factorsch.index_example_olap;
删除发布。.
drop publication factordb_publication
方式二:超级用户配置
需要先提升 factoruser 权限为超级用户,可以使用超级用户 postgres 操作。
ALTER USER factoruser WITH SUPERUSER;
使用超级用户 factoruser 创建全表发布。
CREATE PUBLICATION all_tables_publication FOR ALL TABLES;
如 图 3-7 所示,所有的表都会被发布,增加新表也会直接变成发布表。
图 3-7 全表订阅发布表列表查看
对于以上两种方式,推荐使用方式一,可以精确进行逻辑复制,控制资源使用,控制用户权限。只有在确实需要整库同步,且表的数据量众多的场景可以使用方式二。
第三步:准备连接器配置文件,并启动连接任务
创建连接 PostgreSQL 的 source 连接器配置文件。
mkdir -p /KFDATA/datasyn-config
vim /KFDATA/datasyn-config/source-postgres.json
录入以下配置,hostname 和 kafka 启动地址需对应修改。注意这里的 publication.name 参数要和配置发布名一致,且要提前配置好。
{"name": "postgresTask","config": {"connector.class" : "io.debezium.connector.postgresql.PostgresConnector","snapshot.mode" : "initial","tasks.max" : "1","topic.prefix" : "pg_factorDB","database.hostname" : "192.168.189.130","database.port" : 5432,"database.user" : "datasyn","database.password" : "111111","database.dbname" : "factordb","table.include.list" : "factorsch.index_example_tsdb,factorsch.index_example_olap,factorsch.stock_example","decimal.handling.mode": "string","plugin.name": "pgoutput","publication.name": "factordb_publication","slot.name": "datasyn_slot","heartbeat.interval.ms":"20000"}
}
重要参数说明:
表 3-1 source 连接器重要参数说明
更多详细参数说明可以参看 Debezium 2.5,不同Debezium版本的参数配置不同,若使用其他版本的Debezium,需找到对应文档做修改。
第三步: 启动 PostgreSQL 的数据同步任务
通过 REST API 启动 PostgreSQL 的 source 连接器。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.189.130:8083/connectors/ -d @/KFDATA/datasyn-config/source-postgres.json
也可以通过 kafka-tools 中的脚本启动。
cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/source-postgres.json
图 3-8 source 连接器启动成功信息
第四步:查看 PostgreSQL 数据同步任务状态
查看同步任务列表。list 参数展示任务名列表,showall 参数会显示全部同步任务状态。
cd /KFDATA/kafka-tools/bin
./rest.sh showall
当前只有一个同步任务,如下图所示。
图 3-9 查看全部同步任务状态信息
查看 kafka 中的 topic 列表:
./kafka.sh tplist
图 3-10 当前 kafka 中的 topic 列表
查看 表 stock_example、index_example_tsdb、index_example_olap 目前进入到 kafka 中的数据条数:
./kafka.sh get_offsets pg_factorDB.factorsch.stock_example
./kafka.sh get_offsets pg_factorDB.factorsch.index_example_tsdb
./kafka.sh get_offsets pg_factorDB.factorsch.index_example_olap
结果如图所示,当前配置的三张数据数据表的历史数据都已经写入了 kafka 中的对应 topic 中。
图 3-11 对应 topic 中数据条数信息
4. 从Kafka 到 DolphinDB 的数据同步:部署程序与配置任务
4.1 安装 Kafka-DolphinDB 数据同步连接器插件
配置启动 Kafka-DolphinDB 连接器,需要以下两步:
-
下载 Kafka-DolphinDB-Connector 插件,将插件解压并放到 Kafka Connect 的插件路径下。
-
重新启动 Kafka Connect 程序,以加载插件。
第一步:下载 Kafka-DolphinDB 插件
-
jdbc-1.30.22.5-CDC.jar :该包为 DolphinDB JDBC 包为数据同步做了一些专门修改,为特殊版本。
-
kafka-connect-jdbc-4.00.jar:是基于kafka-connect-jdbc-10.7.4 开发的 DolphinDB 连接器,后续会进行代码开源。
创建插件路径,在此路径下放置 Kafka-DolphinDB 插件包,将上述两个 jar 包放在此目录下。请确保 kafka 用户包含对这两个文件的读权限。(文件包见附件)
sudo mkdir -p /opt/confluent/share/java/plugin/kafka-connect-jdbc
sudo cp ~/jdbc-1.30.22.5-CDC.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/
sudo cp ~/kafka-connect-jdbc-10.7.4-ddb1.04.OLAP.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/
如果上面的操作碰到权限问题,则可以使用以下命令赋予权限。
sudo chmod o+rx /opt/confluent/share/java/plugin/kafka-connect-jdbc/*
第二步: 重启 kafka-connect
sudo systemctl restart kafka-connect
查看 kafka-connect 路径的日志输出
cat /KFDATA/kafka-connect/logs/connect.log | grep JdbcSinkConnector
如下图所示,则插件加载成功。
图 4 -1 Kafka-DolphinDB 插件加载成功信息
4.2 配置 DolphinDB 的数据同步连接任务
第一步:创建同步的 DolphinDB 库、表
根据 PostgreSQL 表结构,创建与 PostgreSQL 表结构一致的表,PostgreSQL 数据类型转换为 DolphinDB 数据类型对照表可以参考5.2节。
创建单主键表 stock_example 的 DolphinDB 对应表:
//创建 dfs://stock_data 数据库
if (existsDatabase("dfs://stock_data"))dropDatabase("dfs://stock_data")
dbName = "dfs://stock_data"
db=database(directory=dbName, partitionType=HASH, partitionScheme=[LONG, 3], engine="TSDB", atomic="CHUNK")
//创建 stock_example 表
tbName = "stock_example"
colNames = `id`ts_code`symbol_id`name`area`industry`list_date`dummySortKey__
colTypes = `LONG`SYMBOL`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DATE`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`id, sortColumns=`id`dummySortKey__, keepDuplicates=LAST, sortKeyMappingFunction=[hashBucket{,100}], softDelete=true)
创建复合主键表 index_example_olap 的 DolphinDB 对应表:
//创建 dfs://index_data_olap 数据库
if (existsDatabase("dfs://index_data_olap"))dropDatabase("dfs://index_data_olap")
dbName = "dfs://index_data_olap"
db = database(directory=dbName, partitionType=RANGE, partitionScheme=1990.01M+(0..80)*12, atomic="CHUNK")
//创建 olap 引擎数据库下的 index_example
tbName = "index_example"
colNames = `trade_date`stock_code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`tm_stamp`flag
colTypes = `DATE`SYMBOL`TIMESTAMP`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DOUBLE`TIMESTAMP`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`trade_date)
创建复合主键表 index_example_tsdb 的 DolphinDB 对应表:
//创建 dfs://index_data_tsdb 数据库
if (existsDatabase("dfs://index_data_tsdb"))dropDatabase("dfs://index_data_tsdb")
dbName = "dfs://index_data_tsdb"
db = database(directory=dbName, partitionType=RANGE, partitionScheme=1990.01M+(0..80)*12, engine="TSDB", atomic="CHUNK")
//创建 tsdb 引擎数据库下的 index_example
tbName = "index_example"
colNames = `trade_date`stock_code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`tm_stamp`flag
colTypes = `DATE`SYMBOL`TIMESTAMP`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DOUBLE`TIMESTAMP`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`trade_date, sortColumns=`stock_code`indexCode`flag`trade_date, keepDuplicates=LAST, sortKeyMappingFunction=[hashBucket{,10},hashBucket{,10},hashBucket{,1}], softDelete=true)
注:建表时的软删除功能,即 softDelete 选项需要 DolphinDB 2.00.11 及以上的版本。旧版本 DolphinDB 建表时可以去除该选项。
第二步: 配置同步配置表
在DolphinDB 中创建一张配置表,记录 kafka topic 和 DolphinDB 库表之间的映射关系。配置表的库表名可以自行调整,并在 DolphinDB 的同步任务中设置相应的库表名称。配置表中字段名是固定的,需和示例保持一致。
数据库名:dfs://ddb_sync_config
表名:sync_config
db = database("dfs://ddb_sync_config", HASH, [SYMBOL, 2])
t = table(1:0, `connector_name`topic_name`target_db`target_tab`add_sortcol_flag`primary_key,
[SYMBOL, SYMBOL, SYMBOL, SYMBOL, SYMBOL, SYMBOL])
db.createTable(t, "sync_config")
kafka topic 名可以通过之前介绍的 ./kafka.sh tplist 的命令查看,当前配置的三张表对应的 topic 如下:
-
单主键同步到 TSDB 引擎 : stock_example -> pg_factorDB.factorsch.stock_example。
-
复合主键同步到 TSDB 引擎 : index_example_tsdb->pg_factorDB.factorsch.index_example_tsdb
-
复合主键同步到 OLAP 引擎 : index_example_olap ->pg_factorDB.factorsch.index_example_olap。
插入配置信息表,将 kafka 中的 topic 和 DolphinDB 库表名称一一对应。
def addSyncConfig(connector_name, topic_name, dbname, tbname, add_sortcol_flag="0",primary_key=NULL) {loadTable("dfs://ddb_sync_config", "sync_config").append!(table([connector_name] as col1, [topic_name] as col2, [dbname] as col3,[tbname] as col4, [add_sortcol_flag] as col5,[primary_key] as col6))
}addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.stock_example", "dfs://stock_data", "stock_example", "1", "")
addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.index_example_tsdb", "dfs://index_data_tsdb", "index_example", "0", "")
addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.index_example_olap", "dfs://index_data_olap", "index_example", "0", "stock_code,indexCode,flag,trade_date")
以下是配置表的各个字段说明:
表 4-1 同步配置表字段说明
第三步: 准备连接器配置文件,并启动连接任务
创建 DolphinDB 数据同步任务配置文件。
cd /KFDATA/datasyn-config
vim ddb-sink-postgres.json
配置如下
{"name": "ddb-sink-postgres","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "pg_factorDB.factorsch.stock_example,pg_factorDB.factorsch.index_example_tsdb,pg_factorDB.factorsch.index_example_olap","connection.url": "jdbc:dolphindb://192.168.189.130:8848?user=admin&password=123456","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false","auto.evolve": "false","insert.mode": "upsert","delete.enabled": "true","batch.size":"10000","pk.mode": "record_key","ddbsync.config.table":"dfs://ddb_sync_config,sync_config","ddbsync.addSortColFlag":"true","ddbsync.config.engineTypes" :"TSDB,OLAP"}
}-
表 4-2 sink 连接器重要参数说明
参数说明:以上参数项为同步 DolphinDB 所需参数。如果对 Confluent 的JDBC Sink Connect 有经验可适当调节。
通过 REST API 启动source连接器:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://183.134.101.144:8083/connectors/ -d @ddb-sink-postgres.json
也可以通过 kafka-tools 中的脚本启动:
cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/ddb-sink-postgres.json
查看同步任务状态, ddb-sink-postgres 是 DolphinDB 的数据同步任务,可以看到现在我们有两个同步任务,这样构成了从 PostgreSQL 到 DolphinDB 的数据同步链。
./rest.sh showall
图 4-2 同步任务状态信息
第四步: 查看表初始数据同步进度
在设置 PostgreSQL 同步任务时,将 snapshot.mode 选项值设置为 ”initial” ,该选项意味着 PostgreSQL 会同步表的初始数据到 Kafka 中,设置完下游的 DolphinDB 任务后,可以检查初始数据的同步情况。
通过 kafka.sh 脚本查看消费者列表:
图 4-3 Kafka 消费者列表信息
查看 DolphinDB 同步任务对应的 Kafka 消费组中的每一个 consumer 的消费进度,通过此命令可以查看同步程序中每一张的表同步进度。 Lag 为 0 则表示 Kafka 中 topic 当前没有未消费的数据,即 Kafka 中的数据与对应表的数据是一致的。
./kafka.sh cm_detail connect-ddb-sink-postgres
图 4-4 connect-ddb-sink 中每张表同步进度
如 图 4-4 显示,数据已被 DolphinDB 同步任务消费完毕,此时在 DolphinDB 的 web 界面查看表中数据,表数据和 PostgreSQL 表中数据是一致的。
图 4-5 DolphinDB中 index_example 表数据
图 4-6 DolphinDB 中 stock_example表数据
4.3 实时同步验证
第一步:插入数据
向 PostgreSQL 中 factorsch.index_example_tsdb 表中插入两条新数据:
insert into factorsch.index_example_tsdb
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600054',
to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH:MI:SS'), '上证180', '000010', '三一重工',
'XXXB', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600055
',
to_date('2018-06-30 06:48:02', 'YYYY-MM-DD HH:MI:SS'), '沪深300', '000300', '三一重工',
'XSHG', 0.0029, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);
查看 DolphinDB 对应的表数据:
select * from loadTable("dfs://index_data_tsdb","index_example")
可以看到新数据已写入:
图4-7 数据写入成功
第二步:更新数据
更新 PostgreSQL 中 factorsch.index_example_olap 表的 tmp_stamp 和 secshortname 的值。
update factorsch.index_example_olap
set tm_stamp = to_date('2025-02-28 16:00:00', 'YYYY-MM-DD HH24:MI:SS'),
secshortname ='测试修改名称'
where stock_code='000759'
查看 DolphinDB 中数据,数据已被修改:
select * from loadTable("dfs://index_data_olap","index_example")
图4-8 数据更新成功
第三步:删除数据
在 PostgreSQL 中的 factorsch.stock_example 删除一条数据:
delete from factorsch.stock_example where ts_code='000004.SZ'
再查看 DolphinDB 中数据,数据已被删除:
select * from loadTable("dfs://index_data_tsdb","index_example")
图4-9 数据删除成功
5. 部署注意事项
5.1 实时同步须知
DolphinDB 是一款支持海量数据的分布式时序数据库。针对不同的数据处理需求,在底层架构上天然上与通常的关系型数据库不同,所以需要有以下限制:
-
DolphinDB 的表没有主键设计,若使用 TSDB 引擎,需将主键设置为 sortColumn 字段,并设置 keepDuplicates=LAST 来进行去重,以确保数据唯一性。TSDB 引擎的 sortColumn 是分区内去重,如果使用的是分区表,需要至少将其中一个主键列设置为分区列。若使用 OLAP 引擎,需在同步配置表中设置目标库的主键。
-
PostgreSQL 表的主键可能不满足 TSDB 引擎的 sortColumn 设置规则,有以下三种情况:
1. PostgreSQL 表中有两个及以上的主键,其中一个主键为整数类型或时间类型,但末尾列不是整数类型或时间类型:
-
该情况需要调整 sortColumn 设置的顺序,将整数类型或时间类型的主键移动到末尾。
2. PostgreSQL 表中只有一个主键,或者 PostgreSQL 表中的主键的数据类型均不包含整数类型或时间类型:
-
该情况需要建表时在末尾补充一个 dummySortKey__ 列,值均设置为0,对应同步程序的配置表中需要将 add_sortcol_flag 列的值设置为“1”,并将 DolphinDB 同步连接任务中“ddbsync.addSortColFlag” 设置为 “true”。若使用 DataX 等工具进行初始全表同步,则需要做数据转换,提供该字段对应初始值。
3. PostgreSQL 表中的主键类型包含 DolphinDB 不支持的类型。
-
DolphinDB TSDB 引擎的 sortColumns 支持整数、日期或时间、字符串类型,暂时不支持小数类型,但在后续的版本里可能提供支持,请关注版本更新。
DDL 语句相关:
-
当前不支持 DDL 语句同步。
-
若表结构发生更改,需进行单表修复,具体操作后续会在实时同步的运维手册文档中给出。
其他:
-
表字段命名时,请尽量规避一些简单的名字,比如 code, timestamp 等,这种命名与 DolphinDB 内关键字重复,可能会导致无法正确同步。
5.2 PostgreSQL-DolphinDB 数据类型对应表
以下的类型对应表为推荐设置的 DolphinDB 类型,注意两者数据类型表示的精度范围,确保 DolphinDB 数据类型的精度可以覆盖原 PostgreSQL 类型。
表 5-1 PostgreSQL-DolphinDB 数据类型对应表
在浮点数数据处理上,PostgreSQL 的 DECIMAL 类型是精确值,DolphinDB 的 DOUBLE 类型的精度为15-16位有效数字,如果转换成 DolphinDB 的 DOUBLE 类型,会存在浮点数精度丢失问题。因此推荐用户转换成 DolphinDB 的 DECIMAL 类型,确保浮点数精度。
在时间类型转换上,请参照表中的类型映射,以保证 DolphinDB 中的时间类型字段在精度上可以覆盖 PostgreSQL 中时间类型字段的精度。对于带有时区信息的时间类型,DolphinDB 统一以 UTC 时区存储。
6. 同步性能测试
6.1 性能测试配置
建表语句
PostgreSQL 建表,并生成测试数据代码:
DROP TABLE if EXISTS factorsch.performance_test1;
CREATE TABLE factorsch.performance_test1 (dt date,id varchar(20),str1 char(10),val DECIMAL,qty varchar(20),tm TIMESTAMP
);
-- 生成100w行数据,每天1000行
INSERT INTO factorsch.performance_test1 (dt,id,str1,val,qty,tm)
SELECT ('2020-01-01'::date + (gs - 1) / 1000), ((gs-1)%1000+1),'aa', 1.234, 1000, '2024-01-01 15:00:00'::timestamp
FROM generate_series(1, 1000000) gs;ALTER TABLE factorsch.performance_test1
ADD CONSTRAINT pk_performance_test1 PRIMARY KEY (id, dt);DROP TABLE if EXISTS factorsch.performance_test2;
CREATE TABLE debezium.performance_test2 (dt date,id varchar(20),str1 char(10),val DECIMAL,qty varchar(20),tm TIMESTAMP
);
-- 生成1亿行数据,每天100000行
INSERT INTO factorsch.performance_test2 (dt,id,str1,val,qty,tm)
SELECT ('2020-01-01'::date + (gs - 1) / 100000), ((gs-1)%100000+1),'aa', 1.234, 1000, '2024-01-01 15:00:00'::timestamp
FROM generate_series(1, 100000000) gs;ALTER TABLE factorsch.performance_test2
ADD CONSTRAINT pk_performance_test2 PRIMARY KEY (id, dt);grant select on factorsch.performance_test1 to datasyn;
grant select on factorsch.performance_test2 to datasyn;
DolphinDB 建表代码:
dbName = "dfs://performance_test1"
tbName = "performance_test1"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
db = database(dbName, HASH, [SYMBOL, 2], , 'TSDB', 'CHUNK')
db.createTable(t, tbName, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)dbName = "dfs://performance_test2"
tbName = "performance_test2"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
partitionCols = `dt`id
db1 = database(, RANGE, date(datetimeAdd(1990.01M, 0..100*12, 'M')))
db2 = database(, HASH, [SYMBOL, 50])
db = database(dbName, COMPO, [db1, db2], , `TSDB, `CHUNK)
db.createPartitionedTable(t, tbName, partitionColumns=partitionCols, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)
6.2 性能测试结果
性能测试结果如下表所示,其中总耗时等于 DolphinDB 更新完成时间减去 PostgreSQL 更新完成时间,因此总耗时包含了以下数据同步的完整链路:
-
Debezium 挖掘 PostgreSQL 日志到 Kafka
-
Kafka 推送数据给相应 topic 的消费者
-
下游的 DolphinDB Connector 消费 Kafka 中数据,解析为相应的 DolphinDB 更新语句,并执行写入 DolphinDB 完成
Kafka 每次推送的变更数据在 3000-4000 条,具体条数和 Kafka 的日志大小配置相关。对于 insert 和 update 类型的操作,DolphinDB 的处理效率很高。对于 delete 类型操作,由于 delete 操作涉及数据查找, DolphinDB 的处理效率和具体表的数据行数、分区方式相关。
表 6-1 使用 pgoutput 插件进行数据同步性能测试结果
7. 常见问题解答(FAQ)
7.1 创建同步任务时报错
(1)json 文件格式错误
图 7-1 json文件格式错误报错信息
造成上述问题的原因可能是多了逗号、少了逗号或者括号不正确,需要检查并修订 json 文件。
(2)Failed to find any class that implements Connector
图 7-2 PostgreSQL 数据库无法正常连接报错信息
该报错提示意味着 PostgreSQL 数据库无法正常连接。造成上述问题的可能原因:
-
未将 debezium-connector-postgres-2.5.1.Final.jar 包放置到插件目录下
-
Kafka 用户对 debezium-connector-postgres-2.5.1.Final.jar 文件没有读权限
(3)Can’t find JdbcSinkConnector
查看日志提示没有 JdbcSinkConnector 包的加载。JdbcSinkConnector 是包含在 kafka-connect-jdbc-4.00.jar 包内,需要确认该 jar 包是否放置在 kafka connect 的插件路径下,确认 kafka 对该文件的读权限。再通过 java --version
查看 Java 版本是否是17,Java 版本较低时,可能无法正确加载插件。目前已知使用 Java 8 时无法正确加载该插件。
7.2 数据未同步或者未正确同步
当数据未同步或者未正确同步时,请先按以下两步进行检查。然后对照后面的提供的错误列表进行参考调整。
step1 查看同步任务状态
先查看同步任务是否报错:
cd /KFDATA/kafka-tools/bin
./rest.sh showall
再看 kafka connect 的日志中是否出现 ERROR:
cd /KFDATA/kafka-connect/logs
cat connect.log | grep ERROR
如果有出现 ERROR,看 ERROR 显示的日志是 PostgreSQL 报错还是 ddb-sink 报错,查看具体的报错信息。如果同步任务未报错,也没有 ERROR,再通过以下方式排查。
step2 查看 PostgreSQL 数据是否同步到 Kafka
查看 Kafka 所有的 topic:
cd /KFDATA/kafka-tools/bin
./kafka.sh tplist
图 7-3 kafka 中 topic 信息
在查看该 topic 对应的数据条数:
./kafka.sh get_offsets postgres_service.debezium.index_example
-
一张表出现两个 topic 名字
这说明 PostgreSQL source 任务的 topic.prefix 或 DolphinDB sink 任务的 topics 配置项拼写有误,请检查这两项。DolphinDB sink 任务的 topics 必须为 {topic.prefix}.{schema}.{tablename} 的格式。创建 sink 任务时,如果 topic 不存在,则会自动创建 topic,因此拼写错误会导致出现两个 topic 。
-
没有表对应的 topic / 有对应的 topic,但数据条数为0
这说明 PostgreSQL 数据未正常同步到 Kafka 中,请在同步任务的 table.include.list 中检查 PostgreSQL 表名的拼写,或者检查用户是否拥有 REPLICATION 和 LOGIN 权限和对对应表的 SELECT 权限。
-
有对应的 topic,有数据条数,但 DolphinDB 未同步
检查 DolphinDB Sink 任务中 topics 配置项中的拼写,检查同步任务配置表中是否有相同的条数。
查看 Kafka 中数据是否与 PostgreSQL 变更数据一致:
./tpconsumer.sh --op=2 --topic=postgres_service.debezium.index_example --offset=0 --max-messages=20
在显示的结果中,初始数据同步的消息数据 op = r,新插入数据 op = c,更新数据 op = u
7.3 同步任务运行报错
-
Java.lang.OutOfMemoryError
Kafka Connect 的默认内存为 1 GB,当数据更新量较大时会出现 JVM 内存不足,需要调整 JVM 大小。根据之前配置的安装路径,修改 kafka connect 的配置文件:
vim /KFDATA/kafka-connect/etc/kafka-connect.env
在末尾加入 JVM 选项,内存大小根据实际需要调整:
KAFKA_HEAP_OPTS="-Xms10G -Xmx10G"
-
permission denied for database postgres at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection
使用 pgoutput 插件同步时未预先创建发布,会报此错误。
CREATE PUBLICATION dbz_publication FOR TABLE debezium.index_example,debezium.stock_example;
-
Creation of replication slot failed; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each
每个 Debezium 连接器需要使用唯一的复制槽名称。如果多个连接器使用相同的复制槽名称,就会导致冲突。可以为每个连接器使用不同的复制槽名称或者删除原先复制槽。
SELECT * FROM pg_drop_replication_slot('your_slot_name');
8. 附录
- DolphinDB 的 Kafka-Connect 插件包:kafka-connect-jdbc-4.00.jar
- DolphinDB 的 JDBC 包:jdbc-1.30.22.5-CDC.jar
- 运维脚本包 kafka-tools:kafka-tools.tar