一、OceanBase 数据库核心配置
1. 环境准备与版本要求
  • 版本要求:OceanBase CE 4.0+ 或 OceanBase EE 2.2+
  • 组件依赖:需部署 LogProxy 服务(社区版/企业版部署方式不同)
  • 兼容模式:支持 MySQL 模式(默认)和 Oracle 模式
2. 创建用户与权限配置

在 sys 租户创建管理用户(社区版示例):

-- 连接 sys 租户(默认端口 2881)
mysql -h127.0.0.1 -P2881 -uroot@sys -p-- 创建用户(替换为实际用户名密码)
CREATE USER 'ob_cdc_user' IDENTIFIED BY 'Ob@123456';
GRANT ALL PRIVILEGES ON *.* TO 'ob_cdc_user' WITH GRANT OPTION;
FLUSH PRIVILEGES;

在业务租户创建 CDC 用户:

-- 切换到业务租户(如 test_tenant)
USE test_tenant;-- 创建 CDC 数据读取用户
CREATE USER 'flink_user' IDENTIFIED BY 'Flink@123';
GRANT SELECT ON test_db.* TO 'flink_user';
FLUSH PRIVILEGES;
3. 获取关键配置信息

社区版获取 rootserver-list:

-- 连接业务租户
mysql -h127.0.0.1 -P2881 -uflink_user -p-- 查询 rootserver 列表(格式:ip:rpc_port:sql_port)
SHOW PARAMETERS LIKE 'rootservice_list';
-- 示例输出:rootservice_list | 127.0.0.1:2882:2881

企业版获取 config-url:

SHOW PARAMETERS LIKE 'obconfig_url';
-- 示例输出:obconfig_url | http://127.0.0.1:8080/services?Action=ObRootServiceInfo&...
4. 部署 LogProxy 服务(社区版快速启动)
# 下载 LogProxy 二进制(社区版)
wget https://github.com/oceanbase/oblogproxy/releases/download/v2.2.7/oblogproxy-2.2.7.tar.gz
tar -zxvf oblogproxy-2.2.7.tar.gz# 编辑配置文件 oblogproxy.conf
vi oblogproxy/oblogproxy.conf
# 添加以下配置(根据实际情况修改):
[common]
rootservice_list = "127.0.0.1:2882:2881"
logproxy_port = 2983
working_mode = "memory"# 启动 LogProxy
cd oblogproxy
./oblogproxy -c oblogproxy.conf
二、Flink 环境集成配置
1. 添加Maven依赖
<!-- OceanBase CDC 连接器 -->
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-oceanbase-cdc</artifactId><version>3.0.1</version><scope>provided</scope>
</dependency><!-- 企业版需添加OceanBase JDBC驱动 -->
<dependency><groupId>com.oceanbase</groupId><artifactId>oceanbase-client</artifactId><version>2.4.2</version>
</dependency>
2. SQL Client部署
  1. 下载 CDC 连接器 JAR:
    flink-sql-connector-oceanbase-cdc-3.0.1.jar
  2. 企业版需额外下载 OceanBase JDBC 驱动:
    oceanbase-client-2.4.2.jar
  3. 将 JAR 包放入 $FLINK_HOME/lib/ 后重启 Flink 集群。
三、Flink SQL 表定义与参数详解
1. MySQL 模式建表示例(含元数据)
-- 配置checkpoint
SET 'execution.checkpointing.interval' = '5s';-- 创建OceanBase CDC表(MySQL模式)
CREATE TABLE ob_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),-- 元数据列tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,db_name STRING METADATA FROM 'database_name' VIRTUAL,table_name STRING METADATA FROM 'table_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'oceanbase-cdc','scan.startup.mode' = 'initial','username' = 'flink_user@test_tenant#ob_cluster','password' = 'Flink@123','tenant-name' = 'test_tenant','database-name' = 'test_db','table-name' = 'orders','hostname' = '127.0.0.1','port' = '2881','rootserver-list' = '127.0.0.1:2882:2881',  -- 社区版必填'logproxy.host' = '127.0.0.1','logproxy.port' = '2983','working-mode' = 'memory'
);
2. Oracle 模式建表示例
CREATE TABLE ob_orders_oracle (order_id INT,order_date TIMESTAMP(0),customer_name STRING,-- 元数据列tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL
) WITH ('connector' = 'oceanbase-cdc','scan.startup.mode' = 'initial','username' = 'flink_user@test_tenant#ob_cluster','password' = 'Flink@123','tenant-name' = 'test_tenant','database-name' = 'test_db','table-name' = 'orders','hostname' = '127.0.0.1','port' = '2881','compatible-mode' = 'oracle',       -- 关键:设置Oracle兼容模式'jdbc.driver' = 'com.oceanbase.jdbc.Driver',  -- 企业版JDBC驱动'config-url' = 'http://127.0.0.1:8080/...',  -- 企业版必填'logproxy.host' = '127.0.0.1','logproxy.port' = '2983'
);
3. 核心参数详解
参数名必选默认值类型说明
connectorString固定为oceanbase-cdc
scan.startup.modeString启动模式:initial(快照+日志)、latest-offset(仅最新)、timestamp(指定时间)
tenant-nameString目标租户名称(如test_tenant
logproxy.hostStringLogProxy 服务IP
logproxy.portIntegerLogProxy 服务端口(默认2983)
rootserver-list社区版是String社区版rootserver列表(格式ip:rpc_port:sql_port
config-url企业版是String企业版配置服务URL
compatible-modemysqlString兼容模式:mysql(默认)、oracle
jdbc.driver企业版是com.mysql.jdbc.DriverString企业版JDBC驱动类(com.oceanbase.jdbc.Driver
四、环境验证与测试
1. 准备测试数据(OceanBase MySQL模式)
-- 连接业务租户
mysql -h127.0.0.1 -P2881 -uflink_user -p test_db-- 创建测试表
CREATE TABLE orders (order_id INT PRIMARY KEY,order_date TIMESTAMP,customer_name VARCHAR(100),price DECIMAL(10, 2)
);-- 插入数据
INSERT INTO orders VALUES 
(1, '2023-01-01 10:00:00', 'Alice', 100.50),
(2, '2023-01-02 11:00:00', 'Bob', 200.75);
COMMIT;
2. Flink SQL 验证
-- 查询OceanBase CDC表(首次触发快照)
SELECT * FROM ob_orders;-- 在OceanBase中更新数据
UPDATE orders SET price = 150.00 WHERE order_id = 1;
COMMIT;-- 观察Flink输出:应显示变更记录,op_ts为变更时间
3. DataStream API 验证(社区版)
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.cdc.connectors.oceanbase.OceanBaseSource;
import org.apache.flink.cdc.connectors.oceanbase.source.RowDataOceanBaseDeserializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.VarcharType;public class OceanBaseSourceExample {public static void main(String[] args) throws Exception {// 定义表结构RowType physicalType = RowType.of(RowType.Field.of("order_id", BigIntType.INSTANCE),RowType.Field.of("customer_name", VarcharType.of(100)));InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(physicalType);// 配置OceanBase SourceOceanBaseSource<RowData> source = OceanBaseSource.<RowData>builder().rsList("127.0.0.1:2882:2881")  // 社区版rootserver-list.startupMode(StartupMode.INITIAL).username("flink_user@test_tenant#ob_cluster").password("Flink@123").tenantName("test_tenant").databaseName("test_db").tableName("orders").hostname("127.0.0.1").port(2881).logProxyHost("127.0.0.1").logProxyPort(2983).deserializer(RowDataOceanBaseDeserializationSchema.newBuilder().setPhysicalRowType(physicalType).setResultTypeInfo(typeInfo).build()).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.enableCheckpointing(3000);env.fromSource(source, null, "OceanBase CDC Source").print();env.execute("OceanBase CDC Test");}
}
五、常见问题与解决方案
  1. LogProxy连接失败

    ERROR: Failed to connect to LogProxy at 127.0.0.1:2983
    
    • 解决方案:
      1. 确认LogProxy服务已启动且端口正确(netstat -an | grep 2983
      2. 检查logproxy.hostlogproxy.port配置是否与LogProxy一致
  2. 权限不足(社区版)

    ERROR: Access denied for user 'flink_user'@'127.0.0.1'
    
    • 解决方案:
      • 确认用户在业务租户有SELECT权限
      • 检查用户名格式是否正确(user@tenant#cluster
  3. 企业版Oracle模式配置错误

    ERROR: incompatible-mode must be set for Oracle mode
    
    • 解决方案:
      • 显式设置compatible-mode = 'oracle'
      • 确保已添加oceanbase-client依赖并部署JDBC驱动
  4. 时间戳转换异常

    • 解决方案:显式设置时区:
      'server-time-zone' = 'Asia/Shanghai'
      
六、生产环境优化建议
  1. LogProxy性能调优

    • 设置working-mode = 'memory'(内存模式,适合高频变更)
    • 调整obcdc.properties.batch_size(如1024)优化批量处理
  2. 高可用配置

    • 部署多节点LogProxy,Flink配置多个logproxy.host(逗号分隔)
    • 企业版使用config-url自动发现OB集群节点
  3. 监控与清理

    • 定期清理LogProxy内存数据(working-mode = 'memory'时):
      # 重启LogProxy或通过API清理
      

通过以上步骤,可完成Flink OceanBase CDC的全流程配置与验证。生产环境中需特别注意社区版与企业版的配置差异、LogProxy服务稳定性及兼容模式的正确设置,以确保数据一致性和系统稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/913329.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/913329.shtml
英文地址,请注明出处:http://en.pswp.cn/news/913329.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

c++对象池

【设计模式】其它经典模式-对象池模式&#xff08;Object Pool Pattern&#xff09;-CSDN博客 在C中&#xff0c;对象池&#xff08;Object Pool&#xff09;是一种管理对象生命周期的技术&#xff0c;旨在减少对象创建和销毁的开销&#xff0c;提高性能。对象池预先分配一定数…

JavaFX:Scene(场景)

简介 Scene对象是JavaFX场景图的根(root)。JavaFX 场景中包含所有可视的 JavaFX GUI 组件。JavaFX 场景由javafx.scene.Scene类表示。必须在 Stage(舞台)上设置 Scene 对象才能使其可见。在本 JavaFX Scene 教程中,将向您展示如何创建 Scene 对象并向其添加 GUI 组件。 创…

vue3.4中的v-model的用法~

1.首先以前我们针对父子组件传参是不是通过defineProps与defineEmits来实现的&#xff0c;但是这么比较繁琐&#xff0c;因为他是单向传参&#xff0c;而不是双向的&#xff0c;这里我们要介绍的是vue3.4的v-model来实现双向数据传递。 2、代码示例&#xff1a; //父组件 <…

nvm常用指令汇总

nvm是用来管理nodejs的&#xff0c;可以方便安装、切换、卸载当前环境的node版本。 以下是常用指令汇总&#xff1a;nvm list 查看本机已经安装的node版本。*表示当前系统正在使用的node版本nvm install xx.xx.x 后边加版本号&#xff0c;表示安装指定的版本nvm use xx.xx.x当前…

洛谷P5021 [NOIP 2018 提高组] 赛道修建【题解】【二分答案+树上贪心】

P5021 [NOIP 2018 提高组] 赛道修建 题意简述 给定一棵含 n n n 个点的无向带权树&#xff0c;求将其分裂为 m m m 条链后&#xff0c;最短的一条链的最大长度是多少&#xff1f; 点可以重复使用&#xff0c;边不可以重复使用。 思路 二分答案贪心判定貌似可以&#xff…

Portal认证过程杂谈

Portal认证模型简介 Portal认证模型通常由这四个设备组成 认证服务器即3A服务器&#xff0c;通常用radius服务器 接入设备通常就是NAC设备&#xff08;网络接入控制&#xff09; Portal服务器就是Portal认证的认证网站&#xff08;通常叫门户网站&#xff09; 认证过程简述…

ZSGuardian ---AI赋能,新一代研发管理守护平台 -即将上线

一场研发管理的革命 在数字化浪潮奔涌向前的今天&#xff0c;软件开发与产品研发的节奏不断加快&#xff0c;市场需求瞬息万变&#xff0c;技术迭代日新月异。对于研发团队而言&#xff0c;如何在复杂多变的环境中&#xff0c;高效地管理项目、保障产品质量、确保按时上线&…

小菜狗的云计算之旅,学习了解rsync+sersync实现数据实时同步(详细操作步骤)

Rsyncsersync实现数据实时同步 目录 Rsyncsersync实现数据实时同步 一、rsync概述 二、rsync运行原理 三、rsync部署 四、备份测试 五、使用非系统用户备份数据 5.1 rsync的配置文件介绍 5.2 配置备份目录 5.3 使用rsync用户备份测试 5.4 pull拉取数据 六、rsyncse…

牛客周赛Round 99(Go语言)

A题 (A.go) 思路总结: 这道题要求判断一个整数中是否包含连续的两个9。 核心思路是将输入的整数转换为字符串&#xff0c;然后遍历这个字符串&#xff0c;检查是否存在相邻的两个字符都是9。如果找到了&#xff0c;就立即停止遍历并输出"YES"&#xff1b;如果遍历完…

红外图像小目标检测热力图可视化系统

原创代码&#xff0c;可以工程修改含界面。

供应链管理:指标评估方式分类与详解

一、指标评估方式分类与详解 评估维度评估方式核心方法适用场景示例数据来源内部数据评估从企业ERP、MES、CRM等系统提取生产、财务、客户等数据。成本、效率、质量等内部管理指标评估。生产成本数据&#xff08;MES系统&#xff09;、客户满意度&#xff08;CRM系统&#xff…

基于 Rust 的前端工具基本实现

1. Rust 环境安装 1.1. 安装 Rust Rust 提供了一个非常方便的安装工具 rustup,可以通过以下命令安装 Rust: curl --proto =https --tlsv1.2 -sSf https://sh.rustup.rs | sh 这个命令会安装 Rust 编译器 rustc、包管理工具 cargo 以及其他相关工具。 1.2. 配置环境变量 …

大模型关键字解释

&#x1f4a1; 一、模型结构关键词 1. Transformer Transformer 是一种专门用来“理解文字”的神经网络结构。就像一个聪明的秘书&#xff0c;能同时看懂整段话的所有词之间的关系&#xff0c;而不是像老式模型那样一句一句读。 &#x1f449; 举例&#xff1a;以前的模型像…

空调和烘干机的使用

开关 制冷 选择上下扫风 那个就下来了 烘干机 电源键 长按3s以上直到菜单显示 选择小件 不要快烘 至少1个半小时 才可以烘干

极简的神经网络反向传播例子

我之前一直没搞清楚&#xff0c;神经网络为什么要求导&#xff1f;反向传播又是什么&#xff1f;于是到现在深究回来…… 本质就是拟合一个未知函数。 高中的数理统计就学过最小二乘法这种回归方法&#xff08;ŷ 代表自己的预测y&#xff0c;这个表达要记住&#xff09;&…

01-什么是强化学习

什么是强化学习 1. 定义 强化学习&#xff08;Reinforcement Learning, RL&#xff09;是一种使智能体&#xff08;Agent&#xff09;通过与环境&#xff08;Environment&#xff09;不断交互&#xff0c;学习如何在不同情境下采取行动以获得最大化累积奖励的机器学习方法。 强…

淘宝直播数字人:音视频算法工程技术

本专题是我们打造智能数字人的部分实践总结。我们将探讨六大核心环节&#xff1a;LLM文案生产赋予数字人思考和内容生成能力&#xff0c;如同其“大脑”&#xff1b;LLM互动能力则聚焦对话逻辑与拟人化交流&#xff0c;是实现自然交互的关键&#xff1b;TTS&#xff08;语音合成…

MySQL回表查询深度解析:原理、影响与优化实战

引言 作为后端开发或DBA&#xff0c;你是否遇到过这样的场景&#xff1a; 明明给字段加了索引&#xff0c;查询还是慢&#xff1f;EXPLAIN一看&#xff0c;执行计划里type是ref&#xff0c;但数据量不大却耗时很久&#xff1f; 这时候&#xff0c;你很可能遇到了MySQL中常见的…

任务管理器看不到的内存占用:RAMMap 深度分析指南

前言&#xff1a;任务管理器看不到的内存真相 在日常使用 Windows 系统时&#xff0c;我们有时会遇到一种令人费解的情况&#xff1a; 刚刚开机&#xff0c;什么软件都没运行&#xff0c;系统内存却已经占用了 7&#xff5e;8 GB。 打开任务管理器一看&#xff0c;前几个进程加…

从传统仓库到智能物流枢纽:艾立泰的自动化蜕变之旅

在物流行业智能化浪潮中&#xff0c;艾立泰从依赖人工的传统仓库转型为智能物流枢纽&#xff0c;其自动化升级路径为行业提供了典型范本。​曾几何时&#xff0c;艾立泰仓库内人工搬运、纸质单据流转、手工盘点是常态&#xff0c;效率低下、差错率高、人力成本攀升等问题制约发…