一、Flink部署

1.1、JAVA环境

vi /etc/profile
export JAVA_HOME=/data/flinkcdc/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATHsource /etc/profilevi ~/.bash_profileexport FLINK_HOME=/data/flinkcdc/flink-1.17.0
export PATH=$PATH:$FLINK_HOME/binsource ~/.bash_profile

1.2、配置Flink

vim conf/flink-conf.yaml
添加配置:env.java.home=/data/flinkcdc/jdk1.8.0_181①、localhost  修改为IP地址
rest.port: 8088
rest.address: 192.168.33.231
②、关闭防火墙
systemctl status firewalld
systemctl stop firewalld

1.3、Flink CDC Jar包

CDC jar放到Flink安装包解压之后的lib目录

1.4、启动flink

bin/start-cluster.shFlink Web-UI
http://192.168.33.231:8088

1.5、启动 Flink SQL CLI

bin/sql-client.sh

二、达梦数据库搭建

2.1、docker dm8

docker run -d \
--name dm8 \
--restart=always \
--privileged=true \
-e LD_LIBRARY_PATH=/opt/dmdbms/bin \
-e PAGE_SIZE=16 \
-e EXTENT_SIZE=32 \
-e LOG_SIZE=1024 \
-e CASE_SENSITIVE=0 \
-e UNICODE_FLAG=1 \
-e INSTANCE_NAME=DM8_CDC \
-e SYSDBA_PWD=SYSDBA001 \
-v /docker/dm8_data_cdc:/opt/dmdbms/data \
-p 5236:5236 \
dm8_flinkcdc:dm8

查看容器运行情况

查看数据库容器
lsof -i:5236
docker logs -f  dm8
docker exec -it dm8 bash

2.2、开启达梦日志归档

##查看当前数据库是否开启归档
select arch_mode from v$database;
##查询有哪些归档日志
SELECT NAME , FIRST_TIME , NEXT_TIME , FIRST_CHANGE# , NEXT_CHANGE# FROM V$ARCHIVED_LOG; 
SELECT * FROM V$ARCH_FILE##修改数据库实例的 /dmdata/DAMEGN/dm.ini文件中 ARCH_INI 参数值
vi /dmdata/DAMENG/dm.ini
##将 ARCH_INI 值改为 1,保存后退出
ARCH_INI = 1 #开启归档功能
RLOG_APPEND_LOGIC = 1##新增文件dmarch.ini
vi /dmdata/DAMENG/dmarch.ini
##新增如下内容
[ARCHIVE_LOCAL1]
ARCH_TYPE = LOCAL
ARCH_DEST = /dmarch
ARCH_FILE_SIZE = 2048
ARCH_SPACE_LIMIT = 102400##最后重启数据库完成归档配置#DaMeng Database Archive Configuration file
#this is commentsARCH_WAIT_APPLY      = 0[ARCHIVE_LOCAL1]ARCH_TYPE            = LOCALARCH_DEST            = /opt/dmdbms/data/DAMENG/archARCH_FILE_SIZE       = 1024ARCH_SPACE_LIMIT     = 51200ARCH_FLUSH_BUF_SIZE  = 0ARCH_HANG_FLAG       = 1

 2.3、重启dm8数据库

docker restart dm8

三、实时同步测试

##达梦
CREATE TABLE t_source_dm (id INT,name VARCHAR,insert_date DATE,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'dm','startupOptions' = 'Initial','hostname' = '192.168.33.231','port' = '5236','username' = 'SYSDBA','password' = 'SYSDBA001','database' = 'DM8_CDC','schema' = 'SYSDBA','table' = 'dm_flinkcdc'
);##MYSQL
CREATE TABLE sink_mysql_test (id int,name STRING,insert_date date,PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://192.168.33.231:3306/flinkcdc','driver' = 'com.mysql.cj.jdbc.Driver','username' = 'root','password' = 'YTP1101102233','table-name' = 'dm_to_mysql'
);insert into sink_mysql_test 
select * from t_source_dm;

四、数据实时同步样本(很多留言说没有用,这里证实一下,不成功重点检查达梦配置) 

4.1、代码

/** @(#)FlinkDMCDC.java, 2023年8月5日 上午10:33:17** Copyright (c) 2000-2020, 达梦数据库有限公司.* All rights reserved.*/
package com.dameng.flinkcdc.dm;import java.io.File;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkDMCDC
{public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.setProperty("database.tablename.case.insensitive", "false");properties.setProperty("log.mining.strategy", "offline_catalog");properties.setProperty("log.mining.continuous.mine", "true");//properties.setProperty("provide.transaction.metadata", "true");properties.setProperty("lob.enabled", "true");JdbcIncrementalSource<String> changeEventSource = new DMSourceBuilder<String>().hostname("localhost").port(15236).databaseList("DAMENG").tableList("FLINK_CDC.CDC_TEST").schemaList("FLINK_CDC").username("SYSDBA").password("SYSDBA112233").startupOptions(StartupOptions.initial()).dmProperties(properties).includeSchemaChanges(true).deserializer(new JsonDebeziumDeserializationSchema()).sliceSize(20).scanNewlyAddedTableEnabled(true).build();Configuration configuration = new Configuration();//检查点文件StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.enableCheckpointing(20 * 1000);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6*1000);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);env.fromSource(changeEventSource, WatermarkStrategy.noWatermarks(), "DmSource").setParallelism(1).print()env.execute();}
}

4.1.1、从头开始采集数据,运行结果

4.1.2、数据插入 结果:

4.1.3、数据更新 结果:

4.1.4、数据删除 结果:

4.2、达梦实时同步CDC 转换为SQL代码(直接拿去用)

/** @(#)FlinkDMCDC.java, 2023年8月5日 上午10:33:17** Copyright (c) 2000-2020, 达梦数据库有限公司.* All rights reserved.*/
package com.dameng.flinkcdc.dm;import java.io.File;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
/**** Created by wuxin on 2023年8月5日 上午10:33:17*/
public class FlinkDMCDCSQL
{public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.setProperty("database.tablename.case.insensitive", "false");properties.setProperty("log.mining.strategy", "offline_catalog");properties.setProperty("log.mining.continuous.mine", "true");properties.setProperty("lob.enabled", "true");JdbcIncrementalSource<String> changeEventSource =new DMSourceBuilder<String>().hostname("localhost").port(15236).databaseList("DAMENG").tableList("FLINK_CDC.CDC_TEST").schemaList("FLINK_CDC").username("SYSDBA").password("SYSDBA112233").startupOptions(StartupOptions.initial()).dmProperties(properties).includeSchemaChanges(true).deserializer(new JsonDebeziumDeserializationSchema()).sliceSize(20).scanNewlyAddedTableEnabled(true).build();Configuration configuration = new Configuration();//检查点文件StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.enableCheckpointing(20 * 1000);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6*1000);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);env.setStateBackend(new FsStateBackend("file:///D:/tmp/ck"));DataStream<String> sourceStream = env.fromSource(changeEventSource,WatermarkStrategy.noWatermarks(),"DmSource");// 处理CDC数据并生成JSON格式的SQL语句DataStream<String> jsonStream = sourceStream.map(new MapFunction<String, String>() {private transient ObjectMapper objectMapper;private final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;@Overridepublic String map(String value) throws Exception {if (objectMapper == null) {objectMapper = new ObjectMapper();}JsonNode rootNode = objectMapper.readTree(value);String op = rootNode.path("op").asText();JsonNode sourceNode = rootNode.path("source");String schema = sourceNode.path("schema").asText();String table = sourceNode.path("table").asText();long tsMs = rootNode.path("ts_ms").asLong();// 创建JSON结构ObjectNode json = nodeFactory.objectNode();// 1. 添加metadata部分ObjectNode metadata = json.putObject("metadata").put("schema", schema).put("table", table).put("source_timestamp", tsMs);// 处理时间戳字段JsonNode afterNode = rootNode.path("after");if (!afterNode.isMissingNode() && afterNode.has("TIMESTAMP")) {long timestampNs = afterNode.path("TIMESTAMP").asLong();metadata.put("event_time", formatTimestamp(timestampNs));} else {metadata.put("event_time", formatTimestamp(tsMs * 1000000L));}// 2. 根据操作类型生成SQL并添加到JSONString sql = "";switch (op) {case "r":sql = generateInsertSQL(schema + "." + table, afterNode);json.putObject("sql").put("insert", sql);break;case "c":sql = generateInsertSQL(schema + "." + table, afterNode);json.putObject("sql").put("insert", sql);break;case "u":JsonNode beforeNode = rootNode.path("before");sql = generateUpdateSQL(schema + "." + table, beforeNode, afterNode);json.putObject("sql").put("update", sql);break;case "d":JsonNode beforeNodeDelete = rootNode.path("before");sql = generateDeleteSQL(schema + "." + table, beforeNodeDelete);json.putObject("sql").put("delete", sql);break;default:json.put("error", "UNKNOWN OPERATION: " + op);}return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);}private String generateInsertSQL(String tableName, JsonNode afterNode) {StringBuilder fields = new StringBuilder();StringBuilder values = new StringBuilder();afterNode.fieldNames().forEachRemaining(fieldName -> {if (fields.length() > 0) {fields.append(", ");values.append(", ");}fields.append(fieldName);JsonNode valueNode = afterNode.get(fieldName);if (valueNode.isNull()) {values.append("NULL");} else if (valueNode.isTextual()) {values.append("'").append(escapeSQL(valueNode.asText())).append("'");} else {values.append(valueNode.asText());}});return String.format("INSERT INTO %s (%s) VALUES (%s)",tableName, fields.toString(), values.toString());}private String generateUpdateSQL(String tableName, JsonNode beforeNode, JsonNode afterNode) {StringBuilder setClause = new StringBuilder();StringBuilder whereClause = new StringBuilder();// 构建SET部分afterNode.fieldNames().forEachRemaining(fieldName -> {if (!fieldName.equals("ID")) { // 假设ID是主键,不更新if (setClause.length() > 0) {setClause.append(", ");}JsonNode valueNode = afterNode.get(fieldName);if (valueNode.isNull()) {setClause.append(fieldName).append(" = NULL");} else if (valueNode.isTextual()) {setClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else {setClause.append(fieldName).append(" = ").append(valueNode.asText());}}});// 构建WHERE部分(使用所有字段作为条件以确保准确性)beforeNode.fieldNames().forEachRemaining(fieldName -> {if (whereClause.length() > 0) {whereClause.append(" AND ");}JsonNode valueNode = beforeNode.get(fieldName);if (valueNode.isNull()) {whereClause.append(fieldName).append(" IS NULL");} else if (valueNode.isTextual()) {whereClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else {whereClause.append(fieldName).append(" = ").append(valueNode.asText());}});return String.format("UPDATE %s SET %s WHERE %s",tableName, setClause.toString(), whereClause.toString());}private String generateDeleteSQL(String tableName, JsonNode beforeNode) {StringBuilder whereClause = new StringBuilder();beforeNode.fieldNames().forEachRemaining(fieldName -> {if (whereClause.length() > 0) {whereClause.append(" AND ");}JsonNode valueNode = beforeNode.get(fieldName);if (valueNode.isNull()) {whereClause.append(fieldName).append(" IS NULL");} else if (valueNode.isTextual()) {whereClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else {whereClause.append(fieldName).append(" = ").append(valueNode.asText());}});return String.format("DELETE FROM %s WHERE %s", tableName, whereClause.toString());}private String escapeSQL(String value) {return value.replace("'", "''");}// 时间戳格式化方法private String formatTimestamp(long timestampNs) {long timestampMs = timestampNs / 1000000L;SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");return sdf.format(new Date(timestampMs));}});// 将SQL语句写入文件jsonStream.writeAsText("D:\\tmp\\flink-cdc-sql-output.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);env.execute();}
}

 4.3、达梦实时同步CDC 推送Kafka,解耦代码(直接拿去用)

/** @(#)FlinkDMCDC.java, 2023年8月5日 上午10:33:17** Copyright (c) 2000-2020, 达梦数据库有限公司.* All rights reserved.*/
package com.dameng.flinkcdc.dm;import java.io.File;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.dm.source.DMSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;/**** Created by wuxin on 2023年8月5日 上午10:33:17*/
public class FlinkDMCDCSQLKafka
{public static void main(String[] args) throws Exception {Properties properties = new Properties();properties.setProperty("database.tablename.case.insensitive", "false");properties.setProperty("log.mining.strategy", "offline_catalog");properties.setProperty("log.mining.continuous.mine", "true");properties.setProperty("lob.enabled", "true");JdbcIncrementalSource<String> changeEventSource =new DMSourceBuilder<String>().hostname("localhost").port(15236).databaseList("DAMENG").tableList("FLINK_CDC.CDC_TEST").schemaList("FLINK_CDC").username("SYSDBA").password("SYSDBA112233").startupOptions(StartupOptions.initial()).dmProperties(properties).includeSchemaChanges(true).deserializer(new JsonDebeziumDeserializationSchema()).sliceSize(20).scanNewlyAddedTableEnabled(true).build();Configuration configuration = new Configuration();//检查点文件StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);env.enableCheckpointing(20 * 1000);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(6*1000);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);env.setStateBackend(new FsStateBackend("file:///D:/tmp/ck"));DataStream<String> sourceStream = env.fromSource(changeEventSource,WatermarkStrategy.noWatermarks(),"DmSource");// 处理CDC数据并生成JSON格式的SQL语句DataStream<String> jsonStream = sourceStream.map(new MapFunction<String, String>() {private transient ObjectMapper objectMapper;private final JsonNodeFactory nodeFactory = JsonNodeFactory.instance;@Overridepublic String map(String value) throws Exception {if (objectMapper == null) {objectMapper = new ObjectMapper();}JsonNode rootNode = objectMapper.readTree(value);String op = rootNode.path("op").asText();JsonNode sourceNode = rootNode.path("source");String schema = sourceNode.path("schema").asText();String table = sourceNode.path("table").asText();long tsMs = rootNode.path("ts_ms").asLong();// 创建JSON结构ObjectNode json = nodeFactory.objectNode();// 1. 添加metadata部分ObjectNode metadata = json.putObject("metadata").put("schema", schema).put("table", table).put("source_timestamp", tsMs);// 处理时间戳字段JsonNode afterNode = rootNode.path("after");if (!afterNode.isMissingNode() && afterNode.has("TIMESTAMP")) {long timestampNs = afterNode.path("TIMESTAMP").asLong();metadata.put("event_time", formatTimestamp(timestampNs));} else {metadata.put("event_time", formatTimestamp(tsMs * 1000000L));}// 2. 根据操作类型生成SQL并添加到JSONString sql = "";switch (op) {case "r":sql = generateInsertSQL(schema + "." + table, afterNode);json.putObject("sql").put("dml", sql);break;case "c":sql = generateInsertSQL(schema + "." + table, afterNode);json.putObject("sql").put("dml", sql);break;case "u":JsonNode beforeNode = rootNode.path("before");sql = generateUpdateSQL(schema + "." + table, beforeNode, afterNode);json.putObject("sql").put("dml", sql);break;case "d":JsonNode beforeNodeDelete = rootNode.path("before");sql = generateDeleteSQL(schema + "." + table, beforeNodeDelete);json.putObject("sql").put("dml", sql);break;default:json.put("error", "UNKNOWN OPERATION: " + op);}// 配置ObjectMapper禁用美化打印(默认即为紧凑格式)
//                ObjectMapper mapper = new ObjectMapper();
//                String jsonString = mapper.writeValueAsString(json);return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);}private String generateInsertSQL(String tableName, JsonNode afterNode) {StringBuilder fields = new StringBuilder();StringBuilder values = new StringBuilder();afterNode.fieldNames().forEachRemaining(fieldName -> {if (fields.length() > 0) {fields.append(", ");values.append(", ");}fields.append(fieldName);JsonNode valueNode = afterNode.get(fieldName);if (valueNode.isNull()) {values.append("NULL");} else if (valueNode.isTextual()) {// 处理文本类型(含转义)values.append("'").append(escapeSQL(valueNode.asText())).append("'");} else if (valueNode.isLong() && isTimestampField(fieldName)) {// 处理时间戳转换(纳秒/毫秒级)values.append("'").append(formatTimestamp(valueNode.asLong())).append("'");} else if (valueNode.isNumber()) {// 处理其他数字类型values.append(valueNode.asText());} else {// 默认处理(如布尔值等)values.append(valueNode.asText());}});return String.format("INSERT INTO %s (%s) VALUES (%s)",tableName, fields.toString(), values.toString());}private String generateUpdateSQL(String tableName, JsonNode beforeNode, JsonNode afterNode) {StringBuilder setClause = new StringBuilder();StringBuilder whereClause = new StringBuilder();// 构建SET部分afterNode.fieldNames().forEachRemaining(fieldName -> {if (!fieldName.equals("ID")) { // 假设ID是主键,不更新if (setClause.length() > 0) {setClause.append(", ");}JsonNode valueNode = afterNode.get(fieldName);if (valueNode.isNull()) {setClause.append(fieldName).append(" = NULL");} else if (valueNode.isTextual()) {setClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else if (valueNode.isLong() && isTimestampField(fieldName)) {// 处理时间戳转换(纳秒/毫秒级)setClause.append(fieldName).append(" = '").append(formatTimestamp(valueNode.asLong())).append("'");} else {setClause.append(fieldName).append(" = ").append(valueNode.asText());}}});// 构建WHERE部分(使用所有字段作为条件以确保准确性)beforeNode.fieldNames().forEachRemaining(fieldName -> {if (whereClause.length() > 0) {whereClause.append(" AND ");}JsonNode valueNode = beforeNode.get(fieldName);if (valueNode.isNull()) {whereClause.append(fieldName).append(" IS NULL");} else if (valueNode.isTextual()) {whereClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else if (valueNode.isLong() && isTimestampField(fieldName)) {// 处理时间戳转换(纳秒/毫秒级)whereClause.append(fieldName).append(" = '").append(formatTimestamp(valueNode.asLong())).append("'");} else {whereClause.append(fieldName).append(" = ").append(valueNode.asText());}});return String.format("UPDATE %s SET %s WHERE %s",tableName, setClause.toString(), whereClause.toString());}private String generateDeleteSQL(String tableName, JsonNode beforeNode) {StringBuilder whereClause = new StringBuilder();beforeNode.fieldNames().forEachRemaining(fieldName -> {if (whereClause.length() > 0) {whereClause.append(" AND ");}JsonNode valueNode = beforeNode.get(fieldName);if (valueNode.isNull()) {whereClause.append(fieldName).append(" IS NULL");} else if (valueNode.isTextual()) {whereClause.append(fieldName).append(" = '").append(escapeSQL(valueNode.asText())).append("'");} else if (valueNode.isLong() && isTimestampField(fieldName)) {// 处理时间戳转换(纳秒/毫秒级)whereClause.append(fieldName).append(" = '").append(formatTimestamp(valueNode.asLong())).append("'");} else {whereClause.append(fieldName).append(" = ").append(valueNode.asText());}});return String.format("DELETE FROM %s WHERE %s", tableName, whereClause.toString());}// 判断字段是否为时间戳字段(根据命名约定或业务逻辑)private boolean isTimestampField(String fieldName) {return fieldName.toLowerCase().contains("time") ||fieldName.toLowerCase().contains("date") ||fieldName.toLowerCase().contains("ts");}// 格式化时间戳(支持纳秒/毫秒/秒级)private String formatTimestamp(long timestamp) {// 判断时间戳精度(假设大于1e16为纳秒,大于1e12为微秒,其余为毫秒/秒)if (timestamp > 1e16) {timestamp /= 1_000_000; // 纳秒转毫秒} else if (timestamp > 1e12) {timestamp /= 1_000; // 微秒转毫秒} else if (timestamp < 1e10) {timestamp *= 1_000; // 秒转毫秒}// 使用Java 8时间API格式化(线程安全)return Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));}// SQL特殊字符转义(防止注入)private String escapeSQL(String input) {return input.replace("'", "''");}});// 创建Kafka SinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("172.30.139.111:19092,172.30.139.111:29092,172.30.139.111:39092,172.30.139.111:49092") // Kafka broker地址.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("dm-cdc-topic") // Kafka主题名称.setValueSerializationSchema(new SimpleStringSchema()) // 使用字符串序列化.build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 精确一次交付.setProperty("transaction.timeout.ms", "300000") // 设置为5分钟(需小于Broker的15分钟限制).setProperty("acks", "all") // 确保高可靠性.build();// 构建数据处理管道jsonStream.sinkTo(kafkaSink); // 将数据发送到Kafkaenv.execute();}
}

五、FlinkCDC 达梦数据库 所需文件下载

5.1、所需Jar包

5.2、支持JAVA程序和SQL

 

5.3、完成程序和说明文档下载地址

版本1:

https://download.csdn.net/download/ytp552200ytp/90103896

版本2(最新版本截止:2025-06-03) 

https://download.csdn.net/download/ytp552200ytp/91119461

如果实在没有CSDN积分,后台联系我留下邮箱,我看到后私信发到邮箱,支持共享。

6、情况说明

我看很多说没用,不增量啥的,核心原因是DM数据库没有配置好,按照上面的步骤去配置或查看文档中的说明操作,保证DM数据库归档日志正常,上面的代码直接可以使用拿去测试吧。归档日志查询SQL,查查核验一下!!!

1、通过开启归档日志  SQL查询处理 (SCN 作为标记增量查询)-- 查看所有归档日志文件信息
SELECT * FROM SYS.V$ARCHIVED_LOG;-- 或使用以下视图查看归档文件详细信息
SELECT * FROM SYS.V$ARCH_FILE;--添加文件
DBMS_LOGMNR.ADD_LOGFILE('./dmarch/ARCHIVE_LOCAL1_0x1873DFE0_EP0_2025-06-26_11-34-09.log')-- 或使用默认参数分析所有添加的日志
DBMS_LOGMNR.START_LOGMNR(OPTIONS => 2130);--查询 日志明细
SELECT *
FROM V$LOGMNR_CONTENTSSELECT OPERATION_CODE, SCN, SQL_REDO, TIMESTAMP,SEG_OWNER, TABLE_NAME 
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME IS NOT NULL-- 查看特定表的操作
SELECT OPERATION_CODE, SCN, SQL_REDO, TIMESTAMP, SEG_OWNER, TABLE_NAME 
FROM V$LOGMNR_CONTENTS
WHERE TABLE_NAME = 'YOUR_TABLE_NAME';-- 查看DDL操作
SELECT SQL_REDO FROM V$LOGMNR_CONTENTS 
WHERE SQL_REDO LIKE '%CREATE%' OR SQL_REDO LIKE '%ALTER%' OR SQL_REDO LIKE '%DROP%';--结束日志分析
DBMS_LOGMNR.END_LOGMNR();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/922077.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/922077.shtml
英文地址,请注明出处:http://en.pswp.cn/news/922077.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Eip开源主站EIPScanner在Linux上的调试记录(二 多生产者连接)

目录 一、背景 二、可行性验证 三、开发调试 一、背景 在一般场景下&#xff0c;只需一路IO连接&#xff0c;但稍微复杂的场景&#xff0c;就需要不同通讯周期的连接&#xff0c;这就需要有多组IO连接。 而大于一组的连接调试方法是一样的&#xff0c;因此主要解决2组连接的…

Oracle APEX 利用卡片实现翻转(方法二)

目录 0. 以 Oracle 的标准示例表 EMP 为例&#xff0c;实现卡片翻转 1. 创建卡片区域 (Cards Region) 2. 定义卡片的 HTML 结构 3. 添加 CSS 实现样式和翻转动画 4. 创建动态操作触发翻转 5. 运行效果 0. 以 Oracle 的标准示例表 EMP 为例&#xff0c;实现卡片翻转 目标如…

低代码拖拽实现与bpmn-js详解

低代码平台中的可视化拖拽功能是其核心魅力所在&#xff0c;它让构建应用变得像搭积木一样直观。下面我将为你梳理其实现原理&#xff0c;并详细介绍 vue-draggable 这个常用工具。 &#x1f9f1; 一、核心架构&#xff1a;三大区域与数据驱动 低代码编辑器界面通常分为三个核心…

【科研绘图系列】R语言绘制模型预测与数据可视化

禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者! 文章目录 介绍 加载R包 数据下载 函数 导入数据 数据预处理 画图 总结 系统信息 介绍 本文介绍了一种利用R语言进行海洋微生物群落动态分析的方法,该方法通过构建多个统计模型来预测不同环境…

TODO的面试(dw三面、sqb二面、ks二面)

得物的前端三面&#xff08;通常是技术终面&#xff09;会深入考察你的技术深度、项目经验、解决问题的思路以及职业素养。下面我结合搜索结果&#xff0c;为你梳理一份得物前端三面的常问问题及详解&#xff0c;希望能助你一臂之力。 &#x1f9e0; 得物前端三面常问问题及详解…

开发 PHP 扩展新途径 通过 FrankenPHP 用 Go 语言编写 PHP 扩展

通过 FrankenPHP 用 Go 语言编写 PHP 扩展 在 PHPVerse 2025 大会上&#xff08;JetBrains 为纪念 PHP 语言 30 周年而组织的会议&#xff09;&#xff0c;FrankenPHP 开发者 Kvin Dunglas 做了一个开创性的宣布&#xff1a;通过 FrankenPHP&#xff0c;可以使用 Go 语言创建 …

完美解决:应用版本更新,增加字段导致 Redis 旧数据反序列化报错

完美解决&#xff1a;应用版本更新&#xff0c;增加字段导致 Redis 旧数据反序列化报错 前言 在敏捷开发和快速迭代的今天&#xff0c;我们经常需要为现有的业务模型增加新的字段。但一个看似简单的操作&#xff0c;却可能给正在稳定运行的系统埋下“地雷”。 一个典型的场景是…

66-python中的文件操作

1. 文件的编码 UTF-8 GBK GB2312 Big5 GB18030 2. 文件读取 文件操作步骤: 打开文件 读\写文件 关闭文件 open(name,mode,encoding) name:文件名字符串 “D:/haha.txt” mode: 只读、写入、追加 r:以只读方式打开 w: 只用于写 a :用于追加 encoding:编码方式 # -*- coding: utf…

FPGA实例源代码集锦:27个实战项目

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;FPGA是一种可编程逻辑器件&#xff0c;允许用户根据需求配置硬件功能。本压缩包提供27个不同的FPGA应用实例源代码&#xff0c;旨在帮助初学者深入学习FPGA设计&#xff0c;并为专业工程师提供灵感。内容涵盖了…

基于 Vue+Mapbox 的智慧矿山可视化功能的技术拆解

01、项目背景 在全球矿业加速向 “高端化、智能化、绿色化” 转型的浪潮下&#xff0c;传统矿业面临的深地开采难题、效率瓶颈与安全隐患日益凸显。 在矿业转型的迫切需求与政策、技术支撑的背景下依托 GIS 技术&#xff0c;开展了 “中国智矿” GIS 开发项目&#xff0c;旨在…

进程状态(Linux)

进程状态Linux进程状态Linux进程状态进程描述R运行状态S睡眠状态D磁盘休眠状态T停止状态t被追踪状态(调试状态)X死亡状态Z僵死状态其实大致也就可以分为三种运行&#xff0c;阻塞&#xff0c;挂起。运行状态每个cpu里都有一个运行队列&#xff0c;进程在运行队列里&#xff0c;…

物联网领域中PHP框架的最佳选择有哪些?

物联网&#xff08;IoT&#xff09;作为近年来快速发展的技术领域&#xff0c;已经渗透到智能家居、工业自动化、智慧城市等方方面面。作为Web开发中广泛使用的语言&#xff0c;PHP凭借其易学易用、开发效率高和生态丰富的特点&#xff0c;也在物联网领域找到了用武之地。 本文…

java反射(详细教程)

我们平常创建类的实例并调用类中成员需要建立在一个前提下&#xff0c;就是已经知道类名和类中成员的信息&#xff0c;灵活性大大降低。甚至在一些项目中还需要修改源码来满足使用条件&#xff0c;大大降低了操作的灵活性。Java 反射&#xff08;Reflection&#xff09;是 Java…

消息队列-初识kafka

优缺点 消息队列的优点&#xff1a; 实现系统解耦&#xff1a; :::color5 系统解耦解释 有 MQ 时是 “服务 A 发消息到队列&#xff0c;其他服务从队列拿消息&#xff0c;新增服务接队列就行”&#xff1b;无 MQ 时是 “服务 A 直接调其他服务的接口 / 依赖&#xff0c;新增 / …

实践《数字图像处理》之Canny边缘检测、霍夫变换与主动二值化处理在短线段清除应用中的实践

在最近的图像处理项目中&#xff0c;其中一个环节&#xff1a;图片中大量短线&#xff08;不是噪声&#xff09;&#xff0c;需要在下一步处理前进行清除。在确定具体实现时&#xff0c;碰到了Canny边缘检测、霍夫变换与主动二值化处理的辩证使用&#xff0c;相关逻辑从图片灰度…

vue3与ue5通信-工具类

工具 ue5-simple.js /*** UE5 通信工具* 两个核心方法&#xff1a;发送消息和接收消息*/// 确保全局对象存在 if (typeof window ! undefined) {window.ue window.ue || {};window.ue.interface window.ue.interface || {}; }/*** 生成 UUID*/ function generateUUID() {retu…

在kotlin中如何使用像java中的static

在 Kotlin 中&#xff0c;没有直接的 static 关键字&#xff0c;但有几种等效的方式来实现 Java 中静态成员的功能&#xff1a; 1. 伴生对象 (Companion Object) - 最常用 class MyClass {companion object {// 静态常量const val STATIC_CONSTANT "constant value"…

如何在 Spring Boot 中指定不同的配置文件?

介绍 Spring Boot 提供了多种方式来管理和加载配置文件&#xff0c;特别是在多环境配置下&#xff0c;比如开发、测试和生产环境。通过指定不同的配置文件&#xff0c;可以灵活地调整应用程序的行为&#xff0c;以适应不同的需求。本文将介绍在 Spring Boot 中如何指定使用不同…

在centOS源码编译方式安装MySQL5.7

一、前言 在生产环境中部署数据库时&#xff0c;很多人会选择直接使用 yum/apt 包管理器 安装 MySQL&#xff0c;这样简单快速&#xff0c;但缺点是版本受限&#xff0c;灵活性不足。对于需要指定版本、启用特定编译参数或优化的场景&#xff0c;源码编译安装 MySQL 就显得非常…

探讨Hyperband 等主要机器学习调优方法的机制和权衡

本篇文章Master Hyperband — An Efficient Hyperparameter Tuning Method in Machine Learning深入探讨了Hyperband这一高效的超参数调优方法。文章的技术亮点在于其结合了多臂老虎机策略和逐次减半算法&#xff0c;能够在大搜索空间中快速剔除表现不佳的配置&#xff0c;从而…