表数据同步的断点续传

有时候需要将一个表的数据复制到另一个表,循环是常用的方式。当表比较大,执行的时间很长,会有很多因素引起失败。我希望可以比较简单的跑数,所以做一个简单的任务系统。

SQLitre是嵌入式数据库,这样脚本可以不必考虑太多依赖,又可以用到数据库的持久化功能。

1 数据库初始化

给到一个文件路径参数,确定持久化的文件

import sqlite3
import time
import randomdef init_db(DB_FILE = "tasks.db"):"""初始化数据库和表"""conn = sqlite3.connect(DB_FILE)conn.row_factory = sqlite3.Row cur = conn.cursor()cur.execute("""CREATE TABLE IF NOT EXISTS tasks (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT NOT NULL,content TEXT, status INT DEFAULT 0  -- 0=未开始, 1=进行中, 2=完成, 3=失败)""")# 可选:给 status 建索引cur.execute("CREATE INDEX IF NOT EXISTS idx_status ON tasks(status)")cur.execute("CREATE INDEX IF NOT EXISTS idx_name ON tasks(name)")conn.commit()return conn, cur

实际中可以以脚本的file path作为参数(结尾改为.db)

# 建立数据库
conn, cur = init_db(DB_FILE = './the_script.db')

2 任务初始化

同步任务,可以按照id分为若干区间,每个区间任务额可以称为lot。

tuple_list = slice_list_by_batch1(min_id,max_id, 10000)
task_df = pd.DataFrame()
task_df['name'] = ['lot_%s' % i for i in range(len(tuple_list)) ]  
task_df['content'] = tuple_list
task_df['content'] = task_df['content'].apply(lambda x: json.dumps(x))
task_lod = df2lod(task_df)
# 写入任务
cur.executemany("INSERT INTO tasks (name, content) VALUES (?,?)", [(t['name'],t['content']) for t in task_lod])
conn.commit()res = cur.execute('select count(*) from tasks').fetchone()
dict(res)

sqlite也可以提供类似 Row Dict的格式,需要

    conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()

3 处理一次

每次启动时,读出未处理的任务。任务里有tuple参数,根据tuple的起止区间获取数据后进行处理,如果成功就根据任务名将状态更新为2,异常则更新为3。这样每次重复执行这个就可以了,因为数据持久化在文件中,所以即使和服务器断开连接也没关系。

def process_one():new_task_sql  = 'select name, content, status from tasks where status = 0 limit 1'# new_task = cur.execute(new_task_sql).fetchall()conn.row_factory = sqlite3.Row cur = conn.cursor()row = cur.execute(new_task_sql).fetchone()process_code = 1if row:row_dict = dict(row)tem_tuple = json.loads(row_dict['content'])print('>>>> ',row_dict['name'])try:some_tuple = tem_tuple---- DO LOGICcur.execute("UPDATE tasks SET status = 2 WHERE name = ?", (row_dict['name'],))conn.commit()except:cur.execute("UPDATE tasks SET status = 3 WHERE name = ?", (row_dict['name'],))conn.commit()else:process_code = 0return process_code

4 处理直到结束

由于 process_one在没有获取到待处理任务行时会返回0,这个作为结束信号。所以可以给到一个略大的循环次数,当收到结束信号时停止。

for i in range(30000):process_code = process_one()if not process_code:print('无待处理任务')break 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/94522.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/94522.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/94522.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringCloud Alibaba核心知识点

Spring Cloud Alibaba 是阿里巴巴开源的一套微服务解决方案,与 Spring Cloud 生态深度集成。以下是其主要组件及其功能:Nacos服务注册与发现:支持动态服务注册、健康监测及DNS-Based服务发现。配置中心:提供分布式配置管理&#x…

LeetCode 分类刷题:34. 在排序数组中查找元素的第一个和最后一个位置

题目给你一个按照非递减顺序排列的整数数组 nums,和一个目标值 target。请你找出给定目标值在数组中的开始位置和结束位置。如果数组中不存在目标值 target,返回 [-1, -1]。你必须设计并实现时间复杂度为 O(log n) 的算法解决此问题。示例 1:…

自建知识库,向量数据库 (十二)之 文章向量搜索——仙盟创梦IDE

“未来之窗” 文章向量搜索:多领域应用与学习指南 在数字化浪潮中,“未来之窗” 文章向量搜索凭借其独特的技术优势,在酒店、电商、诊疗及知识库等多个领域展现出巨大的应用潜力,为各行业的信息处理与检索带来了全新的视角和高效…

深度剖析:基于反射的.NET二进制序列化器设计与实现

🔍 深度剖析:基于反射的.NET二进制序列化器设计与实现本文将从底层原理到高级优化,全面剖析一个基于反射的.NET二进制序列化器的设计与实现,涵盖类型系统处理、内存布局、递归算法、性能优化等核心主题。1. 设计哲学与架构总览 1.…

如何在 Ubuntu 上安装和配置 Samba ?

Samba 是一个开源程序,用于文件共享和网络打印,使用 SMB 协议。现在基本上用于提供在 Windows 上可访问的 Linux 文件共享系统。 本文介绍如何在 Ubuntu 上安装和配置 Samba 服务器,以便跨文件夹共享网络上不同的计算机。 Update Your Syst…

MATLAB实现CNN-GRU-Attention时序和空间特征结合-融合注意力机制混合神经网络模型的风速预测

该 MATLAB 代码实现了一个基于 CNN-GRU-Attention 时序和空间特征结合-融合注意力机制混合神经网络模型的风速预测。以下是对代码的简要分析:一、主要功能 该代码用于风速时间序列预测,使用历史风速特征数据(18个特征,75天&#x…

【升级版】从零到一训练一个 0.6B 的 MoE 大语言模型

前文:从零到一训练一个 0.6B 的 MoE 大语言模型,本次升级完全重新从零开始重新训练。主要升级如下: 替换预训练数据集,使用序列猴子通用文本数据集进行预训练。使用更先进的训练方法。新增思考模式控制,可通过添加/th…

51单片机-实现定时器模块教程

本章概述思维导图: 51单片机驱动定时器模块 CPU时序简介 CPU时序定义了CPU内部操作的时间节奏,以下从四个时序周期进行逐步解析; 1、振荡周期 振荡周期:CPU内部时钟源产生的最小时间单位,由晶振或内部振荡器决定&am…

7.Kotlin的日期类

以下是 Kotlin 中常用时间类(基于 java.time 包)的核心方法及使用示例,参考数组方法的表格形式,按类分类展示: 一、LocalDate(日期:年/月/日)方法签名返回值说明示例now(): LocalDat…

【Big Data】Hive技术解析:大数据仓库的SQL桥梁

Hive作为Apache顶级项目,是Hadoop生态系统中最具影响力的SQL查询引擎,它解决了大数据处理与传统SQL技能之间的鸿沟。Hive的核心价值在于将类SQL查询语言HiveQL无缝转换为分布式计算框架MapReduce的任务,使数据分析师能够利用熟悉的SQL语法操作…

Ubuntu2204server系统安装postgresql14并配置密码远程连接

前言: 最近因项目需要安装postgresql14,系统是ubuntu2204server系统,安装好后发现无法实现远程连接,解决了之后在此记录一下解决方法。 疑问: 什么情况下需要配置postgresql远程连接? ①如果是postgresql和…

【嵌入式】【搜集】状态机、状态迁移图及状态模式材料

文章目录状态机状态机状态机定义与核心特点状态机总结状态迁移图状态迁移图状态迁移图核心概念与要素状态迁移图常见错误与规避状态迁移图总结状态模式状态模式状态模式核心概念与组成状态模式核心价值与适用场景状态模式优缺点分析进阶优化技巧行为模式总结状态机 状态机 状…

Java学习历程14——制作一款五子棋游戏(4)

上次我们基本实现了五子棋游戏的功能,这次我们进行一些优化和添加一些便于用户使用的功能。新增功能及优化一、复盘功能复盘功能就是指在下完一局棋后,我们可以通过复盘按钮使本局棋的所有棋子重头开始自动下一遍。分析得知,我们首先要保存以…

记录一次el-table+sortablejs的拖拽bug

bug回顾出现bug的情况时 当编辑表格过于紧凑的时候 有些非必要编辑或需要一眼看到的数据 移动到了el-table-column typeexpand时 同事:怎么拖拽功能用不了了 ok开始检查代码 当原来是个简单的编辑表格 不涉及展开和简单拖拽时 不会出现问题 解决了 出现了展开行以后…

利用go sort.Sort()排序自定义切片

1 sort.Sort()简介2 核心功能3 调用前提4 代码示例 1 sort.Sort()简介 Go语言中的sort.Sort函数是标准库提供的通用排序接口 2 核心功能 核心功能支持多种类型进行快速排序 基础类型支持‌:内置Ints、Float64s、Strings等函数直接排序常见切片 自定义排序‌&a…

Elasticsearch脑裂紧急处理与预防

在 Elasticsearch 中出现 网络分区(Network Partition) 或 脑裂(Split-Brain) 导致两个子集群各自选出 Master 的情况,是非常严重的问题。比如这个场景(20个节点分裂成两个10节点的子集群,各自选…

华为网路设备学习-29(BGP协议 四)路由策略-实验

示例 延伸-具体实验1.代码部分:基础配置R1 [Huawei]int GigabitEthernet 0/0/0 [Huawei-GigabitEthernet0/0/0]ip address 10.1.13.1 24[Huawei]int LoopBack 1 [Huawei-LoopBack1]ip address 172.16.1.1 24 [Huawei-LoopBack1]q [Huawei]int LoopBack 2 [Huawei-Lo…

500系列状态码与可能的场景

501 Not Implemented(未实现)HTTP 方法不支持客户端发送了 PUT、DELETE、PATCH 请求但服务器只实现了 GET 和 POST协议功能不支持客户端使用了 HTTP/2 的某些高级特性服务器只支持 HTTP/1.1,无法处理,返回 501API 接口未完成开发中…

大数据、hadoop、爬虫、spark项目开发设计之基于数据挖掘的交通流量分析研究

大数据、hadoop、爬虫、spark项目开发设计之基于数据挖掘的交通流量分析研究

Pytest项目_day20(log日志)

Log日志优点:记录程序运行信息,方便定位问题python日志模块logging,日志等级如下: DEBUGINFO(正常)WARNINGERROR(报错)示例代码如下:import logging import os.path impo…