一、基础概念与原理

1. Canal是什么?

阿里巴巴开源的MySQL binlog增量订阅与消费组件,通过伪装为MySQL Slave监听Master的binlog变更,实现实时数据同步。

Canal 官方网站:https://github.com/alibaba/canal

Canal Demo: https://gitee.com/original-intention/canal-gorgor-demo


2. 工作原理

关键角色:

2.1  canal.deployer(服务端/Server)
  • 核心作用:伪装成 MySQL 的从库(Slave),监听主库的 binlog 变更,解析并转发数据变更事件。

  • 关键功能

    • 连接 MySQL 主库,订阅 binlog 并解析为结构化数据(如 CanalEntry)。

    • 支持将解析后的数据通过 TCPKafkaRocketMQ 等方式投递给下游消费者(如 canal.adapter)。

    • 管理多个同步实例(instance),每个实例对应一个独立的数据同步通道58。

  • 配置文件

    • conf/canal.properties:全局参数(如端口、存储模式)。

    • conf/example/instance.properties:实例级配置(如源数据库地址、账号、表过滤规则)。


2.2 canal.adapter(客户端适配器)
  • 核心作用:消费 canal.deployer 解析的数据,并同步到目标数据源(如 MySQL、Elasticsearch、OceanBase 等)。

  • 关键功能

    • 支持多种目标源适配器(rdbes7hbase 等)。

    • 提供 全量 & 增量同步能力,通过 REST API 触发全量同步(如 curl /etl/rdb/mysql1/user.yml)。

    • 支持多表映射、字段转换、批量提交等配置。

  • 配置文件

    • conf/application.yml:定义数据源、消费模式(TCP/MQ)、目标适配器。

    • conf/rdb/*.yml 或 conf/es7/*.yml:表级同步规则(如源表、目标表、主键映射)。


2.3 canal.admin(管理平台)
  • 核心作用:提供 Web 可视化界面,集中管理 canal.deployer 集群和实例配置。

  • 关键功能

    • 动态管理实例(启动/停止/配置)。

    • 监控同步状态和日志。

    • 支持高可用部署(依赖 ZooKeeper)。

  • 部署要求

    • 需初始化元数据库(执行 canal_manager.sql)。

    • 通过 conf/application.yml 配置数据库连接和权限。


3. 核心应用场景:

  • 业务解耦(如订单状态变更触发消息通知)

  • 实时缓存更新(Redis)

  • 跨数据库/机房数据同步(如MySQL→MySQL、MySQL→Elasticsearch)

  • 数据库镜像

  • 数据库实时备份


二、环境准备与部署

1. MySQL配置

  • 开启binlog

查看配置

show VARIABLES LIKE '%log_bin%';
show VARIABLES LIKE '%binlog_format%';
show VARIABLES LIKE '%server_id%';

修改my.cnf,添加:

[mysqld]
log-bin=mysql-bin
binlog-format=ROW  # 必须为ROW模式
server_id=1        # 与Canal的slaveId不重复
  • 创建Canal账号
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

 2. Canal Server部署

  • 下载与解压:canal.deployer-1.1.4.tar.gz

  • 配置实例conf/canal.properties

进入conf目录,修改canal.properties文件,比较关键的是canal.destinations
 canal.destinations = example 修改成  canal.destinations = course
这里表示,我们需要监控与course课程相关的数据变动,而相关的数据库、表配置会分
别放在course目录下,如果没有这个目录就需要新建这个目录。可以从example目录拷贝一
个过来,并且将名字修改成 course
修改 conf/example/instance.properties 文件
对于instance.properties的修改比较关键的就是几处,
第一 、是MySQL主服务的连接配置
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=mysqlbinlog.000065
canal.instance.master.position=238116155# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

canal.instance.master.journal.name和canal.instance.master.position的值,通过一下命令获取

show master STATUS;

 第二处、是要对哪些相关的业务表进行监视,比如我们这里是course课程信息,数据放在

seckill_order库中:
# table regex
canal.instance.filter.regex=seckill_order.course
配置完成后,进入bin目录,执行startup.bat即可

 三、数据同步实战

1. 引入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.3</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.gorgor.canal</groupId><artifactId>canal-gorgor-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>canal-gorgor-demo</name><description>Demo project for Spring Boot</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><optional>true</optional></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion></exclusions></dependency></dependencies></project>

2. 配置(application.yml)

canal:server:ip: localhostport: 11111course:destination: coursebatchSize: 1000spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/shardingdb1?useSSL=false&serverTimezone=UTCusername: rootpassword: root

3. 配置 CanalConnector 连接

@Configuration
@EnableScheduling
@EnableAsync
public class CanalCourseConfig {@Value("${canal.server.ip}")private String canalServerIp;@Value("${canal.server.port}")private int canalServerPort;@Value("${canal.server.username:blank}")private String userName;@Value("${canal.server.password:blank}")private String password;@Value("${canal.course.destination}")private String destination;@Bean("secKillConnector")public CanalConnector newSingleConnector(){String userNameStr = "blank".equals(userName) ? "" : userName;String passwordStr = "blank".equals(password) ? "" : password;return CanalConnectors.newSingleConnector(new InetSocketAddress(canalServerIp,canalServerPort), destination, userNameStr, passwordStr);}}

4. 数据同步代码

@Service
@Slf4j
public class SecKillData implements IProcessCanalData {private final static String COURSE_ID = "cid";private final static String COURSE_NAME = "cname";private final static String USER_ID = "user_id";private final static String COURSE_STATUS = "cstatus";@Autowired@Qualifier("secKillConnector")private CanalConnector connector;@Value("${canal.seckill.subscribe:server}")private String subscribe;@Value("${canal.course.batchSize}")private int batchSize;@Autowiredprivate JdbcTemplate jdbcTemplate;@PostConstruct@Overridepublic void connect() {connector.connect();if ("server".equals(subscribe))connector.subscribe(null);elseconnector.subscribe(subscribe);connector.rollback();}@PreDestroy@Overridepublic void disConnect() {connector.disconnect();}@Async@Scheduled(initialDelayString = "${canal.course.initialDelay:5000}", fixedDelayString = "${canal.course.fixedDelay:5000}")@Overridepublic void processData() {try {if (!connector.checkValid()) {log.warn("与Canal服务器的连接失效!!!重连,下个周期再检查数据变更");this.connect();return; // 重连后等待下个周期处理}Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {log.info("本次[{}]没有检测到课程数据更新。", batchId);// 空消息也必须确认connector.ack(batchId);return;}log.info("本次[{}]课程数据共有[{}]次更新需要处理", batchId, size);for (CanalEntry.Entry entry : message.getEntries()) {// 跳过事务开始/结束事件if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {try {if (eventType == EventType.DELETE) {processDeleteEvent(rowData);} else if (eventType == EventType.INSERT) {processInsertEvent(rowData);} else if (eventType == EventType.UPDATE) {processUpdateEvent(rowData);}} catch (Exception e) {log.error("处理行数据失败: {}", e.getMessage(), e);}}}connector.ack(batchId); // 批量确认log.info("本次[{}]处理课程Canal同步数据完成", batchId);} catch (Exception e) {log.error("处理课程Canal同步数据失败,请检查:", e);}}/*** 处理删除事件*/private void processDeleteEvent(CanalEntry.RowData rowData) {// 删除事件使用Before列获取数据Map<String, String> beforeColumns = getColumnsMap(rowData.getBeforeColumnsList());Long cid = parseLongSafely(beforeColumns.get(COURSE_ID));if (cid != null) {jdbcTemplate.update("DELETE FROM course WHERE cid = ?", cid);log.info("删除课程活动: cid={}", cid);} else {log.error("删除事件中未找到有效的课程ID");}}/*** 处理插入事件*/private void processInsertEvent(CanalEntry.RowData rowData) {Map<String, String> afterColumns = getColumnsMap(rowData.getAfterColumnsList());Long cid = parseLongSafely(afterColumns.get(COURSE_ID));String cname = afterColumns.get(COURSE_NAME);Long userId = parseLongSafely(afterColumns.get(USER_ID));String cstatus = afterColumns.get(COURSE_STATUS);if (cid != null && cname != null && userId != null && cstatus != null) {jdbcTemplate.update("INSERT INTO course (cid, cname, user_id, cstatus) VALUES (?, ?, ?, ?)",cid, cname, userId, cstatus);log.info("新增课程活动: cid={}, cname={}", cid, cname);} else {log.error("插入事件中缺失必要字段: cid={}, cname={}, userId={}, cstatus={}",cid, cname, userId, cstatus);}}/*** 处理更新事件*/private void processUpdateEvent(CanalEntry.RowData rowData) {Map<String, String> afterColumns = getColumnsMap(rowData.getAfterColumnsList());Long cid = parseLongSafely(afterColumns.get(COURSE_ID));String cname = afterColumns.get(COURSE_NAME);Long userId = parseLongSafely(afterColumns.get(USER_ID));String cstatus = afterColumns.get(COURSE_STATUS);if (cid != null && cname != null && userId != null && cstatus != null) {jdbcTemplate.update("UPDATE course SET cname = ?, user_id = ?, cstatus = ? WHERE cid = ?",cname, userId, cstatus, cid);log.info("更新课程活动: cid={}, cname={}", cid, cname);} else {log.error("更新事件中缺失必要字段: cid={}, cname={}, userId={}, cstatus={}",cid, cname, userId, cstatus);}}/*** 将列列表转换为Map (列名 -> 值)*/private Map<String, String> getColumnsMap(List<Column> columns) {return columns.stream().collect(Collectors.toMap(Column::getName,Column::getValue,(existing, replacement) -> existing));}/*** 安全转换Long类型*/private Long parseLongSafely(String value) {try {return value != null && !value.isEmpty() ? Long.parseLong(value) : null;} catch (NumberFormatException e) {log.error("转换Long失败: {}", value);return null;}}
}

具体代码在上面 Canal Demo 案例链接项目中。

初始化sql 在项目 resources/sql 目录下。


四、相关开源&产品

  • canal 消费端开源项目: Otter
  • 阿里巴巴去 Oracle 数据迁移同步工具: yugong
  • 阿里巴巴离线同步开源项目 DataX
  • 阿里巴巴数据库连接池开源项目 Druid
  • 阿里巴巴实时数据同步工具 DTS

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/914717.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/914717.shtml
英文地址,请注明出处:http://en.pswp.cn/news/914717.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

算法第23天|贪心算法:基础理论、分发饼干、摆动序列、最大子序和

今日总结&#xff1a; 摆动序列的三种特殊情况需要着重思考&#xff0c;感觉是没有思考清楚 基础理论 1、贪心的本质&#xff1a; 贪心的本质是选择每一阶段的局部最优&#xff0c;从而达到全局最优。 例如&#xff1a;一堆钞票&#xff0c;只能拿走10张&#xff0c;如何拿走最…

Q-chunking——带有动作分块的强化学习:基于人类演示,进行一定的连贯探索(且可做到无偏的n步价值回溯)

前言 我在之前的文章中提到过多次&#xff0c;长沙具身团队是我司建设的第二支具身团队&#xff0c;通过5月份的全力招聘&#xff0c;为了冲刺6月底和7月初来长沙办公室考察的第一批客户&#xff0c;过去一个多月来&#xff0c;长沙分部(一开始就5人&#xff0c;另外5人 实习…

NW956NW961美光固态闪存NW964NW968

美光固态闪存深度解析&#xff1a;NW956、NW961、NW964与NW968的全方位评测一、产品概述与市场定位在当今数据爆炸的时代&#xff0c;固态硬盘&#xff08;SSD&#xff09;作为存储领域的佼佼者&#xff0c;其性能与稳定性成为了用户关注的焦点。美光&#xff08;Micron&#x…

C++修炼:IO流

Hello大家好&#xff01;很高兴我们又见面啦&#xff01;给生活添点passion&#xff0c;开始今天的编程之路&#xff01; 我的博客&#xff1a;<但凡. 我的专栏&#xff1a;《编程之路》、《数据结构与算法之美》、《C修炼之路》、《Linux修炼&#xff1a;终端之内 洞悉真理…

语音识别的速度革命:从 Whisper 到 Whisper-CTranslate2,我经历了什么?

Whisper-CTranslate2&#xff1a;语音识别的速度革命 大家好&#xff0c;一个沉迷于 AI 语音技术的 “音频猎人”。最近在处理大量播客转录项目时&#xff0c;我被传统语音识别工具折磨得苦不堪言 ——RTX 3090 跑一个小时的音频要整整 20 分钟&#xff0c;服务器内存分分钟爆满…

JVM 内存模型详解:GC 是如何拯救内存世界的?

JVM 内存模型详解&#xff1a;GC 是如何拯救内存世界的&#xff1f; 引言 Java 虚拟机&#xff08;JVM&#xff09;是 Java 程序运行的基础&#xff0c;其核心特性之一就是自动内存管理。与 C/C 不同&#xff0c;Java 开发者无需手动分配和释放内存&#xff0c;而是由 JVM 自动…

分布式全局唯一ID生成:雪花算法 vs Redis Increment,怎么选?

在黑马点评项目实战中&#xff0c;关于全局唯一ID生成的实现方案选择中&#xff0c;我看到有人提到了雪花算法&#xff0c;本文就来简单了解一下雪花算法与Redis的incr方案的不同。在分布式系统开发中&#xff0c;“全局唯一ID”是绕不开的核心问题。无论是分库分表的数据库设计…

(新手友好)MySQL学习笔记(完):事务和锁

事务和锁事务transaction&#xff0c;一组原子性的SQL查询&#xff0c;或者说是一个独立的工作单元。如果能够成功执行这组查询的全部语句&#xff0c;就会执行这组查询&#xff1b;如果其中任何一条语句无法成功执行&#xff0c;那么这组查询的所有语句都不会执行。也就是说&a…

【CMake】使用 CMake 将单模块 C 项目构建为库并链接主程序

目录1. 项目结构设计&#x1f4e6; 结构说明2. 项目文件内容2.1 顶层 CMakeLists.txt2.2 模块 src/color/CMakeLists.txt ✅【推荐写法】❓是否需要写 project()&#xff1f;2.3 模块头文件 include/color.h2.4 模块实现文件 src/color/color.c2.5 主程序 src/main.c3. 构建与运…

从零开始的云计算生活——番外4,使用 Keepalived 实现 MySQL 高可用

目录 前言 一、架构原理​ ​Keepalived 作用​ ​MySQL 主从复制​ 二、环境准备​ 服务器要求​&#xff1a; 安装基础软件​ 三、配置 MySQL 主从复制 四、配置 Keepalived 主节点配置​&#xff08;/etc/keepalived/keepalived.conf&#xff09; 从节点配置 五、…

list类的常用接口实现及迭代器

目录 1. list类的介绍 2.list类的常用接口 2.1 list类的常用构造 2.2 list类对象的容量操作 2.3 list迭代器 2.4 list类的常用操作 3.list的模拟实现 1. list类的介绍 list代表的是双向链表&#xff0c;常见的有创建&#xff0c;增&#xff0c;删&#xff0c;改几个接口…

vscode Cline接入火山引擎的Deepseek R1

创建火山引擎Deepseek R1的API 在火山引擎管理控制台中创建Deepseek R1推理接入点&#xff08;大模型&#xff09;&#xff0c;创建成功后会看到下图效果。在操作中选择API调用&#xff0c;在页面中选择OpenAI SDK&#xff0c;按照步骤找到baseUrl地址和API_KEY&#xff0c;后续…

新手向:自动化图片格式转换工具

大家好&#xff01;今天我要分享一个非常实用的Python小工具——图片格式批量转换器。如果你经常需要处理大量不同格式的图片文件&#xff0c;或者需要统一图片格式以便于管理&#xff0c;那么这个工具将会成为你的得力助手&#xff01;一、为什么需要图片格式转换&#xff1f;…

CUDA中的内存管理、锁页内存、UVA统一虚拟地址、零拷贝、统一内存

文章目录0 前言1 swap内存跟锁页内存2 UVA(Unified Virtual Addressing)统一虚拟地址3 先看最普通的cuda内存分配、释放、传输4 申请锁页内存4.1 cudaHostAllocDefault4.2 cudaHostAllocPortable4.3 cudaHostAllocWriteCombined4.3 cudaHostAllocMapped4.4 几种锁页内存总结4.5…

微服务环境下的灰度发布与金丝雀发布实战经验分享

微服务环境下的灰度发布与金丝雀发布实战经验分享 在大规模微服务架构中&#xff0c;如何平滑安全地上线新功能是每个后端团队的痛点。本文将结合生产环境中的真实案例&#xff0c;分享灰度发布&#xff08;Gray Release&#xff09;与金丝雀发布&#xff08;Canary Release&am…

MEF 在 WPF 中的简单应用

MEF核心笔记MEF 的开发模式主要适用于插件化的业务场景中&#xff0c;C/S 和 B/S 中都有相应的使用场景&#xff0c;其中包括但不限于 ASP.NET MVC 、ASP WebForms、WPF、UWP 等开发框架。当然&#xff0c;DotNet Core 也是支持的。 以下是搜索到一些比较好的博文供参考&#…

Gitlab跑CICD的时候,maven镜像和pom.xml使用的maven版本冲突导致没办法build成功的解决方法

是这样的&#xff01;最近遇到一个非常棘手的难题&#xff0c;我搞了大概2周时间才把他弄出来&#xff0c;因为自己搭了个私服的maven仓库&#xff0c;他不像maven官方仓库一样&#xff0c;可以跟nginx一样转的&#xff0c;所以遇到好几个难点&#xff01;第一点&#xff1a;就…

Linux内核IPv4路由查找:LPC-Trie算法的深度实践

在互联网基础设施的核心领域,路由查找性能直接决定了网络转发效率。Linux内核作为现代网络系统的基石,其IPv4路由子系统采用了一种名为LPC-Trie(Level-Compressed Trie) 的创新数据结构,在net/ipv4/fib_trie.c文件中实现了高效的路由管理方案。本文将深入剖析这一机制的设…

【设计模式】装饰(器)模式 透明装饰模式与半透明装饰模式

装饰模式&#xff08;Decorator Pattern&#xff09;详解一、装饰模式简介 装饰模式&#xff08;Decorator Pattern&#xff09; 是一种 结构型设计模式&#xff0c;它允许你动态地给对象添加行为或职责&#xff0c;而无需修改其源代码&#xff0c;也不需要使用继承来扩展功能。…

NAT原理与实验指南:网络地址转换技术解析与实践

NAT实验 NAT&#xff08;Network Address Translation&#xff0c;网络地址转换&#xff09;&#xff1a; NAT技术的介绍&#xff1a; 随着Internet用户的快速增长&#xff0c;以及地址分配不均等因素&#xff0c;IPv4地址&#xff08;约40亿的空间地址&#xff09;已经陷入不…