爬虫03 - 爬虫的数据存储

文章目录

  • 爬虫03 - 爬虫的数据存储
    • 一:CSV数据存储
      • 1:基本介绍
      • 2:基本使用
      • 3:高级使用
      • 4:使用示例
    • 二:JSON数据存储
      • 1:基础json读写
      • 2:字符串和对象的转换
      • 3:日期和时间的特殊处理
      • 4:自定义序列化
      • 5:高级使用
    • 三:数据库数据存储
      • 1:基础部分学习
      • 2:补充说明

一:CSV数据存储

1:基本介绍

csv是一种轻量级、跨平台的文件格式,广泛用于数据交换、日志记录及中小规模数据存储

  • 无需依赖‌:直接通过Python标准库csv模块操作。
  • ‌人类可读‌:文本格式可直接用Excel或文本编辑器查看。
  • ‌高效灵活‌:适合快速导出、导入表格型数据。

csv格式如下:

  • 每行表示一条记录,字段以特定分隔符(默认为逗号)分隔。
  • 支持文本引用(如双引号)包裹含特殊字符的字段。
  • 首行可定义列名(表头)。
适用场景说明
数据导出、备份从数据库或API批量导出结构化数据
数据分析预处理配合Pandas进行统计与可视化
跨系统数据交换兼容Excel/R/MATLAB等工具

2:基本使用

python内置CSV模块,无需额外安装

基本读写csv实例

import csvheaders = ["id", "name", "email"]
data = [[1, "张三", "zhangsan@example.com"],[2, "李四", "lisi@test.org"],[3, "王五", "wangwu@demo.net"]
]# 用open函数,第一个参数是文件名,第二个参数是模式(r -> read, w -> wirte),第三个参数是编码方式encoding -> utf-8
with open("output.csv", "w", newline="", encoding="utf-8") as f:writer = csv.writer(f)writer.writerow(headers)  # 写入表头writer.writerows(data)    # 批量写入数据
import csvwith open ("output.csv", "r", newline="", encoding="utf-8") as f:reader = csv.reader(f)for row in reader:print(row)

字典方式读写实例

# 写入字典数据
with open("dict_output.csv", "w", newline="", encoding="utf-8") as f:writer = csv.DictWriter(f, fieldnames=["id", "name", "email"])writer.writeheader()writer.writerow({"id": 1, "name": "张三", "email": "zhangsan@example.com"})# 读取为字典数据
with open("dict_output.csv", "r", newline="", encoding="utf-8") as f:reader = csv.DictReader(f)for row in reader:print(row)print(row["id"], row["name"], row["email"])

自定义分割符和引号规则

# 使用分号分隔,双引号包裹所有字段
with open("custom.csv", "w", newline="", encoding="utf-8") as f:writer = csv.writer(f, delimiter=";", quoting=csv.QUOTE_ALL)writer.writerow(["id", "name"])writer.writerow([1, "张三"])

3:高级使用

含特殊字符的字段‌

字段包含逗号、换行符等破坏CSV结构的字符 -> 使用引号包裹字段,并配置csv.QUOTE_MINIMALcsv.QUOTE_NONNUMERIC

data = [[4, "Alice, Smith", "alice@example.com"],[5, "Bob\nJohnson", "bob@test.org"]
]with open("special_chars.csv", "w", newline="", encoding="utf-8") as f:writer = csv.writer(f, quoting=csv.QUOTE_NONNUMERIC)writer.writerows(data)

嵌套数据

import jsondata = [{"id": 1, "info": '{"age": 30, "city": "北京"}'},{"id": 2, "info": '{"age": 25, "city": "上海"}'}
]# 写入嵌套JSON
with open("nested_data.csv", "w", newline="", encoding="utf-8") as f:writer = csv.DictWriter(f, fieldnames=["id", "info"])writer.writeheader()writer.writerows(data)# 读取并解析JSON
with open("nested_data.csv", "r", encoding="utf-8") as f:reader = csv.DictReader(f)for row in reader:info = json.loads(row["info"])print(f"ID: {row['id']}, 城市: {info['city']}")

复杂情况的处理

大文件 -> 逐行读取 & 分批读取

复杂数据处理 -> pandas

4:使用示例

# 读取百度图书,并将结果放入到csv文件中,方便后面使用pandas进行数据分析
import csv
import requests
from bs4 import BeautifulSoupurl = "https://book.douban.com/top250"
headers = {"User-Agent": "Mozilla/5.0"}response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "html.parser")books = []
for item in soup.select("tr.item"):title = item.select_one(".pl2 a")["title"]score = item.select_one(".rating_nums").textbooks.append({"title": title, "score": score})# 写入CSV
with open("douban_books.csv", "w", newline="", encoding="utf-8") as f:writer = csv.DictWriter(f, fieldnames=["title", "score"])writer.writeheader()writer.writerows(books)print("数据已存储至 douban_books.csv")

二:JSON数据存储

使用场景说明
配置文件存储程序参数、路径配置等(如config.json)
API数据交互前后端通过JSON格式传递请求与响应
结构化日志记录记录带元数据的操作日志,便于后续分析

1:基础json读写

dump -> 写入到json, load -> 从json读取

import jsondata = {"project": "数据分析平台","version": 2.1,"authors": ["张三", "李四"],"tags": {"python": 5, "database": 3}
}with open("data.json", "w", encoding="utf-8") as f:json.dump(data, f, ensure_ascii=False, indent=2)  # 禁用ASCII转义,缩进2空格with open("data.json", "r", encoding="utf-8") as f:loaded_data = json.load(f)
print(loaded_data["tags"]["python"])  # 输出:5

2:字符串和对象的转换

dumps -> json转成字符串,loads -> 字符串转成json

data_str = json.dumps(data, ensure_ascii=False)
print(type(data_str))  # <class 'str'>json_str = '{"name": "王五", "age": 30}'
obj = json.loads(json_str)
print(obj["age"])  # 输出:30

3:日期和时间的特殊处理

JSON默认不支持Python的datetime对象,需自定义转换逻辑

from datetime import datetimedef datetime_encoder(obj):if isinstance(obj, datetime):return obj.isoformat()  # 转为ISO格式字符串raise TypeError("类型无法序列化")data = {"event": "发布会", "time": datetime.now()}# 序列化时指定自定义编码函数
json_str = json.dumps(data, default=datetime_encoder, ensure_ascii=False)
print(json_str)  # {"event": "发布会", "time": "2024-07-20T15:30:45.123456"}

4:自定义序列化

class User:def __init__(self, name, level):self.name = nameself.level = leveluser = User("赵六", 3)# 方法1:手动转换字典
user_dict = {"name": user.name, "level": user.level}
json.dumps(user_dict)# 方法2:使用__dict__(需类属性均为可序列化类型)
json.dumps(user.__dict__)

5:高级使用

跳过特定的字段

def filter_encoder(obj):if "password" in obj:del obj["password"]return objuser_data = {"name": "张三", "password": "123456"}
json.dumps(user_data, default=filter_encoder)  # {"name": "张三"}

取消缩进和空格(紧凑输出)

json.dumps(data, separators=(",", ":"))  # 输出最简格式

ujson代替(C实现的JSON超高速库,API完全兼容)

import ujson as json  # 替换标准库
json.dumps(data)      # 速度提升3-10倍

大文件的读取

# 逐行读取JSON数组文件(每行为独立JSON对象)
with open("large_data.json", "r", encoding="utf-8") as f:for line in f:item = json.loads(line)process(item)

三:数据库数据存储

1:基础部分学习

各种数据库的数据存储请看这里

2:补充说明

可以使用dbutils进行mysql的连接池管理

import pymysql
from dbutils.pooled_db import PooledDBpool = PooledDB(creator=pymysql,  # 使用链接数据库的模块maxconnections=5,  # 连接池允许的最大连接数,0和None表示不限制连接数mincached=1, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建maxcached=2, # 链接池中最多闲置的链接,0和None不限制maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待ping=0, # ping MySQL服务端,检查是否服务可用host='127.0.0.1', # 数据库服务器的IP地址port=3306, # 数据库服务端口user='root', # 数据库用户名password='314159', # 数据库密码database='test', # 数据库名称charset='utf8' # 数据库编码
)# 创建游标对象
conn = pool.connection()
cursor = conn.cursor()# 有了游标对象就能操作了,方式都是 -> cursor.???(sql, args)
sql = "select * from user where id = %s"
cursor.execute(sql, [1]) # 执行SQL语句 -> select * from user where id = 1
# 获取结果
print(cursor.fetchone())

mongodb的管道操作

from datetime import datetimefrom pymongo import MongoClient
from pymongo.errors import ConnectionFailure# 建立连接(默认连接池大小100)
client = MongoClient(host="localhost",port=27017,
)try:# 心跳检测client.admin.command('ping')print("Successfully connected to MongoDB!")
except ConnectionFailure:print("Server not available")# 选择数据库与集合(自动懒创建)
db = client["ecommerce"]
products_col = db["products"]# 插入文档(自动生成_id)
product_data = {"name": "Wireless Mouse","price": 49.99,"tags": ["electronics", "computer"],"stock": {"warehouse_A": 100, "warehouse_B": 50},"last_modified": datetime.now()
}
insert_result = products_col.insert_one(product_data)
print(f"Inserted ID: {insert_result.inserted_id}")# 查询文档(支持嵌套查询)
query = {"price": {"$lt": 60}, "tags": "electronics"} # 查询价格小于60 并且 tags是electronics 的文档
projection = {"name": 1, "price": 1}  # 类似SQL SELECT, 只返回name和price字段
cursor = products_col.find(query, projection).limit(5)
for doc in cursor:print(doc)# 更新文档(原子操作)
update_filter = {"name": "Wireless Mouse"}
update_data = {"$inc": {"stock.warehouse_A": -10}, "$set": {"last_modified": datetime.now()}}
update_result = products_col.update_one(update_filter, update_data)
print(f"Modified count: {update_result.modified_count}")# 删除文档
delete_result = products_col.delete_many({"price": {"$gt": 200}})
print(f"Deleted count: {delete_result.deleted_count}")# 管道操作
# 统计各仓库库存总量
pipeline = [{"$unwind": "$stock"},  # 阶段一:展开嵌套文档{"$group": { # 阶段二:分组聚合# 分组字段(分段的字段是stock.warehouse)记为_id# 聚合字段为 sum(对每一个分组的stock.quantity进行sum)"_id": "$stock.warehouse","total_stock": {"$sum": "$stock.quantity"}}},# 阶段三:排序, 根据分组条件降序排序{"$sort": {"total_stock": -1}}
]
results = products_col.aggregate(pipeline)
for res in results:print(f"Warehouse {res['_id']}: {res['total_stock']} units")

mongodb大数据的处理

from pymongo import MongoClient
from faker import Faker
import timeclient = MongoClient('mongodb://localhost:27017/')
db = client['bigdata']
collection = db['user_profiles']fake = Faker()
batch_size = 5000  # 分批次插入减少内存压力def generate_batch(batch_size):return [{"name": fake.name(),"email": fake.email(),"last_login": fake.date_time_this_year()} for _ in range(batch_size)]start_time = time.time()
for _ in range(200):  # 总数据量100万batch_data = generate_batch(batch_size)collection.insert_many(batch_data, ordered=False)  # 无序插入提升速度print(f"已插入 {(i+1)*batch_size} 条数据")print(f"总耗时: {time.time()-start_time:.2f}秒") # 分析电商订单数据(含嵌套结构)
pipeline = [{"$unwind": "$items"},  # 展开订单中的商品数组{"$match": {"status": "completed"}},  # 筛选已完成订单{"$group": {"_id": "$items.category","total_sales": {"$sum": "$items.price"},"avg_quantity": {"$avg": "$items.quantity"},"top_product": {"$max": "$items.name"}}},{"$sort": {"total_sales": -1}},{"$limit": 10}
]orders_col = db["orders"]
results = orders_col.aggregate(pipeline)for res in results:print(f"品类 {res['_id']}: 销售额{res['total_sales']}元")

性能优化的关键措施 -> 添加索引 & bluk操作

# 创建索引(提升查询速度)
products_col.create_index([("name", pymongo.ASCENDING)], unique=True)
products_col.create_index([("price", pymongo.ASCENDING), ("tags", pymongo.ASCENDING)])# 批量写入提升吞吐量
bulk_ops = [pymongo.InsertOne({"name": "Keyboard", "price": 89.99}),pymongo.UpdateOne({"name": "Mouse"}, {"$set": {"price": 59.99}}),pymongo.DeleteOne({"name": "Earphones"})
]
results = products_col.bulk_write(bulk_ops)

高可用架构配置

from pymongo import MongoClient
from pymongo.errors import ConnectionFailure# MongoDB 副本集连接字符串
# replicaSet=rs0 是副本集的名称
uri = "mongodb://192.127.1.1:27017,192.127.1.2:27017,192.127.1.3:27017/mydb?replicaSet=rs0"# 创建 MongoClient 实例
client = MongoClient(uri)# 测试连接
try:# 通过执行一个简单的命令来验证连接是否成功client.admin.command('ping')print("成功连接到 MongoDB 副本集!")
except ConnectionFailure as e:print(f"无法连接到 MongoDB 副本集: {e}")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/89959.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/89959.shtml
英文地址,请注明出处:http://en.pswp.cn/pingmian/89959.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入分析计算机网络数据链路层和网络层面试题

计算机网络体系结构1. 请简述 OSI 七层模型和 TCP/IP 四层模型&#xff0c;并比较它们的异同。OSI 七层模型&#xff1a;应用层&#xff1a;直接为用户的应用进程提供服务&#xff0c;如 HTTP&#xff08;超文本传输协议&#xff0c;用于 Web 浏览器与服务器通信&#xff09;、…

云服务器新装的mysql8,无法通过远程连接,然后本地pymysql也连不上

阿里云服务器&#xff0c;用apt-get新装的mysql-server&#xff0c;竟然无法通过远程连接到&#xff0c;竟然是这个原因。不是防火墙&#xff0c;iptables早就关了。也不是安全组&#xff0c;不是人为限制访问的话&#xff0c;根本没必要弄安全组 排查过程 netstat -antop|grep…

质量即服务:从测试策略到平台运营的全链路作战手册

&#xff08;零&#xff09;为什么需要“质量即服务” 当业务方说“今晚一定要上线”&#xff0c; 当开发说“我只改了两行代码”&#xff0c; 当运维说“回滚窗口只有 5 分钟”&#xff0c; 质量必须像水电一样随取随用&#xff0c;而不是上线前的大坝泄洪。 这篇手册提供一张…

Java -- 自定义异常--Wrapper类--String类

自定义异常&#xff1a;概念&#xff1a;当程序中出现了某些错误&#xff0c;但该错误信息并没有在Throwable子类中描述处理&#xff0c;这个时候可以自己设计异常&#xff0c;用于描述该错误信息。步骤&#xff1a;1. 定义类&#xff1a;自定义异常类名&#xff08;程序员自己…

一文速通《线性方程组》

目录 一、解题必记知识点 二、解题必备技巧 三、非齐次线性方程组求解 四、齐次线性方程组求解 ★五、解析题目信息&#xff0c;获取暗含条件 一、解题必记知识点 (1) (2)基础解系线性无关&#xff0c;基础解系 解空间的一个基&#xff0c;基 一组线性无关的、能够生…

【Django】DRF API版本和解析器

讲解 Python3 下 Django REST Framework (DRF) API 版本控制解析器&#xff08;Parser&#xff09;一、DRF API 版本控制详解 API 版本控制是构建健壮、可维护的 RESTful API 的关键&#xff0c;尤其在项目演进中需要兼容不同版本的客户端请求。 1.1 API 版本控制的核心原理 AP…

Windows系统暂停更新工具

功能说明 暂停更新至2999年恢复系统更新彻底禁用更新&#xff08;不可逆&#xff09; 使用方法 下载解压后双击运行 .bat 文件 输入数字选择功能&#xff1a; 输入 1&#xff1a;暂停更新至2999年&#xff08;推荐&#xff09;输入 2&#xff1a;恢复系统更新输入 3&#xf…

git push新版问题解决

git 好像不能通过username:password的方式来git push了。但我的电脑依然弹出username和password的弹窗。转战ssh来git push。由于之前是用git clone克隆的&#xff0c;需要再转换成ssh的url来git push。

PyCharm + AI 辅助编程

PyCharm AI&#xff1a;初学者友好的 2 个实用场景&#xff08;附操作步骤&#xff09; PyCharm 专业版&#xff08;或通过插件集成&#xff09;支持 AI 辅助编程&#xff08;如 JetBrains AI 或 GitHub Copilot&#xff09;&#xff0c;能根据代码上下文自动生成代码、解释逻…

疯狂星期四文案网第15天运营日记

网站运营第15天&#xff0c;点击观站&#xff1a; 疯狂星期四 crazy-thursday.com 全网最全的疯狂星期四文案网站 运营报告 昨日访问量 昨天只有20来ip, 太惨了&#xff0c;感觉和最近没有发新段子有关&#xff0c;也没有发新的外链&#xff0c;不知道这周四会怎么样 昨日搜…

如何解决pip安装报错ModuleNotFoundError: No module named ‘Cython’问题

【Python系列Bug修复PyCharm控制台pip install报错】如何解决pip安装报错ModuleNotFoundError: No module named ‘Cython’问题 摘要 在使用 PyCharm 控制台或命令行执行 pip install Cython 时&#xff0c;常会遇到 ModuleNotFoundError: No module named Cython 的报错。本…

freertos任务调度关键函数理解 vTaskSwitchContext

void vTaskSwitchContext(void) {//my_printf( "uxSchedulerSuspended %d\n", uxSchedulerSuspended );/* 调度器处于挂起状态 */if (uxSchedulerSuspended ! (UBaseType_t)pdFALSE) {/*** The scheduler is currently suspended - do not allow a context* switch.…

CPU 密集型 和 I/O 密集型 任务

文章目录**CPU 密集型任务&#xff08;CPU-bound&#xff09;**定义&#xff1a;特点&#xff1a;常见场景&#xff1a;如何优化 CPU 密集型任务&#xff1a;**I/O 密集型任务&#xff08;I/O-bound&#xff09;**定义&#xff1a;特点&#xff1a;常见场景&#xff1a;如何优化…

[2025CVPR-小目标检测方向]基于特征信息驱动位置高斯分布估计微小目标检测模型

核心问题 ​小目标检测性能差&#xff1a;​​ 尽管通用目标检测器&#xff08;如 Faster R-CNN, YOLO, SSD&#xff09;在常规目标上表现出色&#xff0c;但在检测微小目标&#xff08;如 AI-TOD 基准定义的&#xff1a;非常小目标 2-8 像素&#xff0c;小目标 8-16 像素&…

三大工厂设计模式

1.简单工厂模式1.1需求入手从需求进行入手&#xff0c;可以更深入的理解什么是设计模式。有一个制作披萨的需求&#xff1a;需要便于扩展披萨的种类&#xff0c;便于维护。1.披萨的种类有很多&#xff1a;GreekPizz&#xff0c;CheesePizz等2.披萨的制作流程&#xff1a;prepar…

SpringBoot--Mapper XML 和 Mapper 接口在不同包

&#x1f9e9; 背景说明在 Spring Boot 中&#xff0c;MyBatis 默认要求 Mapper 接口和 XML 文件位于相同包路径。 但在实际项目中&#xff0c;为了模块化或结构清晰&#xff0c;常将 XML 放在 resources/mybatis/... 下&#xff0c;这种做法就必须进行额外配置。&#x1f4c1;…

公交车客流人数统计管理解决方案:智能化技术与高效运营实践

1. 引言公交车作为城市公共交通的核心组成部分&#xff0c;其客流数据的精准统计与管理直接影响运营效率、调度优化和乘客体验。传统的人工统计方式效率低、误差大&#xff0c;难以满足现代智慧交通的需求。随着人工智能&#xff08;AI&#xff09;、物联网&#xff08;IoT&…

正则表达式完全指南:从入门到实战

目录 一、什么是正则表达式&#xff1f; 二、基础语法速查表 三、进阶特性 1.分组与捕获 2.非捕获分组 3.前瞻与后顾 4.贪婪与懒惰匹配 四、实战案例 案例1&#xff1a;验证手机号 案例2&#xff1a;提取网页中所有链接 案例3&#xff1a;密码强度验证 一、什么是正…

SmartETL循环流程的设计与应用

1. 引言 **检索增强生成&#xff08;RAG&#xff09;**是指通过检索对大模型生成进行增强的技术&#xff0c;通过充分利用信息检索&#xff08;尤其是语义检索&#xff09;相关技术&#xff0c;实现大模型快速扩展最新知识、有效减少幻觉的能力。主流RAG框架包括问题理解、知识…

uni-app开发小程序,根据图片提取主题色值

需求&#xff0c;在页面根据传入的图片提取图片主色值并用来设置区块背景色<template><view class"icon-container"><view class"sport-icon" :style"{ backgroundColor: mainColor }"><image :src"/static/images/sp…