一、A Bit of Jargon
1、关键术语解析
1.1 并发 (Concurrency)
定义:
并发是指同时处理多个待处理任务的能力,这些任务可以依次或并行(如果可能)进行,最终每个任务都会成功或失败。
理解:
- 单核 CPU: 即使是单核 CPU 也可以实现并发,因为操作系统调度器会交错执行多个任务,给人一种同时进行的错觉。
- 多任务处理: 并发也常被称为多任务处理。
举例:
想象你在厨房里同时做三道菜:炒菜、煮汤和烤面包。你不会同时做所有事情,而是会交替进行,例如先炒一会儿菜,然后去搅拌汤,再去查看面包的状态。这种交替进行的方式就是并发。
1.2 并行 (Parallelism)
定义:
并行是指同时执行多个计算任务的能力。
理解:
- 硬件要求: 并行需要多核 CPU、多个 CPU、GPU 或集群中的多台计算机。
- 与并发的区别: 并发强调的是任务处理的交替进行,而并行则是真正的同时进行。
举例:
回到厨房的例子,如果你有多个炉灶和帮手,你们可以同时进行炒菜、煮汤和烤面包,这就是并行。
1.3 执行单元 (Execution Unit)
定义:
执行单元是执行代码的并发对象,每个执行单元都有独立的执行状态和调用栈。
Python 原生支持的执行单元:
- 进程 (Process): 独立的内存空间,隔离性强,通信成本高。
- 线程 (Thread): 共享内存空间,通信方便,但需要处理数据竞争问题。
- 协程 (Coroutine): 在单个线程内运行,由事件循环调度,资源消耗低,但需要协作式调度。
1.4 进程 (Process)
定义:
进程是正在运行的计算机程序的实例,使用内存和 CPU 时间片。
特点:
- 隔离性: 每个进程拥有独立的内存空间,彼此隔离。
- 通信方式: 进程间通过管道、套接字或内存映射文件进行通信,这些方式只能传输原始字节,因此 Python 对象需要序列化。
- 资源消耗: 进程比线程消耗更多资源。
- 抢占式多任务: 操作系统调度器会定期抢占(暂停)正在运行的进程,让其他进程运行。这意味着一个冻结的进程不会冻结整个系统(理论上)。
举例:
在 Python 中,可以使用 multiprocessing
或 concurrent.futures
库启动额外的 Python 进程。
1.5 线程 (Thread)
定义:
线程是进程内的执行单元。
特点:
- 共享内存: 同一进程内的线程共享相同的内存空间,这使得线程间数据共享变得容易,但也可能导致数据损坏(当多个线程同时更新同一个对象时)。
- 资源消耗: 线程比进程消耗更少的资源。
- 抢占式多任务: 线程也在操作系统调度器的监督下进行抢占式多任务处理。
举例:
在 Python 中,可以使用 threading
或 concurrent.futures
库创建额外的线程。
1.6 协程 (Coroutine)
定义:
协程是一种可以暂停自身执行并在之后恢复的函数。
Python 中的协程:
- 经典协程: 由生成器函数构建。
- 原生协程: 使用
async def
定义。
特点:
- 事件循环: Python 协程通常在单个线程内运行,由事件循环调度。
- 协作式多任务: 每个协程必须显式地使用
yield
或await
关键字让出控制权,以便其他协程可以并发(但不是并行)执行。 - 阻塞问题: 协程中的任何阻塞代码都会阻塞事件循环和所有其他协程,这与进程和线程支持的抢占式多任务形成对比。
- 资源消耗: 每个协程消耗的资源比线程或进程少。
举例:
使用 asyncio
, Curio
或 Trio
等异步编程框架可以提供事件循环和用于非阻塞、基于协程的 I/O 的支持库。
1.7 队列 (Queue)
定义:
队列是一种数据结构,允许我们按 FIFO(先进先出)顺序放入和取出项目。
用途:
队列允许不同的执行单元交换应用程序数据和控制消息,例如错误代码和终止信号。
Python 中的队列实现:
queue
模块: 提供支持线程的队列类。multiprocessing
和asyncio
包: 实现自己的队列类。- 其他类型的队列:
LifoQueue
(后进先出)和PriorityQueue
(优先级队列)。
1.8 锁 (Lock)
定义:
锁是执行单元可以用来同步其操作并避免数据损坏的对象。
工作原理:
当更新共享数据结构时,运行中的代码应持有相关的锁。这向程序的其他部分发出信号,在锁释放之前等待访问相同的数据结构。
最简单类型的锁:
互斥锁(mutex,用于互斥)。
1.9 竞争 (Contention)
定义:
竞争是指对有限资源的争夺。
- 资源竞争: 当多个执行单元尝试访问共享资源(例如锁或存储)时,就会发生资源竞争。
- CPU 竞争: 当计算密集型进程或线程必须等待操作系统调度器分配 CPU 时间片时,就会发生 CPU 竞争。
2、Python 中的并发支持
2.1 进程和线程与 GIL
GIL (Global Interpreter Lock):
- 定义: GIL 是控制对对象引用计数和其他内部解释器状态的锁。
- 作用: 在任何时候只有一个 Python 线程可以持有 GIL,这意味着即使有多个 CPU 核心,也只有一个线程可以执行 Python 代码。
- 影响:
- 多线程性能: GIL 限制了多线程在 CPU 密集型任务上的性能,因为线程之间会竞争 GIL,导致上下文切换开销。
- I/O 密集型任务: 对于 I/O 密集型任务,GIL 的影响较小,因为 I/O 操作会释放 GIL,线程可以在等待 I/O 时让出 CPU。
- CPU 密集型任务: 对于 CPU 密集型任务,顺序的单线程代码通常更简单、更快。
GIL 的释放:
- 内置函数和 C 扩展: 任何在 Python/C API 级别集成的内置函数或 C 扩展都可以在执行耗时任务时释放 GIL。
- 标准库函数: 所有执行系统调用的标准库函数都会释放 GIL,包括所有执行磁盘 I/O、网络 I/O 和
time.sleep()
的函数。 - NumPy/SciPy: NumPy/SciPy 库中的许多 CPU 密集型函数以及
zlib
和bz2
模块中的压缩/解压缩函数也会释放 GIL。 - 非 Python 线程: 在 Python/C API 级别集成的扩展也可以启动其他不受 GIL 影响的非 Python 线程,但这些线程通常不能更改 Python 对象。
GIL 对网络编程的影响:
由于 I/O 操作会释放 GIL,并且网络读写总是意味着高延迟(与内存读写相比),因此 GIL 对 Python 线程的网络编程影响相对较小。因此,David Beazley 说:“Python 线程擅长什么都不做。”
多核 CPU 上的 CPU 密集型任务:
- 解决方案: 要在多核 CPU 上运行 CPU 密集型 Python 代码,必须使用多个 Python 进程。
总结:
- 多线程适用场景: 如果您希望应用程序更好地利用多核机器的计算资源,建议使用
multiprocessing
或concurrent.futures.ProcessPoolExecutor
。 - 多线程适用场景: 如果您希望同时运行多个 I/O 密集型任务,
threading
仍然是一个合适的模型。
2.2 协程与 GIL
协程与 GIL 的关系:
- 默认情况: 默认情况下,协程在同一个 Python 线程中共享事件循环,因此 GIL 不会影响它们。
- 多线程协程: 在异步程序中使用多个线程是可能的,但最佳实践是让一个线程运行事件循环和所有协程,而额外的线程执行特定任务。
3、实际应用中的注意事项
-
选择合适的并发模型:
- CPU 密集型任务: 使用多进程 (
multiprocessing
或ProcessPoolExecutor
)。 - I/O 密集型任务: 使用多线程 (
threading
或ThreadPoolExecutor
) 或协程 (asyncio
)。 - 混合任务: 可以结合使用多进程和多线程或协程。
- CPU 密集型任务: 使用多进程 (
-
避免 GIL 的限制:
- 使用多进程: 对于 CPU 密集型任务,避免使用多线程,转而使用多进程。
- 使用 C 扩展: 利用 C 扩展释放 GIL,可以提高多线程性能。
-
线程安全:
- 锁的使用: 在多线程环境中,更新共享数据时必须使用锁来防止数据竞争。
- 线程安全的数据结构: 使用线程安全的数据结构,例如
queue.Queue
,可以简化并发编程。
-
协程的阻塞问题:
- 避免阻塞代码: 在协程中避免使用阻塞代码,例如
time.sleep()
,可以使用await asyncio.sleep()
代替。 - 异步编程实践: 遵循异步编程的最佳实践,例如使用异步库和异步 I/O 操作。
- 避免阻塞代码: 在协程中避免使用阻塞代码,例如
二、A Concurrent Hello World
1、Spinner with Threads
1. 概述
本节将深入探讨如何使用 Python 的 threading
模块实现一个简单的终端动画 Spinner。该程序在执行耗时操作时,通过在终端显示旋转动画来提示用户程序正在运行,而不是卡死。
2. 主要概念
2.1 线程(Thread)
- 定义: 线程是操作系统能够进行运算调度的最小单位,是进程中的一个执行流。
- 作用: 在本例中,我们使用线程来实现耗时操作与动画的并发执行,从而避免阻塞主线程。
2.2 threading.Event
类
- 定义:
Event
是线程间最简单的信号机制,用于在线程之间传递事件信号。 - 主要方法:
set()
: 将内部标志设置为True
,通知所有等待该事件的线程。wait(timeout=None)
: 阻塞调用线程,直到内部标志被设置为True
或超时(如果指定了timeout
)。
- 工作原理:
- 初始时,内部标志为
False
。 - 当某个线程调用
set()
时,标志变为True
,所有等待该事件的线程将被唤醒。 - 如果调用
wait()
时指定了timeout
,则线程会在超时后继续执行,即使标志仍为False
。
- 初始时,内部标志为
3. 代码详解
3.1 spin
函数
import itertools
import time
from threading import Thread, Eventdef spin(msg: str, done: Event) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, end='', flush=True)if done.wait(0.1):breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')
- 功能: 在终端显示旋转动画,直到接收到停止信号。
- 关键点:
itertools.cycle(r'\|/-')
: 创建一个无限循环的迭代器,依次生成字符'\', '|', '/', '-', '\', ...
,实现旋转效果。'\r'
: 回车符,将光标移动到行首,实现覆盖输出,达到动画效果。flush=True
: 强制刷新输出缓冲区,确保动画及时显示。done.wait(0.1)
: 每隔 0.1 秒检查一次停止信号。如果接收到信号,则退出循环。- 清除动画: 通过打印与动画相同长度的空格,再移动光标到行首,实现清除动画的效果。
3.2 slow
函数
def slow() -> int:time.sleep(3)return 42
- 功能: 模拟一个耗时操作(例如网络请求或复杂计算),阻塞调用线程 3 秒后返回结果
42
。 - 关键点:
time.sleep(3)
会阻塞调用线程,但会释放全局解释器锁(GIL),允许其他线程运行。
3.3 supervisor
函数
def supervisor() -> int:done = Event()spinner = Thread(target=spin, args=('thinking!', done))print(f'spinner object: {spinner}')spinner.start()result = slow()done.set()spinner.join()return result
- 功能: 协调主线程和 spinner 线程的运行。
- 关键点:
Event()
: 创建一个事件对象,用于通知 spinner 线程停止。Thread(target=spin, args=('thinking!', done))
: 创建一个新线程,目标函数为spin
,参数为('thinking!', done)
。spinner.start()
: 启动 spinner 线程,开始显示动画。slow()
: 调用耗时操作,阻塞主线程。done.set()
: 设置事件,通知 spinner 线程停止。spinner.join()
: 等待 spinner 线程结束,确保动画被正确清除。
3.4 main
函数
def main() -> None:result = supervisor()print(f'Answer: {result}')
- 功能: 调用
supervisor
函数并输出结果。
4. 运行流程
- 主线程创建
Event
对象done
。 - 主线程创建并启动 spinner 线程,传入消息
'thinking!'
和done
事件。 - 主线程调用
slow()
,阻塞自身 3 秒。 - Spinner 线程每隔 0.1 秒更新一次动画字符,直到
done
事件被设置。 slow()
返回后,主线程设置done
事件,通知 spinner 线程停止。- 主线程等待 spinner 线程结束,然后输出结果
Answer: 42
。
5. 关键点总结
import itertools
import time
from threading import Thread, Eventdef spin(msg: str, done: Event) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, end='', flush=True)if done.wait(0.1):breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')def slow() -> int:time.sleep(3)return 42def supervisor() -> int:done = Event()spinner = Thread(target=spin, args=('thinking!', done))print(f'spinner object: {spinner}')spinner.start()result = slow()done.set()spinner.join()return resultdef main() -> None:result = supervisor()print(f'Answer: {result}')if __name__ == '__main__':main()
- 线程与 GIL:
- Python 的全局解释器锁(GIL)允许同一时间只有一个线程执行 Python 字节码。
time.sleep()
会释放 GIL,因此其他线程可以运行。- 在本例中,
slow()
阻塞主线程,但 spinner 线程可以继续运行,因为time.sleep()
释放了 GIL。
- 线程间通信:
threading.Event
是一种简单的线程间通信机制,用于通知事件的发生。- 在本例中,
done
事件用于通知 spinner 线程停止运行。
- 动画实现:
- 使用
'\r'
回车符和flush=True
实现覆盖输出,达到动画效果。 - 通过循环更新字符,实现旋转效果。
- 使用
2、Spinner with Processes
1. 多进程与多线程概述
在 Python 中,实现并发编程主要有两种方式:
- 多线程(Threading):在同一个进程中创建多个线程,共享内存空间,但受限于全局解释器锁(GIL),无法真正利用多核 CPU。
- 多进程(Multiprocessing):创建多个独立的进程,每个进程拥有独立的内存空间和 GIL,可以真正利用多核 CPU。
multiprocessing
模块 旨在模拟 threading
模块的 API,使得从多线程到多进程的转换更加容易。
2. multiprocessing.Process
类:创建和管理子进程
2.1 基本用法
与 threading.Thread
类似,multiprocessing.Process
用于创建子进程。以下是 spinner_proc.py
的示例代码:
import itertools
import time
from multiprocessing import Process, Event
from multiprocessing import synchronizedef spin(msg: str, done: synchronize.Event) -> None: for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, end='', flush=True)if done.wait(0.1):breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')def slow() -> int:time.sleep(3)return 42def supervisor() -> int:done = Event()spinner = Process(target=spin, args=('thinking!', done)) # 创建子进程print(f'spinner object: {spinner}')spinner.start() # 启动子进程result = slow()done.set() # 通知子进程spinner.join() # 等待子进程结束return resultdef main() -> None:result = supervisor()print(f'Answer: {result}')if __name__ == '__main__':main()
关键点:
- 创建子进程:
Process(target=spin, args=('thinking!', done))
target
参数指定子进程要执行的函数。args
参数是传递给目标函数的参数元组。
- 启动子进程:
spinner.start()
- 启动子进程并执行目标函数。
- 等待子进程结束:
spinner.join()
- 主进程会阻塞,直到子进程结束。
输出示例:
spinner object: <Process name='Process-1' parent=14868 initial>
Process-1
是子进程的名称。14868
是运行spinner_proc.py
的 Python 进程的进程 ID。
2.2 与 threading.Thread
的对比
- API 相似性:两者都提供了
start()
、join()
等方法,API 风格相似,易于从多线程切换到多进程。 - 实现差异:
threading.Event
是一个类,而multiprocessing.Event
是一个函数,返回一个synchronize.Event
实例。- 需要从
multiprocessing
模块导入synchronize
以使用类型提示。
3. 进程间通信:挑战与解决方案
3.1 进程隔离与数据共享
- 隔离性:操作系统将每个进程隔离,进程之间无法直接共享 Python 对象。
- 序列化与反序列化:跨进程边界传递数据时,需要对数据进行序列化和反序列化,这会带来开销。
示例:
在 spinner_proc.py
中,只有 Event
对象的状态在进程间传递。Event
是由 multiprocessing
模块底层 C 代码使用操作系统信号量实现的,因此可以高效地在进程间共享。
3.2 共享内存
从 Python 3.8 开始,multiprocessing.shared_memory
模块提供了共享内存功能,但存在以下限制:
- 不支持用户自定义类的实例。
- 支持的数据类型有限:
- 原始字节(
bytes
) ShareableList
:一种可变序列类型,可以存储固定数量的int
、float
、bool
、None
以及最大 10MB 的str
和bytes
。
- 原始字节(
4. 实际应用中的注意事项
4.1 进程池(Process Pool)
- 概念:预先创建一组进程,重复利用这些进程执行任务,避免频繁创建和销毁进程带来的开销。
- 使用场景:适用于需要大量任务并行执行的场景,如 CPU 密集型任务。
示例:
from multiprocessing import Pooldef square(x):return x * xif __name__ == '__main__':with Pool(4) as pool:results = pool.map(square, range(10))print(results)
输出示例:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
4.2 进程间通信机制
除了共享内存,multiprocessing
还提供了多种进程间通信机制:
- 队列(Queue):线程和进程安全的 FIFO 队列。
- 管道(Pipe):用于在两个进程之间建立双向通信通道。
- 管理器(Manager):提供多种共享数据结构,如列表、字典等。
队列示例:
from multiprocessing import Process, Queuedef worker(q):q.put('Hello from worker')if __name__ == '__main__':q = Queue()p = Process(target=worker, args=(q,))p.start()print(q.get()) # 输出: Hello from workerp.join()
5. 总结
multiprocessing
模块 提供了强大的多进程编程支持,弥补了多线程受限于 GIL 的不足。- API 相似性 使得从多线程切换到多进程变得相对容易,但需要注意进程间通信带来的开销和复杂性。
- 共享内存 提供了高效的进程间数据共享方式,但需要谨慎使用以避免数据竞争和死锁。
- 进程池 是管理大量并行任务的有效工具,可以提高程序性能。
6. 补充示例:使用 multiprocessing
实现生产者-消费者模型
from multiprocessing import Process, Queue
import timedef producer(q, items):for item in items:q.put(item)print(f'Produced: {item}')time.sleep(1)q.put(None) # 发送终止信号def consumer(q):while True:item = q.get() # 这是一个阻塞操作if item is None:breakprint(f'Consumed: {item}')time.sleep(2)if __name__ == '__main__':q = Queue()p = Process(target=consumer, args=(q,))p.start()producer(q, range(5))p.join()
输出示例:
Produced: 0
Consumed: 0
Produced: 1
Produced: 2
Consumed: 1
Produced: 3
Produced: 4
Consumed: 2
Consumed: 3
Consumed: 4
为什么需要 join()
-
等待子进程完成:
- 在主进程中启动子进程后,主进程会继续执行后续代码。如果不使用
join()
,主进程可能会在子进程完成之前就结束,导致子进程成为孤儿进程或僵尸进程。 join()
方法会阻塞主进程,直到子进程p
结束。这样可以确保子进程完成其任务后再继续执行主进程的后续操作。
- 在主进程中启动子进程后,主进程会继续执行后续代码。如果不使用
-
资源管理:
- 如果主进程过早退出,可能导致子进程无法正常结束,进而导致资源泄漏或其他不可预见的问题。
- 使用
join()
可以确保所有子进程在主进程退出前完成其任务,从而正确地释放资源。
-
同步:
- 在多进程编程中,进程之间的同步非常重要。
join()
提供了一种简单的同步机制,确保主进程在子进程完成后再继续执行。 - 在你的代码中,
join()
确保主进程等待消费者子进程完成消费所有队列中的元素,包括接收到终止信号None
后退出循环。
- 在多进程编程中,进程之间的同步非常重要。
q.get()是如何做到阻塞操作的
要深入理解 Queue.get()
的阻塞实现,我们需要考虑一些底层的同步机制。Python 的 multiprocessing.Queue
是基于底层的操作系统原语来实现的,这些原语确保多进程环境中的数据安全和同步。
1. Queue 的基本结构
multiprocessing.Queue
实际上是通过 multiprocessing.Manager
和底层管道(pipe)来实现的。它使用进程间通信(IPC)机制来在进程间传递信息。关键的组件包括:
- 管道(Pipe):用于在进程之间传递字节流。
- 锁(Lock):用来确保同时只有一个进程可以访问或修改共享资源。
- 信号量(Semaphore)或条件变量(Condition Variable):用于管理进程的同步。
2. 阻塞机制的实现
具体到 Queue.get()
的实现,涉及以下几个步骤:
a. 获取锁
每次调用 get()
方法时,首先需要获取一个锁。这是为了确保当一个进程正在操作队列时,其他进程无法进行相同的操作。锁的使用防止了数据竞争和不一致性。
b. 检查队列状态
一旦锁被获取,get()
方法会检查队列中是否有可用的数据:
- 如果队列不为空:直接从队列中获取数据,然后释放锁。进程可以继续执行。
- 如果队列为空:进程将进入等待状态。
c. 进入等待状态
如果队列为空,get()
会让当前进程进入等待状态。这是通过信号量或条件变量实现的:
- 信号量:信号量会被初始化为 0。当队列中加入新数据(
put()
操作)时,信号量被增加,表示有新的数据可用。阻塞的进程会被唤醒。 - 条件变量:条件变量允许进程等待特定的条件成立(如队列中有数据)。当条件满足时(新数据加入队列),条件变量会通知等待的进程继续执行。
d. 被唤醒和获取数据
当有新数据被放入队列时,put()
方法会更新信号量或条件变量,通知阻塞的 get()
操作可以继续执行。被唤醒的 get()
操作将重新获取锁,检查队列并取出数据。
e. 释放锁
无论是成功获取数据还是由于其他原因退出,get()
方法最后都会释放锁,以允许其他进程进行队列操作。
经典问题:为什么用while,而不是if
这是一个多线程并发编程中经常会遇到的问题,尤其是在等待条件时,比如经典的生产者-消费者模型,很多人会问:
为什么在等待条件时,常常写成
while not 条件: condition.wait()
,而不是if not 条件: condition.wait()
?
核心原因:防止虚假唤醒(Spurious Wakeup)和竞态条件
1. 虚假唤醒(Spurious Wakeup)
- 在多线程环境下,有些操作系统或线程库实现允许线程在没有被显式通知的情况下从
wait()
返回,这叫做“虚假唤醒”。 - 如果你用
if
,线程醒来后不会再次检查条件,可能条件并没有真正满足,结果导致程序逻辑错误。 while
会在每次醒来后重新检查条件,确保只有在真正条件满足时才继续执行。
2. 竞争条件(Race Condition)
- 在多线程环境中,可能多个线程被同时唤醒(例如
notify_all()
唤醒多个线程),这些线程会争夺资源,只有一个能成功拿到,其余又要继续等待。 - 如果用
if
,没抢到资源的线程不会再次判断条件,会直接往下执行,出错。 while
可以让没抢到资源的线程再次进入等待,直到条件真正满足。
一句话记忆
多线程等待条件时,用while不是if,因为醒来后必须重新检查条件,防止虚假唤醒和数据竞争。
7. 常见误区与调试技巧
- 避免在
__main__
之外创建进程:在 Windows 上,必须将创建进程的代码放在if __name__ == '__main__'
块中,否则会导致递归创建进程。 - 使用
if __name__ == '__main__'
保护主程序:确保子进程不会再次执行主程序代码。 - 注意全局变量:进程之间不共享全局变量,需要使用进程间通信机制传递数据。
- 调试多进程程序:可以使用
print
语句、日志记录或调试工具,但要注意进程间输出的顺序。
3、Spinner with Coroutines
1. 协程驱动的 Spinner 程序解析
1.1 程序结构
我们以 spinner_async.py
为例,探讨如何使用协程实现一个简单的 Spinner 程序,该程序在后台显示旋转指示器,同时执行耗时操作。
import asyncio
import itertoolsdef main() -> None:result = asyncio.run(supervisor())print(f'Answer: {result}')async def supervisor() -> int:spinner = asyncio.create_task(spin('thinking!'))print(f'spinner object: {spinner}')result = await slow()spinner.cancel()return resultasync def spin(msg: str) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, flush=True, end='')try:await asyncio.sleep(.1)except asyncio.CancelledError:breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')async def slow() -> int:await asyncio.sleep(3)return 42if __name__ == '__main__':main()
在这个例子中,任务调度是由 asyncio
的事件循环负责的。当一个任务(协程)在执行过程中遇到 await
关键字(如 await asyncio.sleep()
),它会将控制权交还给事件循环,让事件循环调度其他准备好执行的任务。这种机制让多个任务可以并发运行,而不会因为等待某个任务的完成而阻塞整个程序。
具体调度过程
-
事件循环:
- 当程序运行时,
asyncio.run(supervisor())
启动了事件循环。 - 事件循环负责管理和调度所有由
asyncio.create_task()
创建的任务。
- 当程序运行时,
-
任务创建与调度:
- 在
supervisor
协程中,asyncio.create_task(spin('thinking!'))
创建了一个新的任务spin
。 - 事件循环立即开始调度
spin
的执行,因为它是通过create_task
显式创建的并且不依赖其他任务完成。
- 在
-
遇到
await
关键字:- 在
spin
函数中,await asyncio.sleep(.1)
暂停了spin
的执行,将控制权交还给事件循环。 - 事件循环检查其他任务,发现
supervisor
仍在等待slow
的结果,而slow
也在await asyncio.sleep(3)
上暂停。 - 因为
spin
的暂停时间很短(0.1 秒),所以事件循环会频繁地重新调度spin
,显示旋转动画。
- 在
-
并发执行:
- 由于
spin
和slow
都在使用await
进行异步等待,事件循环能够在它们各自的等待期间切换执行。 - 这意味着即使
slow
在等待 3 秒,spin
仍能继续执行,提供视觉反馈。
- 由于
-
任务取消:
- 一旦
slow
完成并返回结果,supervisor
立即调用spinner.cancel()
来取消spin
任务。 - 当
spin
检测到取消请求时,通过捕获asyncio.CancelledError
来终止动画循环。
- 一旦
总结
- 调度角色: 是
asyncio
的事件循环负责调度任务。它在任务遇到await
时自动切换到其他准备好执行的任务。所以await后面跟可等待对象(协程对象、Future、Task对象),他们都可能发生I/O等待 - 自动切换: 任务在遇到
await asyncio.sleep()
或其他 I/O 操作时,会自动将控制权交还给事件循环,允许事件循环安排其他任务的执行。 - 并发执行: 通过这种调度机制,多个任务可以并发运行,提高程序的响应性和效率,特别是在 I/O 密集型操作中。
关于await
在 Python 的异步编程中,await
关键字用于暂停协程的执行,直到提供的可等待对象完成。可等待对象可以是协程对象、Future
、或 Task
。当代码执行到 await
时,会发生以下事情:
-
暂停当前协程:
- 当协程执行到
await
语句时,它会暂停执行,将控制权交还给事件循环。 - 当前协程的状态被保存,以便在可等待对象完成后可以恢复执行。
- 当协程执行到
-
返回控制权给事件循环:
await
后的操作通常是 I/O 操作或其他耗时任务。在这些操作进行时,协程会将控制权交还给事件循环。- 事件循环可以利用这个时间去调度其他可运行的任务。这种机制允许其他协程在当前协程等待时执行,从而实现并发。
-
事件循环调度其他任务:
- 事件循环会检查所有已注册的任务,并寻找可以运行的任务(即那些不在等待 I/O 的任务)。
- 如果有其他任务准备好执行,事件循环会调度它们运行。
-
等待的任务完成后:
- 一旦
await
后的可等待对象完成(例如,一个网络请求完成,一个定时器到期,或一个文件被读取完毕),事件循环会将控制权返回给之前暂停的协程。 - 协程从上次暂停的位置恢复执行。
- 一旦
-
恢复执行:
- 在可等待对象完成后,协程会继续执行
await
之后的代码。 - 这可能涉及处理从
await
表达式返回的结果。
- 在可等待对象完成后,协程会继续执行
代码示例中的调度过程
以你的代码为例,来看 await
的具体调度过程:
async def spin(msg: str) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, flush=True, end='')try:# 这里的 await asyncio.sleep(.1) 将暂停 spin 协程await asyncio.sleep(.1)except asyncio.CancelledError:breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')async def slow() -> int:# 这里的 await asyncio.sleep(3) 将暂停 slow 协程await asyncio.sleep(3)return 42
- 在
spin
中,遇到await asyncio.sleep(.1)
时,spin
协程暂停,让出控制权。 - 事件循环会检查其他任务,发现
slow
也在等待(await asyncio.sleep(3)
)。 - 因为
spin
的等待时间很短,事件循环在其他任务没有准备好执行时,很快会回到spin
。 - 继续执行
spin
,直到slow
完成 3 秒的等待并返回结果。 supervisor
协程恢复,取消spin
,打印结果。
这种机制使得 Python 的异步编程可以高效地处理大量 I/O 操作,而不会因为等待而阻塞整个程序。
为什么输出是这样的?
这个代码的输出为什么一直是先输出spinner object: <Task pending name=‘Task-2’ coro=<spin() running at /fluent/test.py:15>>
然后再是动画,最后是Answer: 42 ?
在Python的asyncio
中,创建一个任务(如使用asyncio.create_task()
)会安排该协程在事件循环中运行,但不会立即执行该协程的代码。相反,它会继续执行当前的同步代码,直到遇到await
或其他需要切换上下文的点。
具体到你的代码:
-
创建任务:
spinner = asyncio.create_task(spin('thinking!'))
这行代码创建了一个任务对象
spinner
,并安排spin
协程在事件循环中运行。此时它只是在事件循环中注册了spin
,并不会立即执行spin
的代码。 -
打印任务对象信息:
print(f'spinner object: {spinner}')
创建任务后,代码继续执行这一行。这是一个普通的同步操作,所以会立即被执行,打印出任务对象的信息。
-
执行
spin
和slow
:
接下来是:result = await slow()
这行代码会暂停
supervisor
协程的执行,直到slow()
完成。在这个暂停期间,事件循环会有机会调度并执行其他任务,包括spin
任务。
正是因为事件循环的这种调度机制,创建任务后,控制权会立即返回到当前的同步代码块,使得创建任务后的代码(如打印任务对象信息)会在任务的协程代码实际开始执行之前被执行。
1.2 关键概念解析
-
事件循环(Event Loop):
- 协程由事件循环驱动,事件循环负责管理协程队列、调度协程执行、监控 I/O 操作事件,并在事件发生时将控制权交还给相应的协程。
- 事件循环和所有协程都在单线程内执行,因此,任何一个协程的阻塞都会导致事件循环变慢,进而影响其他协程的执行。
-
asyncio.run()
函数:- 用于启动事件循环,并驱动传入的协程对象(通常是程序的入口点,例如本例中的
supervisor
)。 - 该函数会阻塞,直到传入的协程执行完毕,并返回协程的返回值。
- 用于启动事件循环,并驱动传入的协程对象(通常是程序的入口点,例如本例中的
-
原生协程(Native Coroutines):
- 使用
async def
关键字定义,例如supervisor
、spin
和slow
。 - 原生协程需要通过事件循环来驱动执行。
- 使用
-
asyncio.create_task()
函数:- 从协程内部调用,用于调度另一个协程的执行。
- 该函数会立即返回一个
asyncio.Task
对象,该对象包装了协程对象,并提供了控制和管理协程状态的方法。 - 例如,
spinner = asyncio.create_task(spin('thinking!'))
创建了一个用于执行spin
协程的任务,并将其赋值给spinner
变量。
-
await
关键字:- 从协程内部调用,用于将控制权转移给另一个协程。
- 当前协程会被挂起,直到被
await
的协程执行完毕。 - 例如,
result = await slow()
会挂起supervisor
协程,直到slow
协程执行完毕,并将slow
的返回值赋给result
。
-
协程的运行方式:
asyncio.run(coro())
:从普通函数调用,用于驱动协程对象(通常是程序的入口点)。该调用会阻塞,直到协程体返回。asyncio.create_task(coro())
:从协程内部调用,用于调度另一个协程的执行。该调用不会挂起当前协程,并返回一个Task
实例。await coro()
:从协程内部调用,用于将控制权转移给被await
的协程。该调用会挂起当前协程,直到被await
的协程返回。
-
协程的取消:
- 调用
Task.cancel()
方法会向协程抛出asyncio.CancelledError
异常,协程可以通过捕获该异常来执行清理操作并退出。
- 调用
1.3 关键函数解析
-
spin
协程:- 使用
itertools.cycle
循环显示旋转指示器(\|/-
)。 - 使用
await asyncio.sleep(0.1)
暂停 0.1 秒,避免阻塞事件循环。 - 捕获
asyncio.CancelledError
异常以优雅地退出循环。
- 使用
-
slow
协程:- 使用
await asyncio.sleep(3)
模拟耗时操作,并返回结果42
。 - 同样使用
await asyncio.sleep
而不是time.sleep
,以避免阻塞事件循环。
- 使用
1.4 重要实验:理解协程阻塞的影响
为了更好地理解协程的工作原理,我们进行以下实验:
-
实验内容:将
slow
协程中的await asyncio.sleep(3)
替换为time.sleep(3)
。 -
预期结果:
- 程序显示 spinner 对象,例如
<Task pending name='Task-2' coro=<spin() running at /path/to/spinner_async.py:12>>
。 - Spinner 不会显示,程序挂起 3 秒。
- 显示
Answer: 42
并结束程序。
- 程序显示 spinner 对象,例如
-
原因分析:
time.sleep(3)
会阻塞主线程,而主线程也是事件循环所在的线程。- 因此,在
time.sleep(3)
期间,事件循环无法执行任何其他协程,包括spin
协程。 - 这导致 Spinner 无法显示,程序整体被阻塞。
-
关键结论:
- 永远不要在 asyncio 协程中使用
time.sleep(…)
,除非你希望暂停整个程序。 - 如果协程需要等待一段时间,应该使用
await asyncio.sleep(DELAY)
,这会将控制权交还给事件循环,允许其他挂起的协程执行。
- 永远不要在 asyncio 协程中使用
2. 协程与绿色线程(Greenlets)
-
绿色线程(Greenlets):
- 绿色线程是一种轻量级的协程实现,由
greenlet
包提供。 - 不需要使用
yield
或await
等特殊语法,更易于集成到现有的顺序代码库中。 - 例如,SQL Alchemy 1.4 ORM 使用绿色线程来实现其与 asyncio 兼容的新异步 API。
- 绿色线程是一种轻量级的协程实现,由
-
gevent 库:
gevent
是一个基于绿色线程的网络库,通过“猴子补丁(monkey patching)”的方式,将 Python 的标准 socket 模块替换为非阻塞版本。- 对周围代码具有高度透明性,使得将顺序应用程序和库(如数据库驱动程序)转换为执行并发网络 I/O 变得更加容易。
- 许多开源项目使用
gevent
,包括广泛部署的 Gunicorn。
3. 总结
- 协程 是一种强大的并发编程工具,在单线程内实现高效的 I/O 操作处理。
- 事件循环 是协程的核心,负责调度和管理协程的执行。
- 避免使用
time.sleep()
,而应使用await asyncio.sleep()
,以确保事件循环的流畅运行。 - 绿色线程 和 gevent 提供了另一种异步编程的方式,适用于不同的应用场景。
4、Supervisors Side-by-Side
spinner_thread.py
和spinner_async.py
的代码行数几乎相同。supervisor
函数是这些示例的核心。下面我们来详细比较一下它们。示例19 - 8列出了示例19 - 2中的supervisor
函数。
示例19 - 8 spinner_thread.py:使用线程的supervisor
函数
def supervisor() -> int:done = Event()spinner = Thread(target=spin, args=('thinking!', done))print('spinner object:', spinner)spinner.start()result = slow()done.set()spinner.join()return result
作为对比,示例19 - 9展示了示例19 - 4中的supervisor
协程。
示例19 - 9 spinner_async.py:异步的supervisor
协程
async def supervisor() -> int:spinner = asyncio.create_task(spin('thinking!'))print('spinner object:', spinner)result = await slow()spinner.cancel()return result
下面总结这两个supervisor
实现之间的异同点:
asyncio.Task
大致相当于threading.Thread
。
示例对比:
使用线程的示例 (thread_example.py
):
import threading
import timedef task1():for i in range(5):print(f"Thread 1: {i}")time.sleep(1)def task2():for i in range(5):print(f"Thread 2: {i}")time.sleep(1.5)def run_threads():thread1 = threading.Thread(target=task1)thread2 = threading.Thread(target=task2)thread1.start()thread2.start()thread1.join()thread2.join()print("Both threads have finished.")if __name__ == "__main__":run_threads()
使用asyncio
的示例 (asyncio_example.py
):
import asyncioasync def task1():for i in range(5):print(f"Task 1: {i}")await asyncio.sleep(1)async def task2():for i in range(5):print(f"Task 2: {i}")await asyncio.sleep(1.5)async def run_tasks():task1_coroutine = asyncio.create_task(task1())task2_coroutine = asyncio.create_task(task2())await task1_coroutineawait task2_coroutineprint("Both tasks have finished.")if __name__ == "__main__":asyncio.run(run_tasks())
解释:
- 并发执行: 在线程示例中,
task1
和task2
在不同的线程中并行执行。在asyncio
示例中,task1
和task2
作为协程在同一个线程中并发执行,通过事件循环调度。 - 任务调度: 线程由操作系统调度,而
asyncio
任务由事件循环调度。
Task
驱动一个协程对象,而Thread
调用一个可调用对象。
示例对比:
使用线程的示例 (thread_callable.py
):
import threadingclass MyTask:def run(self):print("Thread is running a callable object.")def run_thread():obj = MyTask()thread = threading.Thread(target=obj.run)thread.start()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_coroutine.py
):
import asyncioclass MyCoroutine:async def run(self):print("Coroutine is running.")async def run_coroutine():obj = MyCoroutine()await obj.run()print("Coroutine has finished.")if __name__ == "__main__":asyncio.run(run_coroutine())
解释:
- 调用方式: 在线程示例中,
Thread
调用了一个类的run
方法,这是一个可调用对象。在asyncio
示例中,asyncio
直接调用了一个协程方法run
,这是因为asyncio
只能调度协程。
- 协程通过
await
关键字显式地让出控制权。
示例对比:
使用线程的示例 (thread_sleep.py
):
import threading
import timedef task():for i in range(5):print(f"Thread: {i}")time.sleep(1)def run_thread():thread = threading.Thread(target=task)thread.start()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_await.py
):
import asyncioasync def task():for i in range(5):print(f"Coroutine: {i}")await asyncio.sleep(1)async def run_coroutine():await task()print("Coroutine has finished.")if __name__ == "__main__":asyncio.run(run_coroutine())
解释:
- 控制权让渡: 在线程示例中,
time.sleep(1)
会阻塞当前线程。在asyncio
示例中,await asyncio.sleep(1)
会暂停当前协程的执行,让事件循环调度其他协程执行。
- 你不能自己实例化
Task
对象,而是通过将一个协程传递给asyncio.create_task(…)
来获取它们。
示例对比:
使用线程的示例 (thread_instantiate.py
):
import threadingdef task():print("Thread is running.")def run_thread():thread = threading.Thread(target=task)thread.start()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_create_task.py
):
import asyncioasync def task():print("Task is running.")async def run_tasks():task_coroutine = asyncio.create_task(task())await task_coroutineprint("Task has finished.")if __name__ == "__main__":asyncio.run(run_tasks())
解释:
- 任务创建: 在线程示例中,
Thread
对象是通过threading.Thread
直接实例化的。在asyncio
示例中,Task
对象是通过asyncio.create_task
创建的,而不是通过实例化Task
类。
- 当
asyncio.create_task(…)
返回一个Task
对象时,它已经被安排好要运行了,但Thread
实例必须通过调用其start
方法来显式地启动运行。
示例对比:
使用线程的示例 (thread_start.py
):
import threadingdef task():print("Thread is running.")def run_thread():thread = threading.Thread(target=task)# 线程尚未启动# thread.start() 必须调用才能启动线程thread.start()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_schedule.py
):
import asyncioasync def task():print("Task is running.")async def run_tasks():task_coroutine = asyncio.create_task(task())# Task 已经被安排运行,无需显式启动await task_coroutineprint("Task has finished.")if __name__ == "__main__":asyncio.run(run_tasks())
解释:
- 任务启动: 在线程示例中,必须调用
thread.start()
才能启动线程。在asyncio
示例中,asyncio.create_task
会立即安排协程执行,无需显式启动。
- 在使用线程的
supervisor
中,slow
是一个普通函数,由主线程直接调用。在异步的supervisor
中,slow
是一个由await
驱动的协程。
示例对比:
使用线程的示例 (thread_slow.py
):
import threading
import timedef slow():print("Slow function is running.")time.sleep(2)print("Slow function is done.")return 42def supervisor():thread = threading.Thread(target=slow)thread.start()thread.join()print("Supervisor has finished.")if __name__ == "__main__":supervisor()
使用asyncio
的示例 (asyncio_slow.py
):
import asyncioasync def slow():print("Slow coroutine is running.")await asyncio.sleep(2)print("Slow coroutine is done.")return 42async def supervisor():task = asyncio.create_task(slow())await taskprint("Supervisor has finished.")if __name__ == "__main__":asyncio.run(supervisor())
解释:
- 函数调用: 在线程示例中,
slow
是一个普通函数,由主线程直接调用并在新线程中执行。在asyncio
示例中,slow
是一个协程,通过await
调用。
- 没有从外部终止线程的API;相反,你必须发送一个信号,比如设置
done
这个Event
对象。对于任务,有Task.cancel()
实例方法,它会在协程体当前暂停的await
表达式处引发CancelledError
异常。
示例对比:
使用线程的示例 (thread_cancel.py
):
import threading
import time
from threading import Eventdef task(done):while not done.is_set():print("Thread is running.")time.sleep(1)print("Thread has been cancelled.")def run_thread():done = Event()thread = threading.Thread(target=task, args=(done,))thread.start()time.sleep(3)done.set()thread.join()print("Thread has finished.")if __name__ == "__main__":run_thread()
使用asyncio
的示例 (asyncio_cancel.py
):
import asyncioasync def task():try:while True:print("Task is running.")await asyncio.sleep(1)except asyncio.CancelledError:print("Task has been cancelled.")async def run_tasks():task_coroutine = asyncio.create_task(task())await asyncio.sleep(3)task_coroutine.cancel()await task_coroutineprint("Task has finished.")if __name__ == "__main__":asyncio.run(run_tasks())
解释:
- 取消机制: 在线程示例中,通过设置
Event
对象来通知线程停止。在asyncio
示例中,使用task.cancel()
来取消任务,协程会在await asyncio.sleep(1)
处捕获CancelledError
异常。
supervisor
协程必须在main
函数中通过asyncio.run
来启动。
示例对比:
使用线程的示例 (thread_main.py
):
import threading
import timedef task():print("Thread is running.")time.sleep(2)print("Thread is done.")def supervisor():thread = threading.Thread(target=task)thread.start()thread.join()print("Supervisor has finished.")if __name__ == "__main__":supervisor()
使用asyncio
的示例 (asyncio_main.py
):
import asyncioasync def task():print("Task is running.")await asyncio.sleep(2)print("Task is done.")async def supervisor():await task()print("Supervisor has finished.")if __name__ == "__main__":asyncio.run(supervisor())
解释:
- 程序入口: 在线程示例中,
supervisor
函数作为主函数直接调用。在asyncio
示例中,supervisor
协程通过asyncio.run
启动。
总结
通过这些具体的对比示例,我们可以看到threading.Thread
和asyncio.Task
在并发执行、调用方式、控制权让渡、任务创建与启动、取消机制以及程序入口等方面的不同。这些差异使得asyncio
在处理高并发和I/O密集型任务时更加高效。
关于线程和协程的最后一点:如果你用线程进行过任何复杂的编程,就会知道理解程序的逻辑有多困难,因为调度器可以随时中断线程。你必须记住持有锁来保护程序的关键部分,以避免在多步骤操作过程中被中断,否则可能会使数据处于无效状态。
而对于协程,默认情况下你的代码不会被中断。你必须显式地使用await
让程序的其他部分运行。协程不是通过持有锁来同步多个线程的操作,而是从定义上就是 “同步” 的:任何时刻只有一个协程在运行。当你想放弃控制权时,使用await
将控制权交回调度器。这就是为什么可以安全地取消一个协程:从定义上讲,协程只有在await
表达式处暂停时才能被取消,所以你可以通过处理CancelledError
异常来进行清理工作。
time.sleep()
调用会阻塞但不做任何事情。现在我们将通过一个CPU密集型的调用来进行实验,以便更好地理解全局解释器锁(GIL),以及CPU密集型函数在异步代码中的影响。
三、The Real Impact of the GIL
——基于多进程、多线程、异步编程的对比分析
1、核心问题:GIL 对 CPU 密集型任务的影响
GIL(全局解释器锁) 是 CPython 解释器的设计特性,它确保同一时刻仅有一个线程执行 Python 字节码。这对 CPU 密集型任务有显著影响:
- I/O 操作:等待网络/磁盘时释放 GIL,允许其他线程运行(如
time.sleep(3)
或 HTTP 请求)。 - CPU 密集型操作:长时间占用 CPU 的函数会阻塞其他线程(如素数计算)。
2、原始例子:素数判断函数
import mathdef is_prime(n: int) -> bool:if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falseroot = math.isqrt(n)for i in range(3, root + 1, 2): # 仅检查奇数if n % i == 0:return Falsereturn True
- 测试
is_prime(5_000_111_000_222_021)
耗时约 3.3 秒(CPU 密集型)。
3、三大并发模型的行为对比
假设将以下代码中的 time.sleep(3)
或 asyncio.sleep(3)
替换为 is_prime(n)
:
-
多进程 (
spinner_proc.py
)- 结果:旋转动画持续运行。
- 原理:子进程负责动画,父进程计算素数。进程间内存隔离,GIL 不共享。
# 伪代码示例 from multiprocessing import Process, Eventdef spin():while not done_event.is_set():# 更新动画...if __name__ == "__main__":done_event = Event()p = Process(target=spin) # 子进程运行动画p.start()is_prime(n) # 主进程计算素数done_event.set()
-
多线程 (
spinner_thread.py
)- 结果:动画持续运行(但可能卡顿)。
- 原理:
- Python 默认每 5ms 强制切换线程(通过
sys.setswitchinterval
设置)。 - 主线程计算素数时,每 5ms 被中断一次,动画线程获得 GIL 更新状态。
- Python 默认每 5ms 强制切换线程(通过
- 陷阱:若线程数 > CPU 核心数,程序效率会显著下降!
# 伪代码示例 import threadingdef spin():while not done_event.is_set():# 更新动画(短暂占用 GIL)...done_event = threading.Event() t = threading.Thread(target=spin) t.start() is_prime(n) # 主线程计算素数 done_event.set()
-
异步编程 (
spinner_async.py
)- 结果:动画完全冻结!
- 原理:
- 异步任务在单线程中运行,
is_prime
阻塞事件循环。 - 动画任务 (
spinner
) 从未获得执行机会。
- 异步任务在单线程中运行,
# 伪代码示例 import asyncioasync def spin():while not done:# 更新动画await asyncio.sleep(0.1)async def slow():is_prime(n) # 阻塞整个事件循环!return 42async def supervisor():spinner_task = asyncio.create_task(spin())result = await slow()spinner_task.cancel()return result
关键难点与解决方案
1. 异步编程中运行 CPU 密集型任务
问题:直接调用 is_prime
会阻塞事件循环。
方案一(不推荐):插入 await asyncio.sleep(0)
async def is_prime_async(n):for i in range(3, root + 1, 2):if n % i == 0:return Falseif i % 100_000 == 1: # 每 10 万次迭代让步一次await asyncio.sleep(0) # 让出控制权return True
- 缺点:计算时间从 3.3s 增至 4.9s(效率下降 50%),且事件循环仍被拖慢。
方案二(推荐):使用 run_in_executor
移交到线程池
async def slow():loop = asyncio.get_running_loop()result = await loop.run_in_executor(None, is_prime, n) # 在子线程中运行return result
这种情况下的is_prime函数要是非async的,如果是async会报错:RuntimeWarning: Enable tracemalloc to get the object allocation
loop.run_in_executor
方法用于在不同的线程中运行一个阻塞的同步函数,而不是一个协程。因此,这个问题导致了 is_prime
协程没有被正确地等待(await),从而引发了 RuntimeWarning
。
所以需要将 is_prime
改为同步函数:如果 is_prime
函数本身不需要异步执行,你可以将它定义为一个普通的同步函数(去掉 async
关键字)。这样可以直接使用 loop.run_in_executor
来调用它。
完整代码
import mathdef is_prime(n: int) -> bool:if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falseroot = math.isqrt(n)for i in range(3, root + 1, 2): # 仅检查奇数if n % i == 0:return Falsereturn Trueimport asyncio
import itertoolsasync def slow():loop = asyncio.get_running_loop()result = await loop.run_in_executor(None, is_prime, 5_000_111_000_222_021) # 在子线程中运行return resultdef main() -> None:result = asyncio.run(supervisor())print(f'Answer: {result}')async def supervisor() -> int:spinner = asyncio.create_task(spin('thinking!'))print(f'spinner object: {spinner}')result = await slow()spinner.cancel()return resultasync def spin(msg: str) -> None:for char in itertools.cycle(r'\|/-'):status = f'\r{char} {msg}'print(status, flush=True, end='')try:await asyncio.sleep(.1)except asyncio.CancelledError:breakblanks = ' ' * len(status)print(f'\r{blanks}\r', end='')if __name__ == '__main__':main()
- 原理:将阻塞函数转移到线程池执行,避免阻塞事件循环。
2. 多线程的适用场景
- 适合:I/O 密集型任务(如 HTTP 请求、文件读写)。
- 避免:CPU 密集型任务(除非任务能频繁释放 GIL,如 NumPy/C 扩展)。
- 对比实验:
# 两个 CPU 密集型线程 vs 顺序执行
import threading
import time
import mathn = 5_000_111_000_222_021def is_prime(n: int) -> bool:if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falseroot = math.isqrt(n)for i in range(3, root + 1, 2): # 仅检查奇数if n % i == 0:return Falsereturn Truedef task():for _ in range(2): # 并行执行两次is_prime(n)# 顺序执行(更快!)
start = time.time()
is_prime(n)
is_prime(n)
print("Sequential:", time.time() - start) # 多线程执行(更慢!)
t1 = threading.Thread(target=is_prime, args=(n,))
t2 = threading.Thread(target=is_prime, args=(n,))
t1.start(); t2.start()
t1.join(); t2.join()
print("Threads:", time.time() - start) # Sequential: 2.7513389587402344
# Threads: 5.548290014266968
工程实践总结
场景 | 推荐模型 | 原因 |
---|---|---|
I/O 密集型(网络/磁盘) | 异步编程 (asyncio ) | 高并发、低开销 |
CPU 密集型 | 多进程 (multiprocessing ) | 绕过 GIL,利用多核 CPU |
混合任务 | 异步 + 线程池 | I/O 用 asyncio ,CPU 用 run_in_executor |
黄金法则:
- “I/O 用异步,CPU 用进程”
- 避免在异步事件循环中直接调用 CPU 密集型函数
四、A Homegrown Process Pool
本节是为了展示如何使用多个进程处理CPU密集型任务,以及使用队列来分配任务和收集结果的常见模式。第20章将介绍一种向进程分配任务的更简单方法:使用concurrent.futures
包中的ProcessPoolExecutor
,它内部使用了队列。
1. 问题背景与核心概念
1.1 问题描述
我们需要检测一组大数(20个从2到10¹⁶-1的整数)是否为素数。这是一个CPU密集型任务,因为判断大数是否为素数需要大量计算。
primes.py:
#!/usr/bin/env python3import mathPRIME_FIXTURE = [(2, True),(142702110479723, True),(299593572317531, True),(3333333333333301, True),(3333333333333333, False),(3333335652092209, False),(4444444444444423, True),(4444444444444444, False),(4444444488888889, False),(5555553133149889, False),(5555555555555503, True),(5555555555555555, False),(6666666666666666, False),(6666666666666719, True),(6666667141414921, False),(7777777536340681, False),(7777777777777753, True),(7777777777777777, False),(9999999999999917, True),(9999999999999999, False),
]NUMBERS = [n for n, _ in PRIME_FIXTURE]# tag::IS_PRIME[]
def is_prime(n: int) -> bool:if n < 2:return Falseif n == 2:return Trueif n % 2 == 0:return Falseroot = math.isqrt(n)for i in range(3, root + 1, 2):if n % i == 0:return Falsereturn True
# end::IS_PRIME[]if __name__ == '__main__':for n, prime in PRIME_FIXTURE:prime_res = is_prime(n)assert prime_res == primeprint(n, prime)
1.2 核心挑战
顺序执行(单进程)效率低下,如何利用多核CPU加速计算?
1.3 关键解决方案
使用 多进程 (multiprocessing) 和 任务队列 (Queue):
- 创建多个工作进程并行处理任务
- 使用队列分配任务和收集结果
- 避免GIL(全局解释器锁)对CPU密集型任务的限制
2. 顺序执行:性能基准
2.1 代码实现 (sequential.py
)
#!/usr/bin/env python3
from time import perf_counter
from typing import NamedTuple
from primes import is_prime, NUMBERSclass Result(NamedTuple):prime: boolelapsed: floatdef check(n: int) -> Result:t0 = perf_counter()prime = is_prime(n)return Result(prime, perf_counter() - t0)def main() -> None:print(f'Checking {len(NUMBERS)} numbers sequentially:')t0 = perf_counter()for n in NUMBERS:prime, elapsed = check(n)label = 'P' if prime else ' 'print(f'{n:16} {label} {elapsed:9.6f}s')elapsed = perf_counter() - t0print(f'Total time: {elapsed:.2f}s')if __name__ == '__main__':main()
2.2 运行结果
Checking 20 numbers sequentially:2 P 0.000000s142702110479723 P 0.247988s299593572317531 P 0.338454s
3333333333333301 P 1.098308s
3333333333333333 0.000014s
3333335652092209 1.093885s
4444444444444423 P 1.270619s
4444444444444444 0.000001s
4444444488888889 1.343093s
5555553133149889 1.531583s
5555555555555503 P 1.479670s
5555555555555555 0.000006s
6666666666666666 0.000000s
6666666666666719 P 1.803434s
6666667141414921 1.786589s
7777777536340681 1.739732s
7777777777777753 P 2.059888s
7777777777777777 0.000009s
9999999999999917 P 1.965799s
9999999999999999 0.000005s
Total time: 17.76s
2.3 关键观察
- 所有任务顺序执行
- 小数字检测快(微秒级),大素数检测慢
- 总时间≈各任务耗时之和
3. 多进程解决方案 (procs.py
)
3.1 核心代码解析
import sys
from time import perf_counter
from typing import NamedTuple
from multiprocessing import Process, SimpleQueue, cpu_count
from multiprocessing import queues
from primes import is_prime, NUMBERSclass PrimeResult(NamedTuple):n: int # 存储原始数字prime: bool # 是否为素数elapsed: float # 检测耗时# 类型别名提高可读性
JobQueue = queues.SimpleQueue[int]
ResultQueue = queues.SimpleQueue[PrimeResult]def check(n: int) -> PrimeResult:t0 = perf_counter()res = is_prime(n)return PrimeResult(n, res, perf_counter() - t0)def worker(jobs: JobQueue, results: ResultQueue) -> None:while n := jobs.get(): # 从队列获取任务results.put(check(n)) # 提交结果# 毒丸处理:发送终止信号results.put(PrimeResult(0, False, 0.0))def start_jobs(procs: int, jobs: JobQueue, results: ResultQueue) -> None:# 填充任务队列for n in NUMBERS:jobs.put(n)# 启动工作进程for _ in range(procs):proc = Process(target=worker, args=(jobs, results))proc.start()# 添加毒丸(每个进程一个)for _ in range(procs):jobs.put(0)def main() -> None:# 确定进程数(默认为CPU核心数)procs = cpu_count() if len(sys.argv) < 2 else int(sys.argv[1])print(f'Checking {len(NUMBERS)} numbers with {procs} processes:')t0 = perf_counter()# 创建队列jobs: JobQueue = SimpleQueue()results: ResultQueue = SimpleQueue()# 启动任务start_jobs(procs, jobs, results)# 处理结果checked = 0procs_done = 0while procs_done < procs:n, prime, elapsed = results.get()if n == 0: # 毒丸检测procs_done += 1else:checked += 1label = 'P' if prime else ' 'print(f'{n:16} {label} {elapsed:9.6f}s')elapsed = perf_counter() - t0print(f'{checked} checks in {elapsed:.2f}s')if __name__ == '__main__':main()
3.2 运行结果(8进程)
Checking 20 numbers with 8 processes:2 P 0.000001s
3333333333333333 0.000005s
4444444444444444 0.000001s142702110479723 P 0.419267s
5555555555555555 0.000004s
6666666666666666 0.000000s299593572317531 P 0.517399s
3333333333333301 P 1.804822s
3333335652092209 1.804431s
4444444444444423 P 1.953585s
7777777777777777 0.000005s
4444444488888889 2.084240s
9999999999999999 0.000007s
5555553133149889 2.250911s
5555555555555503 P 2.257135s
6666666666666719 P 2.442794s
6666667141414921 2.534752s
7777777777777753 P 2.199439s
7777777536340681 2.235224s
9999999999999917 P 2.372946s
20 checks in 4.40s
3.3 关键改进
- 并行处理:使用8个进程(对应8个物理CPU核心)
- 队列管理:
JobQueue
:分发待检测数字ResultQueue
:收集检测结果
- 毒丸模式 (Poison Pill):用特殊值(0)通知进程终止
- 结果关联:在结果中存储原始数字(
PrimeResult.n
)解决乱序问题
3.4 性能提升
方案 | 总耗时 | 加速比 |
---|---|---|
顺序执行 | 17.76s | 1x |
多进程(8) | 4.40s | 4.0x |
超线程的真相:虽然系统报告12个逻辑核心(6物理核心×2线程),但CPU密集型任务中实际性能接近物理核心数(6核心)
ps:上面的是原文的内容,但是这个是超线程(Hyper-Threading),是一种由英特尔开发的技术,用于提高处理器的并行处理能力。通过超线程,一个物理核心可以被虚拟化为两个逻辑核心,这使得操作系统和应用程序可以将其视为两个独立的核心进行任务分配。 mac的m芯片并没有这个超线程。
M2 芯片有 8 个核心,其中包括 4 个性能核心和 4 个能效核心。这意味着在进行 CPU 密集型任务时,你可以使用所有 8 个核心来处理任务。与超线程不同,M2 芯片的核心是物理核心,不是通过超线程技术增加的逻辑核心。因此,你可以在 CPU 密集型任务中充分利用这 8 个物理核心的性能。
在理想情况下,如果你有8个物理核心,并且任务可以完美地分割和并行执行,你可能期望接近8倍的加速。但实际还是比较难的
4. 关键技术与工程实践
4.1 进程间通信 (IPC)
问题:进程间内存隔离,不能直接共享数据
解决方案:使用队列(multiprocessing.Queue
)
# 创建队列
from multiprocessing import Queue
job_queue = Queue()
result_queue = Queue()# 生产者(主进程)
job_queue.put(task_data)# 消费者(工作进程)
task = job_queue.get()
result = process(task)
result_queue.put(result)
4.2 毒丸模式 (Poison Pill)
作用:优雅终止工作进程
实现技巧:
- 使用不会与正常数据冲突的哨兵值(0、None、Ellipsis等)
- 每个进程一个毒丸
# 主进程
for _ in range(num_workers):job_queue.put(POISON_PILL) # 发送终止信号# 工作进程
while True:task = job_queue.get()if task is POISON_PILL:break# 处理正常任务
4.3 结果乱序处理
问题:任务完成顺序 ≠ 提交顺序
解决方案:结果中携带原始数据标识
# 错误做法(丢失关联)
results.put(is_prime(n))# 正确做法(保留关联)
results.put((n, is_prime(n), elapsed_time))
4.4 进程数优化
最佳实践:进程数 ≈ 物理CPU核心数
图:不同进程数下的执行时间(6核CPU最佳)
5. 多线程方案的陷阱 (threads.py
)
5.1 为何线程无效
# 伪代码展示线程方案
from threading import Threaddef worker():while task := task_queue.get():# CPU密集型计算result = heavy_computation(task) # GIL在此阻塞其他线程result_queue.put(result)# 创建多个线程(实际无法并行执行CPU任务)
5.2 GIL (全局解释器锁) 的影响
- Python解释器一次只允许一个线程执行字节码
- CPU密集型任务无法充分利用多核
- 线程切换反而增加开销
5.3 实测性能
方案 | 总耗时 | 对比顺序执行 |
---|---|---|
顺序执行 | 40.31s | 基准 |
12线程 | 42.5s | 更慢 |
关键结论:线程适合I/O密集型任务,不适合CPU密集型任务
6. 补充实例:I/O密集型任务对比
6.1 模拟I/O任务
import time
import requests
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorURLS = ['https://www.example.com/page1','https://www.example.com/page2',# ... 20个URL
]def fetch(url):start = time.perf_counter()response = requests.get(url) # I/O阻塞操作elapsed = time.perf_counter() - startreturn len(response.content), elapsed
6.2 性能对比
方案 | 任务类型 | 20个任务总耗时 |
---|---|---|
顺序执行 | I/O | 12.7s |
多线程(20) | I/O | 1.3s |
多进程(20) | I/O | 3.8s |
6.3 关键结论
- I/O密集型:多线程最佳(避免进程创建开销)
- CPU密集型:多进程最佳(避开GIL限制)
- 混合型任务:需根据实际情况测试选择
7. 工程实践建议
7.1 避免"自造轮子"
使用标准库concurrent.futures
简化代码:
# 使用ProcessPoolExecutor重构procs.py
from concurrent.futures import ProcessPoolExecutordef main():with ProcessPoolExecutor(max_workers=12) as executor:futures = {executor.submit(is_prime, n): n for n in NUMBERS}for future in as_completed(futures):n = futures[future]prime = future.result()# 处理结果...
7.2 常见陷阱与解决方案
陷阱 | 解决方案 |
---|---|
忘记if __name__ == '__main__' | 始终使用守护模式 |
队列阻塞导致死锁 | 设置超时/使用非阻塞方法 |
进程资源泄漏 | 使用上下文管理器管理进程池 |
跨平台序列化问题 | 避免传递lambda/闭包函数 |
7.3 调试技巧
- 简化复现:减少进程数和任务量
- 日志记录:
import logging logging.basicConfig(level=logging.DEBUG,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' )
- 使用
ProcessPoolExecutor
替代手动管理进程
8. 总结:Python在多核世界的生存之道
-
CPU密集型任务:
- 使用多进程绕过GIL限制
- 优选
concurrent.futures.ProcessPoolExecutor
- 进程数≈物理CPU核心数
-
I/O密集型任务:
- 使用多线程减少上下文切换开销
- 优选
concurrent.futures.ThreadPoolExecutor
- 结合
asyncio
实现更高并发
-
混合任务优化:
- 分离CPU和I/O逻辑
- 使用进程池+线程池分层架构
- 考虑C扩展处理关键计算
-
分布式扩展:
- 对于超大规模任务
- 使用
Celery
或Dask
跨节点分发 - 结合云服务实现弹性扩展
关键洞见:Python通过多进程和异步编程模型,结合丰富的生态系统,能有效利用多核处理器应对各类计算需求。
总结
在介绍一些理论之后,本章展示了通过 Python 三种原生并发编程模型实现的 spinner 脚本:
- 基于
threading
包的线程模型 - 基于
multiprocessing
的进程模型 - 基于
asyncio
的异步协程模型
随后我们通过实验探究了全局解释器锁(GIL)的实际影响:修改 spinner 示例以计算大整数的素性,并观察其行为。实验直观表明,异步编程(asyncio)中必须避免 CPU 密集型函数,因为它们会阻塞事件循环。实验的线程版本虽然存在 GIL 限制,但仍能工作——这是因为 Python 会定期中断线程,且该示例仅使用两个线程:一个执行 CPU 密集型计算,另一个每秒仅驱动动画 10 次。多进程版本则绕开了 GIL,通过启动新进程专门处理动画,而主进程执行素性检查。
下一个计算多个素数的示例突出了多进程与线程的区别,证明只有进程能让 Python 充分利用多核 CPU。对于重计算任务,Python 的 GIL 会使线程性能比顺序代码更差。
GIL 是 Python 并发与并行计算讨论的核心,但我们不应高估其影响(如 725 页“Python 与多核世界”所述)。例如,GIL 对 Python 在系统管理中的许多用例并无影响。另一方面,数据科学和服务器端开发社区已通过针对特定需求的工业级解决方案绕开了 GIL 的限制。最后两节提到了支持 Python 服务器端应用规模化的两个常见组件:WSGI 应用服务器和分布式任务队列。