Below is a complete Spring Boot project that uses Flink CDC 3.0 to sync data from MySQL to Elasticsearch based on the MySQL binlog.
Project Overview
This project will:
- Use Flink CDC to connect to MySQL and read its binlog
- Handle data changes (inserts, updates, and deletes)
- Sync the data to Elasticsearch
- Expose a REST API for managing the sync job
Project Structure
src/main/java/
├── com/example/cdc/
│ ├── config/
│ │ ├── FlinkConfig.java
│ │ └── ElasticsearchConfig.java
│ ├── model/
│ │ └── User.java
│ ├── service/
│ │ ├── SyncService.java
│ │ └── JobManager.java
│ ├── controller/
│ │ └── SyncController.java
│ └── FlinkCdcApplication.java
1. Add Dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>FlinkCDC</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>FlinkCDC</name>
    <description>FlinkCDC</description>

    <properties>
        <java.version>11</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.7.6</spring-boot.version>
        <flink.version>1.16.0</flink.version>
        <flink-cdc.version>3.0.1</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Flink CDC -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <!-- Flink Connector Elasticsearch -->
        <!-- Note: since Flink 1.16 the Elasticsearch connector is released separately from Flink itself;
             if this version does not resolve, use a matching connector release such as 3.0.0-1.16 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink JSON -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Flink CLI -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.36</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.example.cdc.FlinkCdcApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
2. Data Model (User.java)
package com.example.cdc.model;

import lombok.Data;

@Data
public class User {
    private Long id;
    private String name;
    private String email;
    private Long createdAt;
    private Long updatedAt;
    private Boolean deleted;
}
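If typed access to change records is needed, the Debezium-formatted JSON produced later by JsonDebeziumDeserializationSchema can be mapped onto this model with Jackson. This is only a minimal sketch, assuming the table's column names match the Java field names; UserParser is a hypothetical helper, not part of the original project.

package com.example.cdc.model;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class UserParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Pull the row image out of a Debezium change-event string and map it onto User
    public static User fromChangeEvent(String changeEventJson) throws Exception {
        JsonNode envelope = MAPPER.readTree(changeEventJson);
        // "after" carries the row state for inserts/updates, "before" for deletes
        JsonNode row = envelope.hasNonNull("after") ? envelope.get("after") : envelope.get("before");
        return MAPPER.treeToValue(row, User.class);
    }
}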
3. Flink Configuration (FlinkConfig.java)
package com.example.cdc.config;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlinkConfig {

    @Bean
    public StreamExecutionEnvironment streamExecutionEnvironment() {
        return StreamExecutionEnvironment.getExecutionEnvironment();
    }
}
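The configuration file in section 9 defines a flink.parallelism property, but the environment bean above never reads it. A minimal sketch of one way to make that setting take effect, assuming the flink.parallelism key:

package com.example.cdc.config;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlinkConfig {

    // Hypothetical wiring: read flink.parallelism from application.yml (defaulting to 1)
    @Value("${flink.parallelism:1}")
    private int parallelism;

    @Bean
    public StreamExecutionEnvironment streamExecutionEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Apply the configured parallelism so the YAML setting is actually used
        env.setParallelism(parallelism);
        return env;
    }
}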
4. Elasticsearch Configuration (ElasticsearchConfig.java)
package com.example.cdc.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.host:localhost}")
    private String host;

    @Value("${elasticsearch.port:9200}")
    private int port;

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost(host, port, "http")));
    }
}
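The RestHighLevelClient bean is not used anywhere else in this walkthrough; one way to put it to work is to create the target index up front instead of relying on dynamic mapping. A minimal sketch follows, where IndexInitializer is a hypothetical helper, not part of the original project; it could be called once (for example from JobManager.startJob()) before the Flink job is submitted.

package com.example.cdc.config;

import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;

public class IndexInitializer {

    // Create the target index if it does not exist yet, so documents
    // written by the sink do not depend entirely on dynamic mapping
    public static void ensureIndex(RestHighLevelClient client, String index) throws Exception {
        boolean exists = client.indices()
                .exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
        if (!exists) {
            client.indices()
                    .create(new CreateIndexRequest(index), RequestOptions.DEFAULT);
        }
    }
}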
5. Sync Service (SyncService.java)
package com.example.cdc.service;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Service
public class SyncService {

    @Autowired
    private StreamExecutionEnvironment env;

    @Value("${mysql.host:localhost}")
    private String mysqlHost;

    @Value("${mysql.port:3306}")
    private int mysqlPort;

    @Value("${mysql.username:root}")
    private String mysqlUsername;

    @Value("${mysql.password:password}")
    private String mysqlPassword;

    @Value("${mysql.database:test}")
    private String mysqlDatabase;

    @Value("${mysql.table:users}")
    private String mysqlTable;

    @Value("${elasticsearch.host:localhost}")
    private String esHost;

    @Value("${elasticsearch.port:9200}")
    private int esPort;

    @Value("${elasticsearch.index:users}")
    private String esIndex;

    public void startSync() throws Exception {
        // Create the MySQL CDC source
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(mysqlHost)
                .port(mysqlPort)
                .databaseList(mysqlDatabase)
                .tableList(mysqlDatabase + "." + mysqlTable)
                .username(mysqlUsername)
                .password(mysqlPassword)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // Create the data stream
        DataStream<String> stream = env.fromSource(
                mySqlSource,
                WatermarkStrategy.noWatermarks(),
                "MySQL Source");

        // Transform and process the data
        DataStream<String> processedStream = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // Custom processing logic can be added here
                return value;
            }
        });

        // Configure the Elasticsearch sink
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(esHost, esPort, "http"));

        // Copy the index name into a local variable so the lambda below captures only a
        // serializable String instead of the whole (non-serializable) Spring service bean
        final String targetIndex = esIndex;

        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                httpHosts,
                (element, ctx, indexer) -> {
                    IndexRequest request = Requests.indexRequest()
                            .index(targetIndex)
                            .source(element, XContentType.JSON);
                    indexer.add(request);
                });

        // Configure bulk flushing
        esSinkBuilder.setBulkFlushMaxActions(1);

        // Write the data to Elasticsearch (use addSink rather than sinkTo)
        processedStream.addSink(esSinkBuilder.build());

        // Start the job
        env.execute("MySQL to Elasticsearch Sync");
    }
}
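Note that JsonDebeziumDeserializationSchema emits the whole Debezium change-event envelope (before, after, op, source), and the sink above indexes that envelope verbatim and never removes documents when rows are deleted. If per-document upserts and deletes are wanted, a sink function along the following lines could replace the lambda. This is only a sketch that assumes the default Debezium JSON field names and an "id" primary-key column; DebeziumToEsSinkFunction is a hypothetical class, not part of the original project.

package com.example.cdc.service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.xcontent.XContentType;

public class DebeziumToEsSinkFunction implements ElasticsearchSinkFunction<String> {

    private final String index;
    private transient ObjectMapper mapper;

    public DebeziumToEsSinkFunction(String index) {
        this.index = index;
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        try {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            JsonNode envelope = mapper.readTree(element);
            String op = envelope.path("op").asText();
            if ("d".equals(op)) {
                // Delete events carry the row image in "before"
                String id = envelope.path("before").path("id").asText();
                indexer.add(new DeleteRequest(index, id));
            } else {
                // Insert ("c"), update ("u"), and snapshot read ("r") carry the row in "after"
                JsonNode after = envelope.path("after");
                indexer.add(Requests.indexRequest()
                        .index(index)
                        .id(after.path("id").asText())
                        .source(after.toString(), XContentType.JSON));
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse CDC record: " + element, e);
        }
    }
}

It would be plugged in via new ElasticsearchSink.Builder<>(httpHosts, new DebeziumToEsSinkFunction(esIndex)).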
6. Job Manager (JobManager.java)
package com.example.cdc.service;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

@Component
public class JobManager {

    @Autowired
    private StreamExecutionEnvironment env;

    @Autowired
    private SyncService syncService;

    private Thread jobThread;

    public void startJob() {
        jobThread = new Thread(() -> {
            try {
                syncService.startSync();
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        jobThread.start();
    }

    public void stopJob() {
        if (env != null) {
            try {
                env.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (jobThread != null && jobThread.isAlive()) {
            jobThread.interrupt();
        }
    }

    @PreDestroy
    public void onDestroy() {
        stopJob();
    }
}
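Interrupting the runner thread does not by itself cancel a Flink job that has already been submitted. An alternative, sketched below under the assumption that startSync() is reworked to build the pipeline and submit it with executeAsync() instead of execute(), is to keep the returned JobClient and cancel through it. AsyncJobManager is a hypothetical variant, not part of the original project.

package com.example.cdc.service;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncJobManager {

    private JobClient jobClient;

    // Assumes the pipeline (CDC source, map, Elasticsearch sink) has already been
    // attached to env, e.g. by a reworked SyncService
    public void start(StreamExecutionEnvironment env) throws Exception {
        // Submit the job without blocking the caller thread
        jobClient = env.executeAsync("MySQL to Elasticsearch Sync");
    }

    public void stop() throws Exception {
        if (jobClient != null) {
            // Ask the cluster to cancel the job and wait for the cancellation to complete
            jobClient.cancel().get();
            jobClient = null;
        }
    }
}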
7. REST Controller (SyncController.java)
package com.example.cdc.controller;

import com.example.cdc.service.JobManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/sync")
public class SyncController {

    @Autowired
    private JobManager jobManager;

    @PostMapping("/start")
    public String startSync() {
        try {
            jobManager.startJob();
            return "Sync job started successfully";
        } catch (Exception e) {
            return "Failed to start sync job: " + e.getMessage();
        }
    }

    @PostMapping("/stop")
    public String stopSync() {
        try {
            jobManager.stopJob();
            return "Sync job stopped successfully";
        } catch (Exception e) {
            return "Failed to stop sync job: " + e.getMessage();
        }
    }
}
8. Application Main Class (FlinkCdcApplication.java)
package com.example.cdc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkCdcApplication {

    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcApplication.class, args);
    }
}
9. Configuration File (application.yml)
server:
  port: 8080

spring:
  application:
    name: mysql-cdc-to-es

mysql:
  host: localhost
  port: 3306
  username: root
  password: your_mysql_password
  database: your_database
  table: your_table

elasticsearch:
  host: localhost
  port: 9200
  index: your_es_index

flink:
  parallelism: 1
Usage Instructions
- Make sure binlog is enabled on MySQL:

  SHOW VARIABLES LIKE 'log_bin';

  If it is not enabled, add the following to the MySQL configuration file:

  [mysqld]
  server-id=1
  log-bin=mysql-bin
  binlog_format=row
  binlog_row_image=full
- Create a MySQL user with replication privileges:

  CREATE USER 'flink_user'@'%' IDENTIFIED BY 'password';
  GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_user'@'%';
  FLUSH PRIVILEGES;
- Start the application:

  mvn spring-boot:run
- Start the sync job via the REST API:

  POST http://localhost:8080/api/sync/start