设计一个MySQL数据库和达梦数据库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较两条两个数据库中,一个数据库有,另一个数据库没有的记录,以id是否存在作为标准,还需要比较两边都有记录的所有数据字段是否都相同,如果有不相同指出这条数据的id,以及不相同数据所在字段的名称和在两个数据库中的值,分批比较时需要考虑所有可能的情况。
下面是一个详细的MySQL与达梦数据库表数据比较的程序流程设计,支持分批处理、异步预取和差异检测:
程序流程图
详细程序流程
1. 初始化阶段
# 连接数据库
mysql_conn = connect_mysql(user, pass, host, port, db)
dm_conn = connect_dm(user, pass, host, port, db) # 达梦连接# 获取表元数据
def get_metadata(conn):cursor = conn.cursor()cursor.execute(f"DESCRIBE {table_name}")return [col[0] for col in cursor.fetchall()] # 字段列表
2. 核心参数
BATCH_SIZE = 2000 # 每批数据量
last_id = 0 # 当前批次起始ID
current_batch = {'mysql': None, 'dm': None} # 当前批次数据
next_batch = {'mysql': None, 'dm': None} # 预取的下一批数据
diff_records = [] # 差异结果存储
3. 数据获取函数
def fetch_batch(conn, start_id, batch_size):cursor = conn.cursor()# MySQL语法if isinstance(conn, MySQLConnection):cursor.execute(f"SELECT * FROM {table_name} WHERE id > {start_id} ORDER BY id LIMIT {batch_size}")# 达梦语法elif isinstance(conn, DMConnection):cursor.execute(f"SELECT * FROM (SELECT t.*, ROWNUM as rn FROM "f"(SELECT * FROM {table_name} WHERE id > {start_id} ORDER BY id) t "f"WHERE ROWNUM <= {batch_size}) WHERE rn > 0")return {row[0]: row for row in cursor.fetchall()} # {id: 完整数据}
4. 异步数据预取
import threadingdef async_fetch_next():global next_batch, last_idnext_last_id = max(max(current_batch['mysql'].keys()) if current_batch['mysql'] else last_id,max(current_batch['dm'].keys()) if current_batch['dm'] else last_id)next_batch = {'mysql': fetch_batch(mysql_conn, next_last_id, BATCH_SIZE),'dm': fetch_batch(dm_conn, next_last_id, BATCH_SIZE)}
5. 数据比较逻辑
def compare_batches(mysql_data, dm_data):# 1. 检测缺失记录mysql_ids = set(mysql_data.keys())dm_ids = set(dm_data.keys())# MySQL有而DM缺失for mid in mysql_ids - dm_ids:diff_records.append({'id': mid, 'type': 'missing_in_dm'})# DM有而MySQL缺失for did in dm_ids - mysql_ids:diff_records.append({'id': did, 'type': 'missing_in_mysql'})# 2. 比较共有记录common_ids = mysql_ids & dm_idsfor cid in common_ids:mysql_row = mysql_data[cid]dm_row = dm_data[cid]for col in metadata: # 遍历所有字段if col == 'id': continue# 特殊处理浮点数if isinstance(mysql_row[col], float) and isinstance(dm_row[col], float):if abs(mysql_row[col] - dm_row[col]) > 1e-9:diff_records.append({'id': cid,'field': col,'mysql_val': mysql_row[col],'dm_val': dm_row[col],'type': 'value_mismatch'})# 普通类型比较elif mysql_row[col] != dm_row[col]:diff_records.append({'id': cid,'field': col,'mysql_val': mysql_row[col],'dm_val': dm_row[col],'type': 'value_mismatch'})
6. 主循环流程
# 初始化第一批
current_batch = {'mysql': fetch_batch(mysql_conn, last_id, BATCH_SIZE),'dm': fetch_batch(dm_conn, last_id, BATCH_SIZE)
}while current_batch['mysql'] or current_batch['dm']:# 启动异步线程预取下一批fetch_thread = threading.Thread(target=async_fetch_next)fetch_thread.start()# 比较当前批次compare_batches(current_batch['mysql'], current_batch['dm'])# 等待异步完成fetch_thread.join()# 切换到下一批current_batch = next_batchnext_batch = {'mysql': None, 'dm': None} # 重置# 输出结果
print(f"发现 {len(diff_records)} 处差异")
for diff in diff_records:if diff['type'] == 'missing_in_dm':print(f"ID {diff['id']} 在达梦中缺失")elif diff['type'] == 'missing_in_mysql':print(f"ID {diff['id']} 在MySQL中缺失")else:print(f"ID {diff['id']} 字段 {diff['field']} 不匹配: "f"MySQL={diff['mysql_val']}, DM={diff['dm_val']}")
关键设计要点
-
分批处理机制:
- 使用
last_id
作为游标,按主键顺序获取 - 每批处理2000条,避免内存溢出
- 使用
-
异步预取优化:
- 当前批次比较时,后台线程获取下一批
- 利用数据库查询的I/O等待时间
-
差异检测全面性:
-
特殊类型处理:
- 浮点数采用精度阈值比较(1e-9)
- 空值处理:
None
vsNULL
- 日期类型统一转换为ISO格式字符串比较
-
达梦分页语法:
- 使用ROWNUM三层嵌套实现分页
- 兼容达梦V7/V8版本语法
-
断点续传支持:
- 记录最后处理的ID
- 异常中断后可从中断点继续
异常处理考虑
-
连接故障:
try:fetch_batch() except DatabaseError as e:reconnect_and_retry()
-
数据不一致处理:
- 记录不一致的字段路径
- 支持生成数据修复脚本
-
内存控制:
- 每批处理完立即释放引用
- 差异结果定期写入文件
此设计通过异步预取实现比较与数据加载的并行操作,充分利用I/O等待时间,在保证内存安全的前提下显著提升比较效率。实际部署时可添加进度监控、结果持久化和邮件报警等增强功能。