随着越来越多的用户使用 DolphinDB,各种不同的应用的场景也对 DolphinDB 的数据接入提出了不同的要求。部分用户需要将 PostgreSQL 的数据实时同步到 DolphinDB 中来,以满足在 DolphinDB 中使用数据的实时性需求。本篇教程将介绍使用 Debezium 来实时捕获和发布 PostgreSQL 的数据库更改事件,并完成 PostgreSQL 到 DolphinDB 的实时数据同步的完整的解决方案。

1. Debezium 同步方案概述

Debezium 是一个开源的分布式平台,用于实时捕获和发布数据库更改事件。它可以将关系型数据库(如MySQL、PostgreSQL、Oracle 等)的变更事件转化为可观察的流数据,以供其他应用程序实时消费和处理。本教程中将采用 Debezium 与 Kafka 组合的方式来实现从 PosgreSQL12 到 DolphinDB 的数据同步。

Kafka +Debezium 的数据同步方案需要部署 4 个服务,如下所示:

  • ZooKeeper:kafka 的依赖部署。
  • Kafka:数据存储。
  • Kafka-Connect:用于接入数据插件 source-connetor, sink-connector 的框架,可以提供高可用,也可以部署单实例版本。
  • Schema-Registry :提供实时同步的数据的元数据注册功能 ,支持数据序列化。

基于 Debezium 同步 PostgreSQL 数据到 DolphinDB 的架构图如下:

图 1-1 同步架构图

接下来,本教程将逐一介绍这些服务的下载、安装,以及配置数据同步任务。

2. 部署 Kafka 单实例实时数据存储

Kafka 单实例实时数据存储部署方案在 MySQL 和 Oracle 数据同步教程中均有有介绍,可以查看:Debezium+Kafka 实时同步 Oracle 11g 数据到 DolphinDB 教程,部署好 ZooKeeper,Kafka,Kafka-Connect , Schema-Registry 的数据同步、存储运行环境。

3. 从 PostgreSQL 到 Kafka 的数据同步:部署程序与配置任务

在部署好 Kafka 和 Kafka Connect 环境后,即可配置 PostgreSQL 的实时数据同步任务。

注意:PostgreSQL 的数据实时同步仅支持 UTF-8 字符集。

本节涉及的操作,若未作特殊说明,均使用操作系统用户 kafka 执行。kafka 用户的权限等配置,详见章节 2 部署 Kafka 单实例实时数据存储 部分。

3.1 配置 PostgreSQL 数据库

对于 Source 数据库是 PostgresSQL,本教程采用 PostgreSQL 自带的逻辑复制流插件 pgoutput 插件来捕获实时数据变化。pgoutput 插件是 PostgreSQL 在版本 PostgreSQL 10+ 推出并内置的插件,无须额外配置。所以本教程介绍的 PostgresSQL 到 DolphinDB 的实时数据同步方案 需要 PostgreSQL 10+ 以上版本。

本教程中使用的数据库是配置于 CentOS 7 下的 PostgresSQL 12 版本。

第一步: 配置逻辑解码和数据库访问权限

PostgreSQL 的实时变更数据捕获是通过配置 replication slots ,开启逻辑复制流实现。配置以下参数,可以管理当前数据库的逻辑解码进程。

通过 vim 命令打开 /var/lib/pgsql/12/data/postgresql.conf 文件,添加或者调整以下参数。wal_level 参数必须设置。

# REPLICATION
wal_level = logical             # minimal, archive, hot_standby, or logical (change requires restart)
max_wal_senders = 4             # max number of walsender processes (change requires restart)
#wal_keep_segments = 4          # in logfile segments, 16MB each; 0 disables
#wal_sender_timeout = 60s       # in milliseconds; 0 disables
max_replication_slots = 4       # max number of replication slots (change requires restart)

PostgreSQL 中的数据库访问权限需要在 /var/lib/pgsql/12/data/pg_hba.conf 中进行配置。可以如下配置 IP4 的所有主机的 md5 加密访问权限。

host all all 0.0.0.0/0  md5

修改配置文件需要重启数据库生效。

第二步: 创建数据库和用户

在使用数据库时,通常会为业务数据创建专门的用户和数据库进行数据管理。这里我们创建业务数据的存储数据库和业务用户,再创建一个专门进行逻辑复制的用户。

使用超级用户 postgres ,创建业务数据用户和数据库,并授予权限。

CREATE USER factoruser WITH PASSWORD '111111';
CREATE DATABASE factordb;
GRANT ALL PRIVILEGES ON DATABASE factordb TO factoruser ;

创建逻辑复制用户,并授予该用户逻辑复制权限和连接业务数据库权限。

CREATE USER datasyn WITH PASSWORD '111111';
ALTER USER datasyn WITH REPLICATION;
GRANT CONNECT ON DATABASE factordb TO datasyn;

接下来我们创建业务数据库的 schema 并授予逻辑复制用户对业务数据库的数据访问权限,这需要登录数据库,并切换到业务数据库(database)后执行。切换 database,使用 psql 连接可以按 图 3-1 操作。

图 3-1 PostgreSQL 切换 database 操作

创建业务数据 schema,需要当前数据库是 factordb。

create schema factorsch authorization factoruser;

授予 datasyn 用户对 业务数据库 factordb 的 factorsch 的具体使用权限。

GRANT USAGE ON SCHEMA factorsch TO datasyn;
GRANT SELECT ON ALL TABLES IN SCHEMA factorsch TO datasyn;

配置逻辑复制用户的逻辑复制访问权限,修改文件 /var/lib/pgsql/12/data/pg_hba.conf。

host replication datasyn 0.0.0.0/0 md5

重启数据库,并测试逻辑解码插件。

--创建复制槽
SELECT * FROM pg_create_logical_replication_slot('pgoutput_demo', 'pgoutput');
--查看相应复制槽是否存在
SELECT * FROM pg_replication_slots;
--删除复制槽
SELECT * FROM pg_drop_replication_slot('pgoutput_demo');

3.2 安装 Debezium-PostgreSQL 连接器插件

PostgreSQL 的 Debezium 数据同步插件需要安装在 Kafka Connect 程序部署文件路径下。

配置启动 Debezium-PostgreSQL 连接器,需要以下两步:

  • 下载 Debezium-PostgreSQL-Connector 插件,将插件解压并放到 Kafka Connect 的插件路径下

  • 重新启动 Kafka Connect 程序,以加载插件

第一步: 下载安装 Debezium-PostgreSQL 插件

前往官方网站 Debezium,选择 2.5.1.Final 版本进行下载,程序名为 debezium-connector-postgres-2.5.1.Final-plugin.tar.gz,本教程测试时采用的此版本,也可以使用 2.5 系列的最新版本。如果想使用 Debezium 的最新稳定版本,需要仔细阅读对应的版本需求,并进行详细测试。

图 3-2 Debezium 官网下载链接

在 confluent 的安装路径下创建插件路径,在此路径下解压 Debezium 的 PostgreSQL 插件包,请确保 kafka 用户对此路径具有读权限,如果下面代码中的下载链接失效,请按 图 3-2 官网示例位置下载。

sudo mkdir -p /opt/confluent/share/java/plugin
cd /opt/confluent/share/java/plugin
sudo wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/2.5.1.Final/debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
sudo tar -xvf ./debezium-connector-postgres-2.5.1.Final-plugin.tar.gz
sudo rm ./debezium-connector-postgres-2.5.1.Final-plugin.tar.gz

第二步: 配置 Kafka Connect 加载插件

修改 Kafka Connect 的配置文件,添加插件路径配置。若已配置插件路径,则跳过该步骤。

cd /KFDATA/kafka-connect/etc
vim kafka-connect.properties

添加或修改参数 plugin.path 如下。

plugin.path=/opt/confluent/share/java/plugin

重新启动 Kafka Connect。

sudo systemctl restart kafka-connect

查看 kafka connect 的日志输出,能查询到信息则说明插件加载成功。

cat /KFDATA/kafka-connect/logs/connect.log | grep PostgresConnector

图 3-3 Debezium-PostgreSQL 插件加载成功信息

3.3 配置 PostgreSQL 数据同步连接任务

数据库基础配置和 PostgreSQL 同步插件安装好,我们就可以开始配置 PostgreSQL 的数据同步任务。配置同步任务及检查的很多命令都要带上 url 等参数。为了操作快捷,本教程封装了一些加载配置文件的操作脚本在 kafka-tools.tar 包中,详情参见附录。下载当前包,解压缩到 /KFDATA 目录下。后续的很多操作,包括检查 Kafka 的 topic、查看数据和配置同步任务等都会使用 kafka-tools.tar 包中的脚本。包中的脚本在无参数运行时会输出 help 文档。

cd /KFDATA
sudo tar -xvf kafka-tools.tar
sudo chown kafka:kafka kafka-tools
rm ./kafka-tools.tar

修改 /KFDATA/kafka-tools/config/config.properties 配置参数。

按照本机的路径、IP 等对应修改 Kafka 和 Kafka Connect 的启动 IP 地址,以及安装目录。

示例如下:

#kafka parameters
kafka_home=/opt/kafka
confluent_home=/opt/confluent
bootstrap_server=192.168.189.130:9092#kafka-connect parameters
connect_rest_url=192.168.1.178:8083
#rest_pd  means restful request  password,This is not necessary
#rest_pd=appsdba:passwd
schema_ip=192.168.189.130
schema_port=8081

第一步:准备 PostgreSQL 数据库表

本教程的同步方案支持 DolphinDB 的 TSDB 和 OLAP 两种存储引擎的数据同步。其中 TSDB 引擎对于单字段主键和多字段复合主键有不同的处理方式。所以这里我们创建三张表来展示不同的情况的配置操作。

  • 单主键同步到 TSDB 引擎 : factorsch.stock_example。

  • 复合主键同步到 TSDB 引擎 : factorsch.index_example_tsdb。

  • 复合主键同步到 OLAP 引擎 : factorsch.index_example_olap。

创建业务数据库表、数据,均使用 factoruser ,登录 factordb。

创建表 factorsch.stock_example。

create table factorsch.stock_example (id bigint,ts_code varchar(20),symbol_id varchar(20),name varchar(20),area varchar(20),industry varchar(20),list_date date,primary key (id)
);

插入数据。

insert into factorsch.stock_example(id,ts_code,symbol_id,name,area,industry,list_date)
values (1,'000001.SZ','000001','平安银行','深圳','银行','1991-04-03'),
(2,'000002.SZ','000002','万科A','深圳','地产','1991-01-29'),
(3,'000004.SZ','000004','ST国华','深圳','软件服务','1991-01-14');

创建表 factorsch.index_example_tsdb。

create table factorsch.index_example_tsdb (trade_date date,stock_code varchar(20),effDate timestamp,indexShortName varchar(20),indexCode varchar(20),secShortName varchar(50),exchangeCD varchar(10),weight decimal(26,6),tm_stamp timestamp,flag integer,primary key (trade_date, stock_code, indexCode, flag)
);

插入数据。

insert into factorsch.index_example_tsdb
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'中证500', '000905', '中百集团', 'XSHE', 0.0044, to_date('2018-06-30 05:43:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 04:47:05', 'YYYY-MM-DD HH24:MI:SS'),
'中证500', '000906', '中百集团', 'XSHE', 0.0011, to_date('2018-06-30 05:48:06', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values(to_date('2006-11-30','YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'上证180', '000010', '三一重工', 'XSHG', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);

创建表 factorsch.index_example_olap。

create table factorsch.index_example_olap (trade_date date,stock_code varchar(20),effDate timestamp,indexShortName varchar(20),indexCode varchar(20),secShortName varchar(50),exchangeCD varchar(10),weight decimal(26,6),tm_stamp timestamp,flag integer,primary key (trade_date, stock_code, indexCode, flag)
);

插入数据。

insert into factorsch.index_example_olap
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'中证500', '000905', '中百集团', 'XSHE', 0.0044, to_date('2018-06-30 05:43:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_olap
values(to_date('2006-11-30', 'YYYY-MM-DD'), '000759', to_date('2018-06-30 04:47:05', 'YYYY-MM-DD HH24:MI:SS'),
'中证500', '000906', '中百集团', 'XSHE', 0.0011, to_date('2018-06-30 05:48:06', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_olap
values(to_date('2006-11-30','YYYY-MM-DD'), '600031', to_date('2018-06-30 03:48:05', 'YYYY-MM-DD HH24:MI:SS'),
'上证180', '000010', '三一重工', 'XSHG', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);

第二步:配置订阅发布

使用 PostgreSQL 的 pgoutput 插件进行逻辑复制,需要配置 publication 来定义哪些表的数据变更需要被逻辑复制。配置 publication 在 PostgreSQL 的不同版本支持的脚本写法不尽相同,本教程以 PostgreSQL 12 版本为例介绍通用的两种配置方式。

方式一:普通用户配置(推荐)

factoruser 是普通的业务数据用户,不具备超级用户权限。配置发布时需要逐一配置每张表,也可以对单一指定表进行取消配置,当有新增表时需要进行添加。这里我们将上文中创建的三张表全部配置。

创建一个发布,先发布两张表 factorsch.index_example_tsdb 和 factorsch.stock_example。

CREATE PUBLICATION factordb_publication FOR TABLE factorsch.index_example_tsdb,factorsch.stock_example

查看发布和发布的具体表。

select * from pg_publication
select * from pg_publication_tables

图 3-4 PostgreSQL 中发布列表查询

图 3-5 PostgreSQL 中的发布表明细查询

对指定 publication ,添加一张表 factorsch.index_example_olap。

ALTER PUBLICATION factordb_publication ADD TABLE factorsch.index_example_olap;

图 3-6 查看增加的发布表

对指定 publication ,删除一张表。

ALTER PUBLICATION factordb_publication DROP TABLE factorsch.index_example_olap;

删除发布。.

drop publication factordb_publication

方式二:超级用户配置

需要先提升 factoruser 权限为超级用户,可以使用超级用户 postgres 操作。

ALTER USER factoruser WITH SUPERUSER;

使用超级用户 factoruser 创建全表发布。

CREATE PUBLICATION all_tables_publication FOR ALL TABLES;

如 图 3-7 所示,所有的表都会被发布,增加新表也会直接变成发布表。

图 3-7 全表订阅发布表列表查看

对于以上两种方式,推荐使用方式一,可以精确进行逻辑复制,控制资源使用,控制用户权限。只有在确实需要整库同步,且表的数据量众多的场景可以使用方式二。

第三步:准备连接器配置文件,并启动连接任务

创建连接 PostgreSQL 的 source 连接器配置文件。

mkdir -p /KFDATA/datasyn-config
vim /KFDATA/datasyn-config/source-postgres.json

录入以下配置,hostname 和 kafka 启动地址需对应修改。注意这里的 publication.name 参数要和配置发布名一致,且要提前配置好。

{"name": "postgresTask","config": {"connector.class" : "io.debezium.connector.postgresql.PostgresConnector","snapshot.mode" : "initial","tasks.max" : "1","topic.prefix" : "pg_factorDB","database.hostname" : "192.168.189.130","database.port" : 5432,"database.user" : "datasyn","database.password" : "111111","database.dbname" : "factordb","table.include.list" : "factorsch.index_example_tsdb,factorsch.index_example_olap,factorsch.stock_example","decimal.handling.mode": "string","plugin.name": "pgoutput","publication.name": "factordb_publication","slot.name": "datasyn_slot","heartbeat.interval.ms":"20000"}
}

重要参数说明:

表 3-1 source 连接器重要参数说明

更多详细参数说明可以参看 Debezium 2.5,不同Debezium版本的参数配置不同,若使用其他版本的Debezium,需找到对应文档做修改。

第三步: 启动 PostgreSQL 的数据同步任务

通过 REST API 启动 PostgreSQL 的 source 连接器。

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://192.168.189.130:8083/connectors/ -d @/KFDATA/datasyn-config/source-postgres.json

也可以通过 kafka-tools 中的脚本启动。

cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/source-postgres.json

图 3-8 source 连接器启动成功信息

第四步:查看 PostgreSQL 数据同步任务状态

查看同步任务列表。list 参数展示任务名列表,showall 参数会显示全部同步任务状态。

cd /KFDATA/kafka-tools/bin
./rest.sh showall

当前只有一个同步任务,如下图所示。

图 3-9 查看全部同步任务状态信息

查看 kafka 中的 topic 列表:

./kafka.sh tplist

图 3-10 当前 kafka 中的 topic 列表

查看 表 stock_example、index_example_tsdb、index_example_olap 目前进入到 kafka 中的数据条数:

./kafka.sh get_offsets pg_factorDB.factorsch.stock_example
./kafka.sh get_offsets pg_factorDB.factorsch.index_example_tsdb
./kafka.sh get_offsets pg_factorDB.factorsch.index_example_olap

结果如图所示,当前配置的三张数据数据表的历史数据都已经写入了 kafka 中的对应 topic 中。

图 3-11 对应 topic 中数据条数信息

4. 从Kafka 到 DolphinDB 的数据同步:部署程序与配置任务

4.1 安装 Kafka-DolphinDB 数据同步连接器插件

配置启动 Kafka-DolphinDB 连接器,需要以下两步:

  • 下载 Kafka-DolphinDB-Connector 插件,将插件解压并放到 Kafka Connect 的插件路径下。

  • 重新启动 Kafka Connect 程序,以加载插件。

第一步:下载 Kafka-DolphinDB 插件

  • jdbc-1.30.22.5-CDC.jar :该包为 DolphinDB JDBC 包为数据同步做了一些专门修改,为特殊版本。

  • kafka-connect-jdbc-4.00.jar:是基于kafka-connect-jdbc-10.7.4 开发的 DolphinDB 连接器,后续会进行代码开源。

创建插件路径,在此路径下放置 Kafka-DolphinDB 插件包,将上述两个 jar 包放在此目录下。请确保 kafka 用户包含对这两个文件的读权限。(文件包见附件

sudo mkdir -p /opt/confluent/share/java/plugin/kafka-connect-jdbc
sudo cp ~/jdbc-1.30.22.5-CDC.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/
sudo cp ~/kafka-connect-jdbc-10.7.4-ddb1.04.OLAP.jar /opt/confluent/share/java/plugin/kafka-connect-jdbc/

如果上面的操作碰到权限问题,则可以使用以下命令赋予权限。

sudo chmod o+rx /opt/confluent/share/java/plugin/kafka-connect-jdbc/*

第二步: 重启 kafka-connect

sudo systemctl restart kafka-connect

查看 kafka-connect 路径的日志输出

cat /KFDATA/kafka-connect/logs/connect.log | grep JdbcSinkConnector

如下图所示,则插件加载成功。

图 4 -1 Kafka-DolphinDB 插件加载成功信息

4.2 配置 DolphinDB 的数据同步连接任务

第一步:创建同步的 DolphinDB 库、表

根据 PostgreSQL 表结构,创建与 PostgreSQL 表结构一致的表,PostgreSQL 数据类型转换为 DolphinDB 数据类型对照表可以参考5.2节。

创建单主键表 stock_example 的 DolphinDB 对应表:

//创建 dfs://stock_data 数据库
if (existsDatabase("dfs://stock_data"))dropDatabase("dfs://stock_data")
dbName = "dfs://stock_data"
db=database(directory=dbName, partitionType=HASH, partitionScheme=[LONG, 3], engine="TSDB", atomic="CHUNK")
//创建 stock_example 表
tbName = "stock_example"
colNames = `id`ts_code`symbol_id`name`area`industry`list_date`dummySortKey__
colTypes = `LONG`SYMBOL`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DATE`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`id, sortColumns=`id`dummySortKey__, keepDuplicates=LAST, sortKeyMappingFunction=[hashBucket{,100}], softDelete=true)

创建复合主键表 index_example_olap 的 DolphinDB 对应表:

//创建 dfs://index_data_olap 数据库
if (existsDatabase("dfs://index_data_olap"))dropDatabase("dfs://index_data_olap")
dbName = "dfs://index_data_olap"
db = database(directory=dbName, partitionType=RANGE, partitionScheme=1990.01M+(0..80)*12,  atomic="CHUNK")
//创建 olap 引擎数据库下的 index_example
tbName = "index_example"
colNames = `trade_date`stock_code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`tm_stamp`flag
colTypes = `DATE`SYMBOL`TIMESTAMP`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DOUBLE`TIMESTAMP`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`trade_date)

创建复合主键表 index_example_tsdb 的 DolphinDB 对应表:

//创建 dfs://index_data_tsdb 数据库
if (existsDatabase("dfs://index_data_tsdb"))dropDatabase("dfs://index_data_tsdb")
dbName = "dfs://index_data_tsdb"
db = database(directory=dbName, partitionType=RANGE, partitionScheme=1990.01M+(0..80)*12, engine="TSDB", atomic="CHUNK")
//创建 tsdb 引擎数据库下的 index_example
tbName = "index_example"
colNames = `trade_date`stock_code`effDate`indexShortName`indexCode`secShortName`exchangeCD`weight`tm_stamp`flag
colTypes = `DATE`SYMBOL`TIMESTAMP`SYMBOL`SYMBOL`SYMBOL`SYMBOL`DOUBLE`TIMESTAMP`INT
t = table(1:0, colNames, colTypes)
db.createPartitionedTable(t, tbName, partitionColumns=`trade_date, sortColumns=`stock_code`indexCode`flag`trade_date, keepDuplicates=LAST, sortKeyMappingFunction=[hashBucket{,10},hashBucket{,10},hashBucket{,1}], softDelete=true)

注:建表时的软删除功能,即 softDelete 选项需要 DolphinDB 2.00.11 及以上的版本。旧版本 DolphinDB 建表时可以去除该选项。

第二步: 配置同步配置表

在DolphinDB 中创建一张配置表,记录 kafka topic 和 DolphinDB 库表之间的映射关系。配置表的库表名可以自行调整,并在 DolphinDB 的同步任务中设置相应的库表名称。配置表中字段名是固定的,需和示例保持一致。

数据库名:dfs://ddb_sync_config

表名:sync_config

db = database("dfs://ddb_sync_config", HASH, [SYMBOL, 2])
t = table(1:0, `connector_name`topic_name`target_db`target_tab`add_sortcol_flag`primary_key,
[SYMBOL, SYMBOL, SYMBOL, SYMBOL, SYMBOL, SYMBOL])
db.createTable(t, "sync_config")

kafka topic 名可以通过之前介绍的 ./kafka.sh tplist 的命令查看,当前配置的三张表对应的 topic 如下:

  • 单主键同步到 TSDB 引擎 : stock_example -> pg_factorDB.factorsch.stock_example。

  • 复合主键同步到 TSDB 引擎 : index_example_tsdb->pg_factorDB.factorsch.index_example_tsdb

  • 复合主键同步到 OLAP 引擎 : index_example_olap ->pg_factorDB.factorsch.index_example_olap。

插入配置信息表,将 kafka 中的 topic 和 DolphinDB 库表名称一一对应。

def addSyncConfig(connector_name, topic_name, dbname, tbname, add_sortcol_flag="0",primary_key=NULL) {loadTable("dfs://ddb_sync_config", "sync_config").append!(table([connector_name] as col1, [topic_name] as col2, [dbname] as col3,[tbname] as col4, [add_sortcol_flag] as col5,[primary_key] as col6))
}addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.stock_example", "dfs://stock_data", "stock_example", "1", "")
addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.index_example_tsdb", "dfs://index_data_tsdb", "index_example", "0", "")
addSyncConfig("ddb-sink-postgres", "pg_factorDB.factorsch.index_example_olap", "dfs://index_data_olap", "index_example", "0", "stock_code,indexCode,flag,trade_date")

以下是配置表的各个字段说明:

表 4-1 同步配置表字段说明

第三步: 准备连接器配置文件,并启动连接任务

创建 DolphinDB 数据同步任务配置文件。

cd /KFDATA/datasyn-config
vim ddb-sink-postgres.json

配置如下

{"name": "ddb-sink-postgres","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "pg_factorDB.factorsch.stock_example,pg_factorDB.factorsch.index_example_tsdb,pg_factorDB.factorsch.index_example_olap","connection.url": "jdbc:dolphindb://192.168.189.130:8848?user=admin&password=123456","transforms": "unwrap","transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState","transforms.unwrap.drop.tombstones": "false","auto.evolve": "false","insert.mode": "upsert","delete.enabled": "true","batch.size":"10000","pk.mode": "record_key","ddbsync.config.table":"dfs://ddb_sync_config,sync_config","ddbsync.addSortColFlag":"true","ddbsync.config.engineTypes" :"TSDB,OLAP"}
}-

表 4-2 sink 连接器重要参数说明

参数说明:以上参数项为同步 DolphinDB 所需参数。如果对 Confluent 的JDBC Sink Connect 有经验可适当调节。

通过 REST API 启动source连接器:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://183.134.101.144:8083/connectors/ -d @ddb-sink-postgres.json

也可以通过 kafka-tools 中的脚本启动:

cd /KFDATA/kafka-tools/bin
./rest.sh create /KFDATA/datasyn-config/ddb-sink-postgres.json

查看同步任务状态, ddb-sink-postgres 是 DolphinDB 的数据同步任务,可以看到现在我们有两个同步任务,这样构成了从 PostgreSQL 到 DolphinDB 的数据同步链。

./rest.sh showall

图 4-2 同步任务状态信息

第四步: 查看表初始数据同步进度

在设置 PostgreSQL 同步任务时,将 snapshot.mode 选项值设置为 ”initial” ,该选项意味着 PostgreSQL 会同步表的初始数据到 Kafka 中,设置完下游的 DolphinDB 任务后,可以检查初始数据的同步情况。

通过 kafka.sh 脚本查看消费者列表:

图 4-3 Kafka 消费者列表信息

查看 DolphinDB 同步任务对应的 Kafka 消费组中的每一个 consumer 的消费进度,通过此命令可以查看同步程序中每一张的表同步进度。 Lag 为 0 则表示 Kafka 中 topic 当前没有未消费的数据,即 Kafka 中的数据与对应表的数据是一致的。

./kafka.sh cm_detail connect-ddb-sink-postgres

图 4-4 connect-ddb-sink 中每张表同步进度

如 图 4-4 显示,数据已被 DolphinDB 同步任务消费完毕,此时在 DolphinDB 的 web 界面查看表中数据,表数据和 PostgreSQL 表中数据是一致的。

图 4-5 DolphinDB中 index_example 表数据

图 4-6 DolphinDB 中 stock_example表数据

4.3 实时同步验证

第一步:插入数据

向 PostgreSQL 中 factorsch.index_example_tsdb 表中插入两条新数据:

insert into factorsch.index_example_tsdb
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600054', 
to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH:MI:SS'), '上证180', '000010', '三一重工', 
'XXXB', 0.0043, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);insert into factorsch.index_example_tsdb
values (to_date('2006-11-30', 'YYYY-MM-DD'), '600055
', 
to_date('2018-06-30 06:48:02', 'YYYY-MM-DD HH:MI:SS'), '沪深300', '000300', '三一重工', 
'XSHG', 0.0029, to_date('2018-06-30 05:48:05', 'YYYY-MM-DD HH24:MI:SS'), 1);

查看 DolphinDB 对应的表数据:

select * from loadTable("dfs://index_data_tsdb","index_example")

可以看到新数据已写入:

图4-7 数据写入成功

第二步:更新数据

更新 PostgreSQL 中 factorsch.index_example_olap 表的 tmp_stamp 和 secshortname 的值。

update factorsch.index_example_olap 
set tm_stamp = to_date('2025-02-28 16:00:00', 'YYYY-MM-DD HH24:MI:SS'),
secshortname ='测试修改名称'
where stock_code='000759'

查看 DolphinDB 中数据,数据已被修改:

select * from loadTable("dfs://index_data_olap","index_example")

图4-8 数据更新成功

第三步:删除数据

在 PostgreSQL 中的 factorsch.stock_example 删除一条数据:

delete from factorsch.stock_example where ts_code='000004.SZ'

再查看 DolphinDB 中数据,数据已被删除:

select * from loadTable("dfs://index_data_tsdb","index_example")

图4-9 数据删除成功

5. 部署注意事项

5.1 实时同步须知

DolphinDB 是一款支持海量数据的分布式时序数据库。针对不同的数据处理需求,在底层架构上天然上与通常的关系型数据库不同,所以需要有以下限制:

  • DolphinDB 的表没有主键设计,若使用 TSDB 引擎,需将主键设置为 sortColumn 字段,并设置 keepDuplicates=LAST 来进行去重,以确保数据唯一性。TSDB 引擎的 sortColumn 是分区内去重,如果使用的是分区表,需要至少将其中一个主键列设置为分区列。若使用 OLAP 引擎,需在同步配置表中设置目标库的主键。

  • PostgreSQL 表的主键可能不满足 TSDB 引擎的 sortColumn 设置规则,有以下三种情况:

 1. PostgreSQL 表中有两个及以上的主键,其中一个主键为整数类型或时间类型,但末尾列不是整数类型或时间类型:

  • 该情况需要调整 sortColumn 设置的顺序,将整数类型或时间类型的主键移动到末尾。

2. PostgreSQL 表中只有一个主键,或者 PostgreSQL 表中的主键的数据类型均不包含整数类型或时间类型:

  • 该情况需要建表时在末尾补充一个 dummySortKey__ 列,值均设置为0,对应同步程序的配置表中需要将 add_sortcol_flag 列的值设置为“1”,并将 DolphinDB 同步连接任务中“ddbsync.addSortColFlag” 设置为 “true”。若使用 DataX 等工具进行初始全表同步,则需要做数据转换,提供该字段对应初始值。

3. PostgreSQL 表中的主键类型包含 DolphinDB 不支持的类型。

  • DolphinDB TSDB 引擎的 sortColumns 支持整数、日期或时间、字符串类型,暂时不支持小数类型,但在后续的版本里可能提供支持,请关注版本更新。

DDL 语句相关:

  • 当前不支持 DDL 语句同步。

  • 若表结构发生更改,需进行单表修复,具体操作后续会在实时同步的运维手册文档中给出。

其他:

  • 表字段命名时,请尽量规避一些简单的名字,比如 code, timestamp 等,这种命名与 DolphinDB 内关键字重复,可能会导致无法正确同步。

5.2 PostgreSQL-DolphinDB 数据类型对应表

以下的类型对应表为推荐设置的 DolphinDB 类型,注意两者数据类型表示的精度范围,确保 DolphinDB 数据类型的精度可以覆盖原 PostgreSQL 类型。

表 5-1 PostgreSQL-DolphinDB 数据类型对应表

在浮点数数据处理上,PostgreSQL 的 DECIMAL 类型是精确值,DolphinDB 的 DOUBLE 类型的精度为15-16位有效数字,如果转换成 DolphinDB 的 DOUBLE 类型,会存在浮点数精度丢失问题。因此推荐用户转换成 DolphinDB 的 DECIMAL 类型,确保浮点数精度。

在时间类型转换上,请参照表中的类型映射,以保证 DolphinDB 中的时间类型字段在精度上可以覆盖 PostgreSQL 中时间类型字段的精度。对于带有时区信息的时间类型,DolphinDB 统一以 UTC 时区存储。

6. 同步性能测试

6.1 性能测试配置

建表语句

PostgreSQL 建表,并生成测试数据代码:

DROP TABLE if EXISTS factorsch.performance_test1;
CREATE TABLE factorsch.performance_test1 (dt date,id varchar(20),str1 char(10),val DECIMAL,qty varchar(20),tm TIMESTAMP
);
-- 生成100w行数据,每天1000行
INSERT INTO factorsch.performance_test1 (dt,id,str1,val,qty,tm)
SELECT ('2020-01-01'::date + (gs - 1) / 1000), ((gs-1)%1000+1),'aa', 1.234, 1000, '2024-01-01 15:00:00'::timestamp
FROM generate_series(1, 1000000) gs;ALTER TABLE factorsch.performance_test1
ADD CONSTRAINT pk_performance_test1 PRIMARY KEY (id, dt);DROP TABLE if EXISTS factorsch.performance_test2;
CREATE TABLE debezium.performance_test2 (dt date,id varchar(20),str1 char(10),val DECIMAL,qty varchar(20),tm TIMESTAMP
);
-- 生成1亿行数据,每天100000行
INSERT INTO factorsch.performance_test2 (dt,id,str1,val,qty,tm)
SELECT ('2020-01-01'::date + (gs - 1) / 100000), ((gs-1)%100000+1),'aa', 1.234, 1000, '2024-01-01 15:00:00'::timestamp
FROM generate_series(1, 100000000) gs;ALTER TABLE factorsch.performance_test2
ADD CONSTRAINT pk_performance_test2 PRIMARY KEY (id, dt);grant select on factorsch.performance_test1 to datasyn;
grant select on factorsch.performance_test2 to datasyn;

DolphinDB 建表代码:

dbName = "dfs://performance_test1"
tbName = "performance_test1"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
db = database(dbName, HASH, [SYMBOL, 2], , 'TSDB', 'CHUNK')
db.createTable(t, tbName, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)dbName = "dfs://performance_test2"
tbName = "performance_test2"
colNames = `dt`id`str1`val`qty`tm
colTypes = `DATE`SYMBOL`SYMBOL`DOUBLE`LONG`TIMESTAMP
t = table(1:0, colNames, colTypes)
pkColumns = `id`dt
partitionCols = `dt`id
db1 = database(, RANGE, date(datetimeAdd(1990.01M, 0..100*12, 'M')))
db2 = database(, HASH, [SYMBOL, 50])
db = database(dbName, COMPO, [db1, db2], , `TSDB, `CHUNK)
db.createPartitionedTable(t, tbName, partitionColumns=partitionCols, sortColumns=pkColumns, keepDuplicates=LAST, softDelete=true)

6.2 性能测试结果

性能测试结果如下表所示,其中总耗时等于 DolphinDB 更新完成时间减去 PostgreSQL 更新完成时间,因此总耗时包含了以下数据同步的完整链路:

  • Debezium 挖掘 PostgreSQL 日志到 Kafka

  • Kafka 推送数据给相应 topic 的消费者

  • 下游的 DolphinDB Connector 消费 Kafka 中数据,解析为相应的 DolphinDB 更新语句,并执行写入 DolphinDB 完成

Kafka 每次推送的变更数据在 3000-4000 条,具体条数和 Kafka 的日志大小配置相关。对于 insert 和 update 类型的操作,DolphinDB 的处理效率很高。对于 delete 类型操作,由于 delete 操作涉及数据查找, DolphinDB 的处理效率和具体表的数据行数、分区方式相关。

表 6-1 使用 pgoutput 插件进行数据同步性能测试结果

7. 常见问题解答(FAQ)

7.1 创建同步任务时报错

(1)json 文件格式错误

图 7-1 json文件格式错误报错信息

造成上述问题的原因可能是多了逗号、少了逗号或者括号不正确,需要检查并修订 json 文件。

(2)Failed to find any class that implements Connector

图 7-2 PostgreSQL 数据库无法正常连接报错信息

该报错提示意味着 PostgreSQL 数据库无法正常连接。造成上述问题的可能原因:

  • 未将 debezium-connector-postgres-2.5.1.Final.jar 包放置到插件目录下

  • Kafka 用户对 debezium-connector-postgres-2.5.1.Final.jar 文件没有读权限

(3)Can’t find JdbcSinkConnector

查看日志提示没有 JdbcSinkConnector 包的加载。JdbcSinkConnector 是包含在 kafka-connect-jdbc-4.00.jar 包内,需要确认该 jar 包是否放置在 kafka connect 的插件路径下,确认 kafka 对该文件的读权限。再通过 java --version 查看 Java 版本是否是17,Java 版本较低时,可能无法正确加载插件。目前已知使用 Java 8 时无法正确加载该插件。

7.2 数据未同步或者未正确同步

当数据未同步或者未正确同步时,请先按以下两步进行检查。然后对照后面的提供的错误列表进行参考调整。

step1 查看同步任务状态

先查看同步任务是否报错:

cd /KFDATA/kafka-tools/bin
./rest.sh showall

再看 kafka connect 的日志中是否出现 ERROR:

cd /KFDATA/kafka-connect/logs
cat connect.log | grep ERROR

如果有出现 ERROR,看 ERROR 显示的日志是 PostgreSQL 报错还是 ddb-sink 报错,查看具体的报错信息。如果同步任务未报错,也没有 ERROR,再通过以下方式排查。

step2 查看 PostgreSQL 数据是否同步到 Kafka

查看 Kafka 所有的 topic:

cd /KFDATA/kafka-tools/bin
./kafka.sh tplist

 图 7-3 kafka 中 topic 信息

在查看该 topic 对应的数据条数:

./kafka.sh get_offsets postgres_service.debezium.index_example
  • 一张表出现两个 topic 名字

这说明 PostgreSQL source 任务的 topic.prefix 或 DolphinDB sink 任务的 topics 配置项拼写有误,请检查这两项。DolphinDB sink 任务的 topics 必须为 {topic.prefix}.{schema}.{tablename} 的格式。创建 sink 任务时,如果 topic 不存在,则会自动创建 topic,因此拼写错误会导致出现两个 topic 。

  • 没有表对应的 topic / 有对应的 topic,但数据条数为0

这说明 PostgreSQL 数据未正常同步到 Kafka 中,请在同步任务的 table.include.list 中检查 PostgreSQL 表名的拼写,或者检查用户是否拥有 REPLICATION 和 LOGIN 权限和对对应表的 SELECT 权限。

  • 有对应的 topic,有数据条数,但 DolphinDB 未同步

检查 DolphinDB Sink 任务中 topics 配置项中的拼写,检查同步任务配置表中是否有相同的条数。

查看 Kafka 中数据是否与 PostgreSQL 变更数据一致:

./tpconsumer.sh --op=2 --topic=postgres_service.debezium.index_example --offset=0 --max-messages=20

在显示的结果中,初始数据同步的消息数据 op = r,新插入数据 op = c,更新数据 op = u

7.3 同步任务运行报错

  • Java.lang.OutOfMemoryError

Kafka Connect 的默认内存为 1 GB,当数据更新量较大时会出现 JVM 内存不足,需要调整 JVM 大小。根据之前配置的安装路径,修改 kafka connect 的配置文件:

vim /KFDATA/kafka-connect/etc/kafka-connect.env

在末尾加入 JVM 选项,内存大小根据实际需要调整:

KAFKA_HEAP_OPTS="-Xms10G -Xmx10G"
  • permission denied for database postgres at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection

使用 pgoutput 插件同步时未预先创建发布,会报此错误。

CREATE PUBLICATION dbz_publication FOR TABLE debezium.index_example,debezium.stock_example;
  • Creation of replication slot failed; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each

每个 Debezium 连接器需要使用唯一的复制槽名称。如果多个连接器使用相同的复制槽名称,就会导致冲突。可以为每个连接器使用不同的复制槽名称或者删除原先复制槽。

SELECT * FROM pg_drop_replication_slot('your_slot_name');

8. 附录

  • DolphinDB 的 Kafka-Connect 插件包:kafka-connect-jdbc-4.00.jar
  • DolphinDB 的 JDBC 包:jdbc-1.30.22.5-CDC.jar
  • 运维脚本包 kafka-tools:kafka-tools.tar

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913023.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913023.shtml
英文地址,请注明出处:http://en.pswp.cn/news/913023.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

关于联咏(Novatek )白平衡色温坐标系探究

目录 一、疑问 二、结论 三、分析 四、释疑 五、仿真模拟 一、疑问 为什么Novatek的白平衡色温坐标系是这个样子的呢?各条直线和曲线分别代表什么含义呢?色温坐标系中所标定的参数代表什么含义呢?如何标定新增一些特殊的光源呢?二、结论

Protein FID:AI蛋白质结构生成模型评估新指标

一、引言:蛋白质生成模型面临的评估挑战 近年来,AI驱动的蛋白质结构生成模型取得了令人瞩目的进展,但如何有效评估这些模型的质量却一直是一个悬而未决的问题。虽然实验验证仍然是金标准,但计算机模拟评估对于快速开发和比较机器…

Vim 高效编辑指南:从基础操作到块编辑的进阶之路

文章目录🔠 一、基础编辑命令(生存必备)⚡ 二、进阶操作:可视化块模式 (Ctrl+v)典型应用场景🚀 三、效率提升技巧💡 四、配置建议(~/.vimrc)结语作为开发者最强大的文本编辑器之一,Vim 的高效操作离不开其命令模式(Normal Mode)。本文将系统性地介绍 Vim 的核心编…

docker学习第一天框架学习以及在redhat7.9安装操作

一.docker是什么。 Docker 是一个开源的容器化平台,通过将应用程序及其依赖项(如代码、运行时环境、系统工具等)打包到轻量级、可移植的容器中,实现「一次构建,处处运行」的现代化开发模式。它利用了 Linux 内核特性来…

QT控件 使用Font Awesome开源图标库修改QWidget和QML两种界面框架的控件图标

又一个月快要结束了,在这里总结下分别在QWidget和QML两种界面设计模式中应用Font Awesome开源图标库,修改界面的显示图标效果, AriaNg是aria2的可视化web界面工具,其中的图标大都是Font AWesome中的字体图标,某位曾经尝试将AriaNg…

Qt Quick 与 QML(四)qml中的Delegate系列委托组件

一、概念 在QML中,Delegate是一种非常重要的组件,特别是在使用ListView、GridView、PathView等视图组件时。Delegate用于定义每个列表或网格中的项目是如何展示的。通过自定义Delegate,你可以控制每个项目的外观和行为。 Delegate通常是一个…

android图片优化

在 Android 中加载大图时,如果不进行优化处理,很容易导致内存溢出(OOM)和应用卡顿。以下是几种高效处理大图加载的方法和最佳实践: 1. 使用图片加载库(推荐) 成熟的第三方库已经处理了内存管理…

【机器人】复现 DOV-SG 机器人导航 | 动态开放词汇 | 3D 场景图

DOV-SG 建了动态 3D 场景图,并使用LLM大型语言模型进行任务分解,从而能够在交互式探索过程中对 3D 场景图进行局部更新。 来自RA-L 2025,适合长时间的 语言引导移动操作,动态开放词汇 3D 场景图。 论文地址:Dynamic …

mongodb 中dbs 时,local代表的是什么

在 MongoDB 中,local 是一个内置的系统数据库,用于存储当前 MongoDB 实例(或副本集节点)的元数据和内部数据,与其他数据库不同,local 数据库的数据不会被复制到副本集的其他成员。 local 数据库的核心作用 …

Spring Cloud(微服务部署与监控)

📌 摘要 在微服务架构中,随着服务数量的增长和部署复杂度的提升,如何高效部署、持续监控、快速定位问题并实现自动化运维成为保障系统稳定性的关键。 本文将围绕 Spring Cloud 微服务的部署与监控 展开,深入讲解: 微…

音频动态压缩算法曲线实现

Juce实现动态压缩曲线绘制 动态范围压缩算法(Dynamic Range Compression,DRC)是将音频信号的动态范围映射到一个较小的范围内的过程,即降低较高的峰值的信号电平,而不处理较安静的部分。DRC被广泛用于音频录制、制作工…

技术视界 | OpenLoong 控制框架:打造通用人形机器人智能系统的中枢基座

在人形机器人向通用性、智能化方向加速演进的当下,控制系统的角色正在发生根本变化:它不再只是底层驱动的接口适配层,也不只是策略调用的转译引擎,而是成为连接具身模型、异构本体与多样化任务的“中枢神经系统”。 在 2025 年张…

IOS 蓝牙连接

最近做一个硬件设备,写IOS相应的数据连接/分析代码时;发现一个问题,如果是开机,每次都能连接上。连接断开后,发现再也扫描不到了。通过第三方工具LightBlue,发现信号是-127。 此时进入设置查看蓝牙设备&am…

【硬核数学 · LLM篇】3.1 Transformer之心:自注意力机制的线性代数解构《从零构建机器学习、深度学习到LLM的数学认知》

我们已经完成了对机器学习和深度学习核心数学理论的全面探索。我们从第一阶段的经典机器学习理论,走到了第二阶段的深度学习“黑盒”内部,用线性代数、微积分、概率论、优化理论等一系列数学工具,将神经网络的每一个部件都拆解得淋漓尽致。 …

flutter封装vlcplayer的控制器

import dart:async;import package:flutter_vlc_player/flutter_vlc_player.dart; import package:flutter/material.dart;class GlobalVlcController extends ChangeNotifier {//设置单例/*static final GlobalVlcController _instance GlobalVlcController._internal();fact…

SEO-滥用元机器人、规范或 hreflang 标签

&#x1f9f1; 一、滥用 Meta Robots 标签 ❌ 常见问题&#xff1a; 问题描述设置了 noindex 不该屏蔽的页面比如产品页、分类页被意外 noindex&#xff0c;导致不被收录设置 nofollow 导致内链失效所有链接都被 nofollow&#xff0c;影响爬虫抓取路径在 <meta> 标签和…

笨方法学python -练习14

程序&#xff1a; from sys import argv script, user_name argv prompt > print(f"Hi {user_name}, Im the {script} script.") print("Id like to ask you a few questions.") print(f"Do you like me {user_name}?") likes in…

Frida:配置自动补全 in VSCode

1. 前言 编写 frida JavaScript 脚本是一件 very 普遍的事情在 Android Reverse 中。为了方便编写&#xff0c;配置相关的环境使其能够自动补全是很关键的&#xff0c;即通过类名就能够获取该类的所有对外接口信息&#xff0c;这是面向对象编程的核心优势&#xff0c;可惜我没…

FPGA矩阵算法实现

简介 现如今设计上对速度的要求越来越高&#xff0c;而矩阵相乘含有大量的乘法和加法计算&#xff0c;造成计算时间长从而影响性能&#xff0c;本章节利用FPGA实现浮点型矩阵运算&#xff0c;可在极短时间内完成矩阵运算。 知识介绍 矩阵计算公式如下&#xff1a; 需要保证A的…

C#可空类型详解:从基础到高级应用

C#可空类型详解&#xff1a;从基础到高级应用 在C#编程中&#xff0c;可空类型是一个非常重要的概念&#xff0c;它允许我们为值类型&#xff08;如int、bool、DateTime等&#xff09;分配null值&#xff0c;从而增强了代码的表达能力和灵活性。本文将详细介绍C#中可空类型的各…