Installing CDC with Docker
- Pull the images
- Offline installation
- Upload the files and create docker-compose.yml
- Load the images into Docker
- Start the containers
- Connect to the database
- Create an account and grant it privileges
- Set wal_level
- Confirm the wal_level value
- Create the connector
- Query the connector status
- Use Kafdrop
- What to do if messages don't show the pre-change data
- Supplement
Pull the images
docker pull confluentinc/cp-zookeeper:7.5.0
docker pull confluentinc/cp-kafka:7.5.0
docker pull debezium/connect:2.7.0.Final
docker pull obsidiandynamics/kafdrop:latest
Pulling these images requires a VPN. If you don't have one, you can download them from my netdisk share instead.
Files shared via netdisk: kafka, cdc, and kafdrop
Link: https://pan.baidu.com/s/10OU_4cy7mWtaKAijGpakfQ?pwd=asn5  Extraction code: asn5
Offline installation
Upload the files and create docker-compose.yml
Upload the downloaded tar packages to a directory on the server, and create a docker-compose.yml file in the same directory.
For a Linux environment, the contents are as follows:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ["CMD-SHELL", "nc -z zookeeper 2181 || exit 1"]
      interval: 5s
      timeout: 3s
      retries: 5
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      zookeeper:
        condition: service_healthy # wait until zookeeper is fully ready
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 # Linux: advertise the Compose service name
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092 || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 5
  connect:
    image: debezium/connect:2.7.0.Final
    depends_on:
      kafka:
        condition: service_healthy # wait until kafka is fully ready
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
  kafdrop:
    image: obsidiandynamics/kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: kafka:9092 # use the service name; containers on the same network resolve it automatically
    depends_on:
      - kafka
For a Windows environment, the contents are as follows:
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ["CMD-SHELL", "nc -z zookeeper 2181 || exit 1"]
      interval: 5s
      timeout: 3s
      retries: 5
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      zookeeper:
        condition: service_healthy # wait until zookeeper is fully ready
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://host.docker.internal:9092 # Docker Desktop / Windows
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    healthcheck:
      test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092 || exit 1"]
      interval: 10s
      timeout: 5s
      retries: 5
  connect:
    image: debezium/connect:2.7.0.Final
    depends_on:
      kafka:
        condition: service_healthy # wait until kafka is fully ready
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: host.docker.internal:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
  kafdrop:
    image: obsidiandynamics/kafdrop
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: host.docker.internal:9092 # Docker Desktop / Windows
    depends_on:
      - kafka
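Before moving on, you can optionally let Compose validate the file (a quick sanity check; run it in the directory containing docker-compose.yml):
# Print the fully resolved configuration; indentation or syntax errors are reported here
docker compose config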
Load the images into Docker
In a terminal, change into the folder containing the tar packages and run the following commands to load the images into Docker:
docker load < zookeeper.tar
docker load < kafka.tar
docker load < connect.tar
docker load < kafdrop.tar
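As a quick sanity check, you can confirm that all four images were loaded (a minimal sketch; the repository names come from the pull commands above):
# All four repositories should appear in the output
docker images | grep -E 'cp-zookeeper|cp-kafka|debezium/connect|kafdrop'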
Start the containers
docker compose up -d
# On older versions of Docker the command is: docker-compose up -d
After startup, run docker ps to check that all four containers are up. If a container did not start, run [docker logs <container id>] to view the error log, and paste the log into DeepSeek to find the cause.
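A minimal sketch of that check (fill in the container id or name placeholder from the docker ps output):
# List the running containers with their status/health
docker ps --format 'table {{.Names}}\t{{.Status}}'
# Inspect the log of a container that failed to start
docker logs <container id> --tail 100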
Connect to the database
I'm connecting to PostgreSQL 10 here; for other versions or other databases, ask DeepSeek, the steps are much the same.
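For reference, a minimal sketch of connecting with the psql client (host, port, and database name are placeholders for your environment; any admin account works for the next step):
psql -h <database ip> -p 5432 -U postgres -d <database name>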
Create an account and grant it privileges
CREATE USER debezium WITH PASSWORD 'dbz_pass' REPLICATION;
ALTER USER debezium SUPERUSER;
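To double-check that the account was created with the right attributes, a sketch of a catalog query (rolsuper and rolreplication should both be true):
SELECT rolname, rolsuper, rolreplication FROM pg_roles WHERE rolname = 'debezium';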
Set wal_level
ALTER SYSTEM SET wal_level = logical;
SELECT pg_reload_conf();
Confirm the wal_level value
- Restart PostgreSQL. If it was installed with Docker, you can run docker restart <container id>.
- Run SHOW wal_level and check that the result is "logical". If it isn't, ask DeepSeek.
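A sketch of a broader check, if you want it in one query (max_wal_senders and max_replication_slots are not set by this guide, but logical replication also depends on them being greater than zero):
SELECT name, setting FROM pg_settings
WHERE name IN ('wal_level', 'max_wal_senders', 'max_replication_slots');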
Create the connector
POST http://<ip of the server running connect>:8083/connectors
Body (read the body carefully and replace the placeholders with your actual values):
{"name": "pg-connector","config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector","tasks.max": "1","database.hostname": "数据库ip","database.port": "数据库端口","database.user": "debezium","database.password": "dbz_pass","database.dbname": "数据库名","topic.prefix": "dbserver1","plugin.name": "pgoutput","table.include.list": "public.table_a,public.table_b","snapshot.mode": "initial"}
}
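If you prefer the command line over an API client, a sketch of the same request with curl (assumes the JSON body above has been saved as pg-connector.json, which is just an illustrative file name):
curl -X POST -H "Content-Type: application/json" \
  --data @pg-connector.json \
  http://<ip of the server running connect>:8083/connectors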
Query the connector status
GET http://<ip of the server running connect>:8083/connectors/pg-connector/status
A successful result:
{"name": "pg-connector","connector": {"state": "RUNNING","worker_id": "172.20.0.5:8083"},"tasks": [{"id": 0,"state": "RUNNING","worker_id": "172.20.0.5:8083"}],"type": "source"
}
If both state fields in the result are RUNNING, the connector is working; otherwise take the contents of the trace field and ask DeepSeek.
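The equivalent curl call (a sketch; piping to jq is optional and only pretty-prints the JSON):
curl -s http://<ip of the server running connect>:8083/connectors/pg-connector/status | jq .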
Use Kafdrop
· The Kafka GUI used here is Kafdrop; feel free to use another tool if you prefer.
· Access it at http://<ip of the server it is installed on>:9000
· Once all the steps above have succeeded, modify a row in one of the specified tables in the database.
· "Specified tables" means the tables listed in "table.include.list".
· Then refresh Kafdrop and you will see topics whose names start with dbserver1.
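If you'd rather check from the command line than through Kafdrop, a sketch using the console consumer inside the Kafka container (the topic name assumes the topic.prefix and table.include.list values from the example connector config; substitute your own container name and topic):
docker exec -it <kafka container name> kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic dbserver1.public.table_a \
  --from-beginning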
What to do if messages don't show the pre-change data
Run the following statement in the database:
DO $$
DECLARE
  r RECORD;
BEGIN
  FOR r IN
    SELECT schemaname, tablename
    FROM pg_tables
    WHERE schemaname = 'public'
      AND tablename IN ('tb_A', 'tb_B')
  LOOP
    EXECUTE format('ALTER TABLE %I.%I REPLICA IDENTITY FULL;', r.schemaname, r.tablename);
  END LOOP;
END$$;
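To confirm the change took effect, a sketch of a catalog query (relreplident = 'f' means REPLICA IDENTITY FULL; the table names are the same ones used above):
SELECT relname, relreplident FROM pg_class WHERE relname IN ('tb_A', 'tb_B');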
Supplement
Connect exposes endpoints to create a connector, query a connector's status, delete a connector, view a connector's configuration, and update a connector's configuration. The API documentation is here: https://docs.apipost.net/docs/detail/4de10f8eac12000?locale=zh-cn&target_id=1dd669507cb037
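For quick reference, sketches of those calls with curl against the pg-connector created above (these are the standard Kafka Connect REST API paths; pg-connector-config.json is an illustrative file name holding only the "config" object):
# View the connector's configuration
curl http://<ip of the server running connect>:8083/connectors/pg-connector/config
# Update the configuration (the body is the "config" object only, not the full create payload)
curl -X PUT -H "Content-Type: application/json" \
  --data @pg-connector-config.json \
  http://<ip of the server running connect>:8083/connectors/pg-connector/config
# Delete the connector
curl -X DELETE http://<ip of the server running connect>:8083/connectors/pg-connector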