一、线程事件对象(threading.Event
)
threading.Event
用于实现线程间的通信,可让一个线程通知其他线程终止任务,核心是通过 “事件触发” 机制协调线程行为。
核心方法:
- 创建事件对象:
event = threading.Event()
- 触发事件:
event.set()
(将事件状态设为 “已触发”) - 判断事件状态:
event.is_set()
(返回True
表示事件已触发)
案例:动物赛跑(乌龟 vs 兔子)
通过 Event
实现 “一方到达终点则所有线程停止” 的逻辑:
import threading
from threading import Thread, Event, current_thread
from abc import ABC, abstractmethod
import timeclass Animal(Thread, ABC):max_distance = 2000 # 比赛总距离def __init__(self, speed, name, time1=0, state=False, stop_event: Event = None):super().__init__(name=name)self.speed = speed # 速度self.time1 = time1 # 已跑时间self.state = state # 是否到达终点self.stop_event = stop_event or Event() # 共享的停止事件@abstractmethoddef distance(self):# 抽象方法:计算当前跑过的距离passdef run(self):# 循环条件:未收到停止信号while not self.stop_event.is_set():self.time1 += 0.1 # 每次循环增加0.1秒length = round(self.distance()) # 计算当前距离print(f"{self.name}在{self.time1:.1f}秒跑了{length}mm")# 检查是否到达终点if length >= self.max_distance:self.state = Trueprint(f"\n{self.name}到达终点!")self.stop_event.set() # 通知其他线程停止breaktime.sleep(0.1) # 模拟时间流逝class Turtle(Animal):def distance(self):# 乌龟匀速前进return self.time1 * self.speedclass Rabbit(Animal):def distance(self):# 兔子逻辑:前5秒快跑,休息180秒,之后继续跑if self.time1 < 5:return self.time1 * self.speed # 前5秒正常跑elif self.time1 < 5 + 180:return 5 * self.speed # 5-185秒休息(保持5秒时的距离)else:rest_end_time = 5 + 180 # 休息结束时间extra_time = self.time1 - rest_end_time # 休息后额外跑的时间return 5 * self.speed + extra_time * self.speed # 总距离if __name__ == '__main__':race_stop_event = Event() # 共享的停止事件rabbit = Rabbit(speed=90, name="兔子", stop_event=race_stop_event)turtle = Turtle(speed=10, name="乌龟", stop_event=race_stop_event)print("比赛开始!")rabbit.start()turtle.start()rabbit.join()turtle.join()# 判断结果if rabbit.state:print("兔子赢了!")elif turtle.state:print("乌龟赢了!")
二、线程安全与队列(queue.Queue
)
多线程共享数据时,列表等结构线程不安全(可能引发并发问题),而 queue.Queue
是线程安全的,常用于多线程通信,遵循 FIFO(先进先出)原则。
常见队列类型:
Queue
:FIFO 队列PriorityQueue
:基于优先级的队列LifoQueue
:LIFO(先进后出)队列
队列核心方法:
方法 | 说明 |
---|---|
empty() | 判断队列是否为空 |
full() | 判断队列是否已满(仅对有长度限制的队列有效) |
qsize() | 获取队列中数据个数 |
put(item, timeout) | 存入数据,队列满时阻塞,可设置超时时间 |
put_nowait(item) | 存入数据,队列满时不阻塞(直接报错) |
get(timeout) | 获取数据,队列空时阻塞,可设置超时时间 |
get_nowait() | 获取数据,队列空时不阻塞(直接报错) |
三、条件对象(threading.Condition
)
Condition
用于复杂的线程间同步,通过 “等待 - 唤醒” 机制协调线程行为,自带锁(默认 RLock
),需在锁块中使用 wait()
和 notify()
。
核心方法:
wait(timeout=None)
:阻塞当前线程,直到超时或被其他线程唤醒notify(n=1)
:唤醒 1 个等待的线程(默认)notify_all()
:唤醒所有等待的线程
案例:生产者 - 消费者(摘苹果与吃苹果)
生产者摘苹果存入队列,队列满时通知消费者;消费者吃苹果,队列空时通知生产者:
from threading import Thread, Condition, current_thread
from queue import Queue
import timeclass Apple:__number_code__ = 0 # 苹果编号计数器def __init__(self):self.__class__.__number_code__ += 1self.number = self.__class__.__number_code__def __repr__(self):return f"<Apple : {self.number}>"def product_apple(queue: Queue, condition: Condition):while True:with condition: # 锁块:确保操作原子性while queue.full():condition.wait() # 队列满时等待apple = Apple()print(f"{current_thread().name}正在摘取{apple}")queue.put(apple)time.sleep(0.2) # 模拟摘苹果耗时if queue.full():condition.notify_all() # 队列满时唤醒消费者def eat_apple(queue: Queue, condition: Condition):while True:with condition:while queue.empty():condition.wait() # 队列空时等待apple = queue.get()print(f"{current_thread().name}正在吃{apple}~~")time.sleep(0.5) # 模拟吃苹果耗时if queue.empty():condition.notify_all() # 队列空时唤醒生产者if __name__ == '__main__':apple_queue = Queue(maxsize=50) # 队列最大容量50condition = Condition()# 启动生产者和消费者productor = Thread(target=product_apple, args=(apple_queue, condition), name='生产者01')productor.start()for x in range(2):consumer = Thread(target=eat_apple, args=(apple_queue, condition), name=f'消费者{x}')consumer.start()
四、线程状态
线程生命周期包含以下状态及转换:
- 新建状态:创建线程对象后,调用
start()
之前 - 就绪状态:调用
start()
后,等待 CPU 调度 - 运行状态:获取 CPU 时间片,执行
run()
方法 - 阻塞状态:因
sleep()
、join()
、input()
等操作暂停,阻塞结束后回到就绪状态 - 死亡状态:
run()
执行完毕或异常终止
五、线程池(concurrent.futures.ThreadPoolExecutor
)
线程池可复用线程,减少线程创建 / 销毁的开销,适用于多任务场景。
核心用法:
- 创建线程池:
executor = ThreadPoolExecutor(max_workers)
(max_workers
为线程数) - 提交任务:
executor.submit(func, *args)
(返回Future
对象,用于获取结果) - 获取结果:
future.result()
(阻塞等待任务完成并返回结果)
案例 1:线程池售票
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from queue import Queue
import timedef execute_task(queue: Queue):while True:time.sleep(0.5)try:ticket = queue.get(timeout=1) # 超时1秒未获取到票则退出print(f"{current_thread().name}正在售票、票号是{ticket}、剩余{queue.qsize()}")except:print(f"{current_thread().name}:票已售罄")breakif __name__ == '__main__':executor = ThreadPoolExecutor(5) # 5个线程的线程池queue = Queue(maxsize=100)for x in range(1, 101):queue.put(f"NO.{x:>08}") # 初始化100张票for _ in range(5):executor.submit(execute_task, queue)
案例 2:多线程爬虫(抓取小说章节)
主线程获取小说章节列表,线程池并发抓取章节内容,最终汇总保存:
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from queue import Queue
import requests
import re
import time
from traceback import print_excdef parse_url(url, count=0):"""解析网址,最多重试3次"""headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0","cookie": "ckAC = 1;Hm_lvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;Hm_lpvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;HMACCOUNT = FDAFB0724C56B8F8","referer": "https://www.bqgui.cc/"}try:if count < 3:response = requests.get(url, headers=headers)assert response.status_code == 200, f'网页抓取失败,状态码{response.status_code}'return response.textreturn Noneexcept:print_exc()return parse_url(url, count + 1) # 重试def parse_caption(queue: Queue):"""抓取单章节内容"""try:time.sleep(0.5) # 降低频率,避免被封index, href, title = queue.get(timeout=1)print(f"{current_thread().name}正在抓取{title}~~~")text = parse_url(f"https://www.c3719.lol{href}")print(f"{current_thread().name}抓取{title}成功!!!")if text is None:return index, title, ""# 提取章节正文regex = r'<div\s+id="chaptercontent".*?>(.*?)请收藏本站'match = re.search(regex, text)if not match:return index, title, ""caption = match.group(1)# 清洗内容(去除标签和多余空格)caption = re.sub(r"<br\s*/?>|\s+", "", caption)return index, title, captionexcept:print(f"{current_thread().name}所有章节抓取完成")if __name__ == '__main__':# 主线程获取章节列表url = "https://www.c3719.lol/book/61808/"noval_name = '斗罗大陆V重生唐三'text = parse_url(url)# 提取所有章节链接和标题regex = r'<dd><a\s+href\s*="(.*?)">(.*?)</a></dd>'all_captions = re.findall(regex, text)# 章节信息存入队列queue = Queue()for index, caption in enumerate(all_captions):queue.put((index, *caption))# 线程池并发抓取executor = ThreadPoolExecutor(50)futures = [executor.submit(parse_caption, queue) for _ in all_captions]# 获取结果并排序result = [f.result() for f in futures]result.sort(key=lambda x: x[0]) # 按章节顺序排序# 保存到文件with open(f'{noval_name}.txt', 'w', encoding='utf-8') as f:for index, title, text in result:f.write(title + "\n")f.write(text + "\n")f.flush()print(f"{noval_name}抓取完成")
作业一:线程通信(生产者 - 消费者模型)
需求:
a) 定义 1 个生产者线程,负责摘苹果并存储到队列中;当队列已满时,通知消费者吃苹果并停止摘苹果。
b) 定义多个消费者线程,负责吃苹果;当队列为空时,通知生产者继续摘苹果。
实现代码
from queue import Queue
from threading import Thread, Condition, current_thread
import time# 苹果类:记录苹果编号
class Apple:__number_code__ = 0 # 类变量,用于计数苹果编号def __init__(self):self.__class__.__number_code__ += 1 # 每次创建苹果时编号自增self.number = self.__class__.__number_code__def __repr__(self):return f"<Apple> : {self.number}" # 自定义苹果的打印格式# 生产者函数:摘苹果并放入队列
def produce_apple(queue: Queue, condition: Condition):while True:with condition: # 借助Condition的锁确保操作原子性# 队列满时等待(释放锁,等待消费者唤醒)while queue.full():condition.wait()# 摘苹果并放入队列apple = Apple()queue.put(apple)print(f"{current_thread().name}摘下: <{apple}>")time.sleep(0.2) # 模拟摘苹果耗时# 队列满时通知所有消费者if queue.full():condition.notify_all()# 消费者函数:从队列取苹果并"吃"
def consume_apple(queue: Queue, condition: Condition):while True:with condition: # 借助Condition的锁确保操作原子性# 队列空时等待(释放锁,等待生产者唤醒)while queue.empty():condition.wait()# 从队列取苹果并"吃"apple = queue.get()print(f"{current_thread().name}正在吃: <{apple}>")time.sleep(0.2) # 模拟吃苹果耗时# 队列空时通知生产者if queue.empty():condition.notify() # 唤醒1个生产者(此处只有1个生产者,notify_all()也可)# 主程序:启动生产者和消费者
if __name__ == '__main__':# 创建队列(最大容量20)和条件对象apple_queue = Queue(maxsize=20)condition = Condition()# 启动1个生产者线程producer = Thread(target=produce_apple, args=(apple_queue, condition), name="生产者")producer.start()# 启动3个消费者线程for i in range(3):consumer = Thread(target=consume_apple, args=(apple_queue, condition), name=f"消费者<{i}>")consumer.start()
核心逻辑解析
- Apple 类:通过类变量
__number_code__
记录苹果编号,每次创建苹果时自动生成唯一编号,便于跟踪。 - Condition 协调:
- 生产者通过
condition.wait()
在队列满时阻塞,等待消费者唤醒;队列满时通过notify_all()
通知所有消费者。 - 消费者通过
condition.wait()
在队列空时阻塞,等待生产者唤醒;队列空时通过notify()
通知生产者。
- 生产者通过
- 线程安全:
with condition
确保生产 / 消费操作在锁保护下执行,避免并发冲突。
作业二:多线程小说抓取与下载
需求:使用多线程(线程池)抓取笔趣阁类网站(如https://www.biqu03.cc/
)某小说的所有章节内容,并保存为本地文件。
实现代码
import re
import requests
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
from queue import Queue# 解析小说章节列表(获取所有章节的链接和标题)
def parse_url(url):headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0","cookie": "ckAC = 1;Hm_lvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;Hm_lpvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;HMACCOUNT = FDAFB0724C56B8F8","referer": "https://www.bqgui.cc/"}# 发送请求获取小说目录页内容response = requests.get(url, headers=headers)assert response.status_code == 200, "目录页请求失败" # 确保请求成功text = response.text# 正则提取章节链接和标题(<dd><a href="链接">标题</a></dd>)regex = r'<dd><a\s+href\s+="(.*)">(.*)</a></dd>'all_title = re.findall(regex, text) # 返回列表:[(链接1, 标题1), (链接2, 标题2), ...]return all_title# 解析单章节内容(从队列获取章节信息,抓取正文并返回)
def parse_caption(queue: Queue):try:time.sleep(0.5) # 降低请求频率,避免被网站反爬headers = {"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0","cookie": "ckAC = 1;Hm_lvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;Hm_lpvt_15cfcc3e15bd556d32e5aedcadd5a38b = 1754045545;HMACCOUNT = FDAFB0724C56B8F8","referer": "https://www.bqgui.cc/"}# 从队列获取章节信息(索引、链接、标题)index, href, title = queue.get(timeout=1)print(f"{current_thread().name}正在抓取{title}~~~")# 构建章节详情页URL并请求new_url = f"https://www.c3719.lol{href}" # 注意替换为实际网站域名response = requests.get(new_url, headers=headers)assert response.status_code == 200, f"{title}详情页请求失败"text = response.textprint(f"{current_thread().name}抓取{title}成功!!!")# 正则提取章节正文(匹配id为chaptercontent的div内容)regex = r'<div\s+id="chaptercontent".*?>(.*?)请收藏本站'match = re.search(regex, text, re.DOTALL) # re.DOTALL让.匹配换行符if not match:return index, title, "" # 无正文时返回空# 清洗正文(去除<br>标签和多余空格)caption = match.group(1)caption = re.sub(r"<br\s*/?>|\s+", "", caption)return index, title, captionexcept:print(f"{current_thread().name}所有章节抓取完成")# 主程序:协调抓取流程并保存结果
if __name__ == '__main__':# 小说目录页URL(替换为目标小说目录页)url = "https://www.c3719.lol/book/61808/"all_tit = parse_url(url) # 获取所有章节信息# 章节信息存入队列(用于线程池消费)queue = Queue()for index, x in enumerate(all_tit):queue.put((index, *x))# 小说名称(用于保存文件)noval_name = '斗罗大陆V重生唐三'# 线程池并发抓取所有章节executor = ThreadPoolExecutor(50) # 50个线程并发futures = [executor.submit(parse_caption, queue) for _ in all_tit]# 收集并排序结果(按章节索引排序)result = [f.result() for f in futures if f.result() is not None]result.sort(key=lambda d: d[0]) # 按索引排序,确保章节顺序正确# 保存到本地文件with open(f'{noval_name}.txt', 'w', encoding='utf-8') as f:for index, title, text in result:f.write(title + "\n") # 写入标题f.write(text + "\n") # 写入正文f.flush() # 实时刷新缓冲区print(f"{noval_name}抓取完成,已保存为本地文件!")
核心逻辑解析
- 章节列表解析:通过
parse_url
函数请求小说目录页,用正则提取所有章节的链接和标题,为后续抓取做准备。 - 多线程并发抓取:使用
ThreadPoolExecutor
创建线程池,多个线程从队列获取章节信息,并发请求详情页,提高效率。 - 正文提取与清洗:通过正则匹配章节正文所在的 HTML 标签,去除冗余标签(如
<br>
)和空格,得到干净的文本。 - 结果排序与保存:通过章节索引排序结果,确保章节顺序正确,最终写入本地 TXT 文件。