下面我将创建一个完整的 Spring Boot 项目,使用 Flink CDC 3.0 基于 MySQL 的 binlog 实现数据同步到 Elasticsearch。

项目概述

这个项目将:

  1. 使用 Flink CDC 连接 MySQL 并读取 binlog
  2. 处理数据变化(插入、更新、删除)
  3. 将数据同步到 Elasticsearch
  4. 提供 REST API 管理同步任务

项目结构

src/main/java/
├── com/example/cdc/
│   ├── config/
│   │   ├── FlinkConfig.java
│   │   └── ElasticsearchConfig.java
│   ├── model/
│   │   └── User.java
│   ├── service/
│   │   ├── SyncService.java
│   │   └── JobManager.java
│   ├── controller/
│   │   └── SyncController.java
│   └── FlinkCdcApplication.java

1. 添加依赖 (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>FlinkCDC</artifactId><version>0.0.1-SNAPSHOT</version><name>FlinkCDC</name><description>FlinkCDC</description><properties><java.version>11</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.7.6</spring-boot.version><flink.version>1.16.0</flink.version><flink-cdc.version>3.0.1</flink-cdc.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Flink CDC --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><!-- Flink Connector Elasticsearch --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>${flink.version}</version></dependency><!-- Flink JSON --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><!-- Flink Java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- Flink CLI --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.36</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>11</source><target>11</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.example.cdc.FlinkCdcApplication</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build>
</project>

2. 数据模型 (User.java)

package com.example.cdc.model;import lombok.Data;@Data
public class User {private Long id;private String name;private String email;private Long createdAt;private Long updatedAt;private Boolean deleted;
}

3. Flink 配置 (FlinkConfig.java)

package com.example.cdc.config;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FlinkConfig {@Beanpublic StreamExecutionEnvironment streamExecutionEnvironment() {return StreamExecutionEnvironment.getExecutionEnvironment();}
}

4. Elasticsearch 配置 (ElasticsearchConfig.java)

package com.example.cdc.config;import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ElasticsearchConfig {@Value("${elasticsearch.host:localhost}")private String host;@Value("${elasticsearch.port:9200}")private int port;@Bean(destroyMethod = "close")public RestHighLevelClient restHighLevelClient() {return new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, "http")));}
}

5. 同步服务 (SyncService.java)

package com.example.cdc.service;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class SyncService {@Autowiredprivate StreamExecutionEnvironment env;@Value("${mysql.host:localhost}")private String mysqlHost;@Value("${mysql.port:3306}")private int mysqlPort;@Value("${mysql.username:root}")private String mysqlUsername;@Value("${mysql.password:password}")private String mysqlPassword;@Value("${mysql.database:test}")private String mysqlDatabase;@Value("${mysql.table:users}")private String mysqlTable;@Value("${elasticsearch.host:localhost}")private String esHost;@Value("${elasticsearch.port:9200}")private int esPort;@Value("${elasticsearch.index:users}")private String esIndex;public void startSync() throws Exception {// 创建 MySQL CDC SourceMySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname(mysqlHost).port(mysqlPort).databaseList(mysqlDatabase).tableList(mysqlDatabase + "." + mysqlTable).username(mysqlUsername).password(mysqlPassword).deserializer(new JsonDebeziumDeserializationSchema()).build();// 创建数据流DataStream<String> stream = env.fromSource(mySqlSource,org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks(),"MySQL Source");// 转换和处理数据DataStream<String> processedStream = stream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {// 这里可以添加自定义的数据处理逻辑return value;}});// 配置 Elasticsearch SinkList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost(esHost, esPort, "http"));ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,(element, ctx, indexer) -> {IndexRequest request = Requests.indexRequest().index(esIndex).source(element, XContentType.JSON);indexer.add(request);});// 配置批量请求esSinkBuilder.setBulkFlushMaxActions(1);// 将数据发送到 Elasticsearch - 使用 addSink 而不是 sinkToprocessedStream.addSink(esSinkBuilder.build());// 启动任务env.execute("MySQL to Elasticsearch Sync");}
}

6. 任务管理器 (JobManager.java)

package com.example.cdc.service;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;@Component
public class JobManager {@Autowiredprivate StreamExecutionEnvironment env;@Autowiredprivate SyncService syncService;private Thread jobThread;public void startJob() {jobThread = new Thread(() -> {try {syncService.startSync();} catch (Exception e) {e.printStackTrace();}});jobThread.start();}public void stopJob() {if (env != null) {try {env.close();} catch (Exception e) {e.printStackTrace();}}if (jobThread != null && jobThread.isAlive()) {jobThread.interrupt();}}@PreDestroypublic void onDestroy() {stopJob();}
}

7. REST 控制器 (SyncController.java)

package com.example.cdc.controller;import com.example.cdc.service.JobManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/api/sync")
public class SyncController {@Autowiredprivate JobManager jobManager;@PostMapping("/start")public String startSync() {try {jobManager.startJob();return "Sync job started successfully";} catch (Exception e) {return "Failed to start sync job: " + e.getMessage();}}@PostMapping("/stop")public String stopSync() {try {jobManager.stopJob();return "Sync job stopped successfully";} catch (Exception e) {return "Failed to stop sync job: " + e.getMessage();}}
}

8. 应用主类 (FlinkCdcApplication.java)

package com.example.cdc;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class FlinkCdcApplication {public static void main(String[] args) {SpringApplication.run(FlinkCdcApplication.class, args);}
}

9. 配置文件 (application.yml)

server:port: 8080spring:application:name: mysql-cdc-to-esmysql:host: localhostport: 3306username: rootpassword: your_mysql_passworddatabase: your_databasetable: your_tableelasticsearch:host: localhostport: 9200index: your_es_indexflink:parallelism: 1

使用说明

  1. 确保 MySQL 已开启 binlog:

    SHOW VARIABLES LIKE 'log_bin';
    

    如果未开启,需要在 MySQL 配置文件中添加:

    [mysqld]
    server-id=1
    log-bin=mysql-bin
    binlog_format=row
    binlog_row_image=full
    
  2. 创建具有复制权限的 MySQL 用户:

    CREATE USER 'flink_user'@'%' IDENTIFIED BY 'password';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';
    FLUSH PRIVILEGES;
    
  3. 启动应用程序:

    mvn spring-boot:run
    
  4. 通过 REST API 启动同步任务:

    POST http://localhost:8080/api/sync/start
    

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/96455.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/96455.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/96455.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Web网站的运行原理2

请求Web网站的文件-HTTP 可以使用HTTP协议在Web浏览器和Web服务器应用程序之间传输Web网页的文件。 在进行HTTP传输之前&#xff0c;需要先在Web浏览器和Web服务器应用程序之间建立TCP连接。 使用HTTP请求可以要求Web浏览器向Web服务器应用程序传输文件。 传输Web网站的文件-HT…

论文阅读:Do As I Can, Not As I Say: Grounding Language in Robotic Affordances

地址&#xff1a;Do As I Can, Not As I Say: Grounding Language in Robotic Affordances 摘要 大型语言模型&#xff08;LLM&#xff09;能够编码丰富的世界语义知识&#xff0c;这类知识对于机器人执行自然语言表达的高层级、时间扩展指令具有重要价值。然而&#xff0c;语…

Django管理后台结合剪映实现课件视频生成应用

在教学内容的数字化制作中&#xff0c;如何将课件与音频快速转换为视频是一项高频需求。借助管理后台和剪辑工具&#xff0c;可以实现课件内容的下载、转换和草稿生成&#xff0c;大幅减少重复操作。 【AI教育教学考试系统】课件在线剪映视频草稿生成应用这里实现的课件PPT部分…

AI升级社区便民服务:AI办事小程序高效办证+应急系统秒响应,告别跑腿愁住得更安心

朋友&#xff0c;你有没有在社区办过事&#xff1f;想给孩子办入学证明&#xff0c;得先跑居委会开证明&#xff0c;再去街道办事处盖章&#xff0c;来回几趟不说&#xff0c;要是材料没带全&#xff0c;还得重新跑&#xff1b;家里水管爆了&#xff0c;半夜联系物业&#xff0…

el-table-draggable拖拽实现表格内容排序

1、图片2、安装包import ElTableDraggable from "el-table-draggable";3、代码&#xff08;html&#xff09;<el-table-draggable:data"soloTableData"input"dragInputHandlerSolo"><el-table:data"soloTableData"row-key&qu…

Linux设备模型技术路线图

Linux设备模型涉及的技术和知识点 1. 核心架构组件 1.1 Kobject 子系统 kobject(内核对象):Linux设备模型的基础构建块 kset(对象集合):kobject的容器,管理相同类型的对象 ktype(对象类型):定义kobject的行为和属性 引用计数机制:使用kref管理对象生命周期 对象层…

面试问题详解六:元对象系统调用槽函数

Qt 的 元对象系统&#xff08;Meta-Object System&#xff09; 是 Qt 核心机制之一&#xff0c;正是它让 C 语言具备了类似脚本语言&#xff08;如 Python&#xff09;的反射、动态绑定、属性系统等能力。 自定义信号与槽&#xff0c;是 Qt 元对象系统最常见、最实用的体现。&a…

Scala面试题及详细答案100道(1-10)-- 基础语法与数据类型

《前后端面试题》专栏集合了前后端各个知识模块的面试题,包括html,javascript,css,vue,react,java,Openlayers,leaflet,cesium,mapboxGL,threejs,nodejs,mangoDB,SQL,Linux… 。 前后端面试题-专栏总目录 文章目录 一、本文面试题目录 1. 简述Scala与Java的主要…

http请求有哪些?

TTP请求方法常见方法&#xff1a;GET&#xff1a;获取资源&#xff0c;参数通过URL传递&#xff0c;可缓存到浏览器本地。POST&#xff1a;提交数据&#xff0c;参数通过请求体传递&#xff0c;不可缓存&#xff0c;常用于创建资源。PUT&#xff1a;更新资源&#xff0c;参数通…

MAPGIS6.7地质编录

1.编录文件excel位于D:\mapgis67\program\section&#xff0c;文件名称&#xff1a;ZKInfoEdit.xls2生成副本&#xff0c;复制ZKInfoEdit.xls到桌面3开始编写 04回次4开始编写 03编录5开始编写 11采样6开始编写 06标志面7开始编写 10钻孔资料8 最后总结 …

轻松掌握Chrome插件开发全流程

Chrome插件开发概述介绍Chrome插件的基本概念、核心功能和应用场景&#xff0c;包括插件与浏览器扩展的区别、插件的主要组成部分&#xff08;如manifest文件、后台脚本、内容脚本等&#xff09;。开发环境搭建列出开发Chrome插件所需的工具和环境配置&#xff0c;包括Chrome浏…

智能二维码QR\刷IC卡\人脸AI识别梯控系统功能设计需基于模块化架构,整合物联网、生物识别、权限控制等技术,以下是多奥分层次的系统设计框架

一、系统架构设计硬件层主控模块&#xff1a;32位ARM嵌入式处理器&#xff0c;支持CAN/RS485/TCP/IP协议识别终端&#xff1a;支持IC卡(CPU/国密/HID)、二维码扫码器(动态码)、人脸识别(活体检测)电梯控制单元&#xff1a;继电器矩阵控制板&#xff0c;支持20层以上电梯按钮控制…

Kubernetes配置与密钥管理深度指南:ConfigMap与Secret企业级实践

目录 专栏介绍 作者与平台 您将学到什么&#xff1f; 学习特色 Kubernetes配置与密钥管理深度指南&#xff1a;ConfigMap与Secret企业级实践 一、 配置管理&#xff1a;云原生应用的基石 1.1 配置管理的演进与挑战 1.2 ConfigMap与Secret的设计哲学 二、 ConfigMap深度…

知行社黄剑杰:金融跨界,重塑震区救援新章

曾在纽约证券交易所敲响上市钟声的黄剑杰&#xff0c;这位知行社的灵魂人物&#xff0c;此次在西藏震区开启了一场震撼人心的“跨界救援”之旅。他带着在华尔街积累的深厚金融智慧&#xff0c;毅然投身到这场与时间赛跑、与灾难较量的战斗中&#xff0c;为传统救灾模式带来了颠…

API模型与接口弃用指南:历史、替代方案及开发者应对策略

API模型及接口弃用&#xff08;Deprecation&#xff09;全解 概览 在AI与API领域&#xff0c;模型的持续迭代与技术进步推动着平台不断优化服务。与此同时&#xff0c;随着更安全、更强大的新模型推出&#xff0c;旧模型与接口的弃用&#xff08;Deprecation&#xff09;成为…

python3GUI--Joy音乐播放器 在线播放器 播放器 By:PyQt5(附下载地址)

文章目录一&#xff0e;前言二&#xff0e;项目简介三&#xff0e;详细模块介绍1.主界面2.歌单广场3.歌单详情页4.歌手筛选5.歌手详情页6.专辑详情页7.歌曲榜单页8.搜索结果页9.其他1.托盘菜单2.设置四&#xff0e;核心问题回答1.软件UI效果实现2.为什么我做不出来这么漂亮的界…

Spring Boot整合Feign实现RPC调用,并通过Hystrix实现服务降级

feign/openfeign和dubbo是常用的微服务RPC框架&#xff0c;由于feigin内部已经集成ribbon&#xff0c;自带了负载均衡的功能&#xff0c;当有多个同名的服务注册到注册中心时&#xff0c;会根据ribbon默认的负载均衡算法将请求分配到不同的服务。这篇文章就简单介绍一下怎么使用…

Java 性能优化实战(三):并发编程的 4 个优化维度

在多核CPU时代&#xff0c;并发编程是提升Java应用性能的关键手段&#xff0c;但不合理的并发设计反而会导致性能下降、死锁等问题。本文将聚焦并发编程的四个核心优化方向&#xff0c;通过真实案例和代码对比&#xff0c;带你掌握既能提升性能又能保证线程安全的实战技巧。 一…

【秋招笔试】2025.08.19百度秋招机考第一套

📌 点击直达笔试专栏 👉《大厂笔试突围》 💻 春秋招笔试突围在线OJ 👉 笔试突围在线刷题 bishipass.com 题目一:花园路径优化问题 1️⃣:使用栈维护必须保留的观景点,基于三角不等式判断 2️⃣:贪心策略,检查中间点是否为"转折点" 3️⃣:时间复杂度 …

SmartX 用户建云实践|某人寿保险:从开发测试、核心生产到信创转型,按需推进企业云建设

某人寿保险自 2018 年起开始探索基于 SmartX 超融合架构搭建私有云 IaaS 资源池&#xff0c;先后部署了开发测试业务、生产业务和重要生产业务的 Oracle 数据库&#xff08;含 RAC&#xff09;&#xff0c;并探索了基于海光芯片的信创云搭建&#xff0c;最终以基于超融合架构的…