作者:小凯
沉淀、分享、成长,让自己和他人都能有所收获!

本文的宗旨在于通过简单干净实践的方式教会读者,配置出一套 Canal 工具服务,来同步分库分表的数据到 Elasticsearch 文件夹系统中。同时在 SpringBoot 工程中,配置出两套数据源,一套是 MySQL + MyBatis,一套是 Elasticsearch + MyBatis。【这是非常重要的设计手段】

虽然现在有 TiDB 这样的分布式数据库,但对于分库分表 + 数据同步ES,依然是非常主流的方案。同时也有一部分是把分库分表的数据同步到 TiDB 使用。

Github:https://github.com/alibaba/canal(opens new window)

#一、组件介绍
在这里插入图片描述

canal ,译为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

它的工作原理是,canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。在 MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) 这样 canal 再解析 binary log (binlog)进行配置分发,同步到 Elasticsearch 等系统中进行使用。

那么有了 canal 就可以把分库分表的数据同步到 Elasticsearch,提供汇总查询和聚合操作,也就不需要把轮训每个分库分表数据了。

二、测试预期
本文的案例会把MySQL,2库4表的数据,通过 Sharding 分库分表写入数据后,同步到 Elasticsearch。分库分表如下(环境安装中会自动安装数据库和设置库表);
在这里插入图片描述

三、环境安装
为了让读者伙伴更加简单的学习到这一项方案技能,这里把所需的环境都配置成一整套的 docker compose 脚本文件(ARM、AMD),你只要执行安装即可。安全前注意,无论是本机还是云服务器都需要安装 docker-ce(opens new window)

1. 环境脚本
在这里插入图片描述

  • 打开 xfg-dev-tech-canal 工程,下面就是 docker compose 的执行脚本。
  • mac/windows 如果安装了 docker 可以直接点击如图的三角号安装。如果是在 Linux 安装了 docker 可以把 dev-ops 整个文件夹都上传到云服务器,之后通过脚本;docker-compose -f xfg-dev-tech-canal-docker-compose.yml up -d 进行安装。

1.1 开启 binlog
mysql 数据同步需要创建一个 canal 的账户,之后还需要开启 binlog 日志
在这里插入图片描述

  • 在 mysql 配置文件夹中,设置了初始化授权的账户、导入的库表,以及开启 mysql-bin 和配置要采集的库。
  • 如果你有配置自己其他的库要同步也可以如此配置。

1.2 库表采集配置
在这里插入图片描述

  • 本文选择的是 es 同步方式,所以需要在 canal-adapter 中 es7 文件夹添加同步的库表 yml 配置。
  • 以及在 application.yml 中配置出需要链接的库表以及同步的目标地址,也就是 es 的地址。【因为本文的案例是在同一个 docker compose 下安装,所以直接用名称 elsticsearch 即可访问】

2. 运行状态
在这里插入图片描述

  • 安装完成后可以进入 portainer 查看各个组件的运行,如果有哪个运行失败了,可以点击那个小文件的图标,它可以查看日志。

3. 创建索引
在 doc/dev-ops/curl 下提供了创建 Elasticsearch 的脚本;你可以点击执行或者直接复制执行,也可以复制导入到 ApiPost 里执行。

以上这些脚本是为了创建出数据库表同步到 Elasticsearch 后对应的索引和映射的字段。文章下面会用到。

3.1 创建

curl -X PUT "127.0.0.1:9200/xfg_dev_tech.user_order" -H 'Content-Type: application/json' -d'
{"mappings": {"properties": {"_user_id":{"type": "text"},"_user_name":{"type": "text"},"_order_id":{"type": "text"},"_uuid":{"type": "text"},"_create_time":{"type": "date"},"_update_time":{"type": "date"}}}
}'

3.2 添加

curl -X PUT "127.0.0.1:9200/xfg_dev_tech.user_order/_mapping" -H 'Content-Type: application/json' -d'
{"properties": {"_sku_name": {"type": "text"}}
}'

3.3 删除

curl -X DELETE "127.0.0.1:9200/xfg_dev_tech.user_order"

4. 创建索引(Kibana)
4.1 索引管理
地址:http://127.0.0.1:5601/app/management/kibana/indexPatterns(opens new window)
在这里插入图片描述

  • 填写完,点击创建索引模式即可。

4.2 数据页面
地址:http://127.0.0.1:5601/app/discover(opens new window)
在这里插入图片描述

  • 等后面同步数据了以后,直接在这里点刷新就可以看到了。

5. 许可证
kibana 提供了免费30天的试用许可,安装后可以使用 x-pack-sql-jdbc。它的好处是可以让我们通过 MyBatis 的方式查询 Elasticsearch 数据。

地址:http://127.0.0.1:5601/app/management/stack/license_management(opens new window)
在这里插入图片描述

Elasticsearch 提供了 x-pack-sql-jdbc,让对 Elasticsearch 的查询也可以像使用 MySQL 数据库一样通过 MyBatis 进行查询。但这个 x-pack-sql-jdbc 是付费的,免费可以使用 30 天。之后你可以选择使用重新安装,破解,或者使用 Elasticsearch 的查询方式。还可以自己开发一个 Elasticsearch JDBC,GitHub 上也有类似的组件。

使用时需要引入 POM 配置;

<!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/x-pack-sql-jdbc -->
<dependency><groupId>org.elasticsearch.plugin</groupId><artifactId>x-pack-sql-jdbc</artifactId><version>7.17.14</version>
</dependency>

三、工程配置
本节涉及到了简明教程中所讲解的 Sharding 分库分表 (opens new window)的使用,因为我们需要把分库分表的数据通过 canal 同步到 Elasticsearch。(也可以使用其他分库分表组件)

在工程中配置一套 Sharding 分库分表映射的 MyBatis MyBatis,在配置一套 Elasticsearch x-pack-sql-jdbc 数据源映射的 MyBatis Mapper。这样可以读写分别走自己设定好的 Mapper 对象了。
在这里插入图片描述

1. 创建数据源

@Configuration
public class DataSourceConfig {@Configuration@MapperScan(basePackages = "cn.bugstack.xfg.dev.tech.infrastructure.dao.elasticsearch", sqlSessionFactoryRef = "elasticsearchSqlSessionFactory")static class ElasticsearchMyBatisConfig {@Bean("elasticsearchDataSource")@ConfigurationProperties(prefix = "spring.elasticsearch.datasource")public DataSource igniteDataSource(Environment environment) {return new EsDataSource();}@Bean("elasticsearchSqlSessionFactory")public SqlSessionFactory elasticsearchSqlSessionFactory(DataSource elasticsearchDataSource) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(elasticsearchDataSource);factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/elasticsearch/*.xml"));return factoryBean.getObject();}}@Configuration@MapperScan(basePackages = "cn.bugstack.xfg.dev.tech.infrastructure.dao.mysql", sqlSessionFactoryRef = "mysqlSqlSessionFactory")static class MysqlMyBatisConfig {@Bean("mysqlDataSource")@ConfigurationProperties(prefix = "spring.mysql.datasource")public DataSource mysqlDataSource(Environment environment) {return DataSourceBuilder.create().url(environment.getProperty("spring.mysql.datasource.url")).driverClassName(environment.getProperty("spring.mysql.datasource.driver-class-name")).build();}@Bean("mysqlSqlSessionFactory")public SqlSessionFactory mysqlSqlSessionFactory(DataSource mysqlDataSource) throws Exception {SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();factoryBean.setDataSource(mysqlDataSource);factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/mysql/*.xml"));return factoryBean.getObject();}}}
  • ElasticsearchMyBatisConfig 使用 EsDataSource 创建数据源和映射 MyBatis Mapper 文件。
  • MysqlMyBatisConfig 使用 DataSourceBuilder 创建 Sharding 提供的数据源和映射 MyBatis Mapper 文件。

2. Mapper 映射

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.bugstack.xfg.dev.tech.infrastructure.dao.elasticsearch.IElasticsearchUserOrderDao"><resultMap id="dataMap" type="cn.bugstack.xfg.dev.tech.infrastructure.po.UserOrderPO"><result column="_user_id" property="userId"/><result column="_user_name" property="userName"/><result column="_order_id" property="orderId"/><result column="_sku_name" property="skuName"/><result column="_update_time" property="updateTime"/><result column="_create_time" property="createTime"/></resultMap><select id="selectByUserId" resultMap="dataMap">select _user_id, _user_name, _order_id, _sku_namefrom "xfg_dev_tech.user_order"order by _update_timelimit 10</select></mapper>
  • 这个是 Elasticsearch 映射的 Mapper 文件,映射的字段就是前面安装环境的时候设置的索引和字段。现在你使用 Elasticsearch 就不用在工程中硬编码查询语句了,变得非常方便。

四、工程测试
1. 写入数据

@Test
public void test_insert() throws InterruptedException {for (int i = 0; i < 3; i++) {UserOrderPO userOrderPO = UserOrderPO.builder().userName("小哥哥").userId("xfg_" + RandomStringUtils.randomAlphabetic(6)).userMobile("+86 13521408***").sku("13811216").skuName("《手写MyBatis:渐进式源码实践》").orderId(RandomStringUtils.randomNumeric(11)).quantity(1).unitPrice(BigDecimal.valueOf(128)).discountAmount(BigDecimal.valueOf(50)).tax(BigDecimal.ZERO).totalAmount(BigDecimal.valueOf(78)).orderDate(new Date()).orderStatus(0).isDelete(0).uuid(UUID.randomUUID().toString().replace("-", "")).ipv4("127.0.0.1").ipv6("2001:0db8:85a3:0000:0000:8a2e:0370:7334".getBytes()).extData("{\"device\": {\"machine\": \"IPhone 14 Pro\", \"location\": \"shanghai\"}}").build();userOrderDao.insert(userOrderPO);Thread.sleep(100);}
}
  • 循环插入3条数据,按需你可以设置更多条数据。
  • 这里的用户编号 userId 是随机的,也是切分键的 ID,所以会在不同的库表写入数据。

2. 数据验证
MySQL:http://127.0.0.1:8899/ (opens new window)docker compose 配置的管理后台,可以 root/123456 登录
Kibana:http://127.0.0.1:5601/app/discover (opens new window)查询写入的数据。
在这里插入图片描述

3. 查询数据

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserOrderDaoTest {@Resourceprivate IElasticsearchUserOrderDao userOrderDao;@Testpublic void test() {List<UserOrderPO> userOrderPOS = userOrderDao.selectByUserId();log.info("测试结果:{}", JSON.toJSONString(userOrderPOS));}}

在这里插入图片描述

  • 通过 Elasticsearch 走 x-pack-sql-jdbc 的方式再把数据查询出来

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/914787.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/914787.shtml
英文地址,请注明出处:http://en.pswp.cn/news/914787.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

气候为何愈演愈“炙” — 未来五年高温趋势与 AI 气象大模型的突破性价值

早、更准 代表性模型 主要特征 应用进展 GraphCast(DeepMind) 10 天全球预报;0.25 分辨率;< 1 min 推理 90 % 指标超 ECMWF HRES,已用于极端风暴提前锁定Google DeepMind MetNet-3(Google Research) 1–4 km 分辨率;2 min 时序;24 h 区域精细预报 美东、欧洲已在 G…

LVS四种模式及部署NAT、DR模式集群

1、lvs简介LVS:Linux Virtual Server&#xff0c;负载调度器&#xff0c;内核集成&#xff0c;章文嵩&#xff0c;阿里四层SLB(ServerLoadBalance)是基于LVSkeepalived实现LVS 官网: http://www.linuxvirtualserver.org/LVS 相关术语VS: Virtual Server&#xff0c;负责调度RS:…

【Linux】Ubuntu22.04安装zabbix

官方文档&#xff1a;zabbix安装文档 环境如下 环境版本nginx1.26.3zabbix7.0.16mysql8.0.41 安装nginx和mysql 一键部署脚本 部署zabbix #!/bin/bash wget https://repo.zabbix.com/zabbix/7.0/ubuntu/pool/main/z/zabbix-release/zabbix-release_latest_7.0ubuntu22.04_…

C++ - 仿 RabbitMQ 实现消息队列--sqlite与gtest快速上手

目录 SQLite 什么是 SQLite 为什么要用 SQLite SQLite3 C/C API 介绍 SQLite3 C/C API 使用 GTest GTest 是什么 GTest 使用 TEST 宏 断言 事件机制 全局事件 TestSuite 事件 SQLite 什么是 SQLite SQLite 是一个进程内的轻量级数据库&#xff0c;它实现了自给自足…

Web3.0 学习方案

Web3.0 学习方案 一、学习方案 &#xff08;一&#xff09;入门阶段 1. 了解 Web3.0 基础概念 学习内容&#xff1a; Web3.0 的起源、愿景、与 Web2.0 的区别区块链的基本概念&#xff1a;分布式账本、哈希、公钥/私钥、共识机制&#xff08;PoW、PoS、DPoS、PBFT 等&#xff0…

springboot3.5.3依赖学习

springboot3.5.3依赖学习 ​ Spring Boot BOM&#xff08;spring-boot-dependencies&#xff09;是 Spring 官方维护的超级依赖清单&#xff0c;覆盖了 Spring 生态中几乎所有核心库、常用工具库及第三方依赖。其作用是统一管理这些依赖的版本&#xff0c;确保它们相互兼容。以…

制作一款打飞机游戏80:道具碰撞

目前我们仍然无法拾取这些物品&#xff0c;它们只是简单地掉落在地上。因此&#xff0c;我们需要对这些功能进行增强。目标‌弹射物品‌&#xff1a;当物品生成时&#xff0c;我们希望它们能以一定的力量弹出&#xff0c;而不是无力地掉落。‌添加不同类型的物品‌&#xff1a;…

Python编程基础(六)| 用户输入和while循环

引言 很久没有写 Python 了&#xff0c;有一点生疏。这是学习《Python 编程&#xff1a;从入门到实践&#xff08;第3版&#xff09;》的课后练习记录&#xff0c;主要目的是快速回顾基础知识。 练习1&#xff1a;汽车租赁 编写一个程序&#xff0c;询问用户要租什么样的汽车&a…

【华为机试】HJ52 计算字符串的编辑距离

文章目录HJ52 计算字符串的编辑距离描述输入描述输出描述示例1HJ52 计算字符串的编辑距离描述输入描述输出描述示例1解题思路算法分析动态规划状态转移状态转移方程算法流程图DP表格示例三种操作详解代码实现思路时间复杂度分析关键优化技巧实际应用场景算法扩展面试考点完整题…

15.手动实现BatchNorm(BN)

15.1 BatchNorm操作手动实现 import torch from torch import nndef batch_norm(X,gamma,beta,moving_mean,moving_var,eps,momentum):if not torch.is_grad_enabled():#这个是推理模式X_hat(X-moving_mean)/torch.sqrt(moving_vareps)else:assert len(X.shape) in (2,4)if le…

【项目实践】SMBMS(Javaweb版)汇总版

文章目录前期准备工作数据库、数据表创建web项目创建项目文件目录配置Tomcat&#xff0c;导入依赖建立实体类编写基础公共方法类导入基础资源登录功能登录页面持久层dao层的用户登录及接口实现dao层接口实现所需的方法业务层sevice层的接口的实现接口实现相关的业务逻辑编写ser…

隐藏源IP的核心方案与高防实践

一、源IP暴露的风险 直接DDoS攻击&#xff1a;2025年Q2全球DDoS攻击峰值达3.8Tbps&#xff08;来源&#xff1a;Cloudflare报告&#xff09;漏洞利用&#xff1a;暴露的SSH端口平均每天遭受12,000暴力破解尝试数据泄露&#xff1a;直接连接数据库风险提升300% 二、4种有效隐藏方…

深度学习图像分类数据集—五种电器识别分类

该数据集为图像分类数据集&#xff0c;适用于ResNet、VGG等卷积神经网络&#xff0c;SENet、CBAM等注意力机制相关算法&#xff0c;Vision Transformer等Transformer相关算法。 数据集信息介绍&#xff1a;五种电器识别分类&#xff1a;[notebook, phone, powerbank, tablet, w…

Windows11家庭版配置frigate 嵌入自研算法(基于Yolov8)-【2】

使用 YOLOv8 的 results.xyxy 结构&#xff0c;下面是一个完整的 MQTT 推送脚本&#xff0c;用于把识别到的目标&#xff08;比如突涌水、水渍、障碍物等&#xff09;发送到 Frigate 的 MQTT 接口。✅ 前提假设 YOLOv8 推理代码已经运行并生成 results.xyxy。每一行是 [x1, y1,…

安装llama-factory报错 error: subprocess-exited-with-error

报错信息如下 Using cached https://mirrors.aliyun.com/pypi/packages/17/89/940a509ee7e9449f0c877fa984b37b7cc485546035cc67bbc353f2ac20f3/av-15.0.0.tar.gz (3.8 MB)Preparing metadata (pyproject.toml) ... errorerror: subprocess-exited-with-error Preparing metad…

QT 多线程 管理串口

记录一下自己使用多线程进行串口管理和数据读取的过程。如果有问题的话可以发消息给我。背景在使用QT制作一个串口数据读取处理的小软件的时候&#xff0c;发现了存在界面卡顿的情况&#xff0c;感觉性能太低&#xff0c;于是考虑把串口数据的读取和处理都放到子线程的缓冲区中…

在虚拟环境中复现论文(环境配置)

前提&#xff1a;已经下载condawinR&#xff0c;输入cmd进入命令行conda create -n PPT python3.8.3 pytorch1.7.0conda create -n PPT(虚拟环境名) python3.8.3(包名) pytorch1.7.0(包名)安装完毕&#xff0c;激活虚拟环境&#xff1a;conda activate PPT根据论文readme要求安…

Flutter Web 的发展历程:Dart、Flutter 与 WasmGC

Flutter Web 应该是 Flutter 开发者里最不“受宠”的平台了&#xff0c;但是其实 Flutter 和 Dart 团队对于 Web 的投入一直没有减少&#xff0c;这也和 Flutter 还有 Dart 的"出生"有关系&#xff0c;今天就借着 Dart 团队的 mer Ağacan 和 Martin Kustermann 在油…

c#方法关键字,ref、out、int

在 C# 中&#xff0c;ref、out 和 in 是用于方法参数传递的关键字&#xff0c;它们控制参数如何在方法和调用者之间传递数据。以下是对这三个关键字的详细分析&#xff1a;1. ref 关键字&#xff08;引用传递&#xff09;作用允许方法修改调用者的变量&#xff1a;通过引用传递…

设计模式—初识设计模式

1.设计模式经典面试题分析几个常见的设计模式对应的面试题。1.1原型设计模式1.使用UML类图画出原型模式核心角色&#xff08;意思就是使用会考察使用UML画出设计模式中关键角色和关系图等&#xff09;2.原型设计模式的深拷贝和浅拷贝是什么&#xff0c;写出深拷贝的两种方式的源…