Python asyncio协程原理
协程是Python异步编程的核心,理解其原理对于编写高效并发程序至关重要。
协程基础概念
什么是协程
协程(Coroutine)是一种比线程更轻量的并发单元:
- 用户态调度:由程序自身控制切换,无需内核参与
- 协作式调度:协程主动让出控制权,而非抢占式
- 低开销:创建成本极低,可轻松创建百万级协程
生成器与协程的关系
协程源于生成器,通过yield实现暂停与恢复:
Python
# Generator function
def simple_generator():
    """Yield 1, 2 and 3 in turn, pausing after each value."""
    yield 1
    yield 2
    yield 3


gen = simple_generator()
print(next(gen))  # 1
print(next(gen))  # 2
# Generator-based coroutine: ``yield`` gives two-way communication
def coroutine():
    """Print every value sent in, suspending at ``yield`` between values."""
    while True:
        # Looping back to ``yield`` keeps the generator alive; without the
        # loop, send() would run off the end and raise StopIteration.
        value = yield
        print(f"Received: {value}")


coro = coroutine()
next(coro)     # prime the coroutine: run it to the first yield
coro.send(10)  # send a value; prints: Received: 10
coro.close()   # done: shut down the suspended generator
async/await语法
async def定义协程
Python
import asyncio
async def hello():
    """Print "Hello", suspend for one second, then print "World"."""
    print("Hello")
    await asyncio.sleep(1)
    print("World")


# Calling a coroutine function returns a coroutine object; nothing runs yet
coro = hello()
# Run the coroutine object created above. Passing a fresh hello() here would
# leave `coro` never awaited and trigger a RuntimeWarning.
asyncio.run(coro)
await挂起协程
Python
async def fetch_data():
    """Simulate a one-second I/O operation and return its payload."""
    await asyncio.sleep(1)  # simulated I/O
    return "data"


async def main():
    """Await fetch_data() and print what it returns."""
    # await suspends the current coroutine and hands control back to the loop
    result = await fetch_data()
    print(result)


asyncio.run(main())
可等待对象
await只能用于可等待对象(Awaitable):
Python
import asyncio
from collections.abc import Awaitable
# 1. Coroutine
async def coro():
    """Return the string "coroutine"."""
    return "coroutine"


# 2. Task
async def task_example():
    """Schedule coro() as a Task, await it, and return its result."""
    task = asyncio.create_task(coro())
    result = await task
    return result


# 3. Future (low-level API)
async def future_example():
    """Create a Future, resolve it immediately, await it, and return its result."""
    # loop.create_future() is the documented way to create a Future bound to
    # the running loop.
    future = asyncio.get_running_loop().create_future()
    future.set_result("future result")
    result = await future
    return result
事件循环机制
事件循环核心
事件循环是异步执行的引擎:
Python
import asyncio
# Lifecycle of the event loop
async def main():
    """Fetch the running event loop from inside a coroutine and report on it."""
    # Get the current event loop. Inside a coroutine use get_running_loop();
    # calling asyncio.get_event_loop() at module level, with no loop running,
    # is deprecated and raises RuntimeError on recent Python versions.
    loop = asyncio.get_running_loop()
    print("Running in event loop")
    print(loop.is_running())  # True


# asyncio.run() does the following:
# 1. creates a new event loop
# 2. runs main() until it completes
# 3. closes the event loop
asyncio.run(main())
事件循环内部流程
Python
# Pseudocode showing the core logic of an event loop.
# selector, run_task, READ, WRITE and next_sleep_time are placeholders that
# this sketch does not define.
def event_loop():
    """Loop forever: run ready tasks, poll for I/O, then wake expired sleepers."""
    ready_queue = deque()
    sleeping_tasks = []  # min-heap ordered by wake-up time
    readers = {}  # {fd: callback}
    writers = {}  # {fd: callback}
    while True:
        # 1. Run everything in the ready queue
        while ready_queue:
            task = ready_queue.popleft()
            run_task(task)
        # 2. Poll for I/O events (select/poll/epoll).
        # next_sleep_time is presumably the time until the earliest sleeper
        # is due, so the poll never oversleeps a timer.
        io_events = selector.select(timeout=next_sleep_time)
        for fd, event in io_events:
            if event & READ:
                readers[fd]()
            if event & WRITE:
                writers[fd]()
        # 3. Wake tasks whose deadline has passed.
        # Use a monotonic clock: wall-clock time can jump backwards or
        # forwards and would fire timers early or late.
        now = time.monotonic()
        while sleeping_tasks and sleeping_tasks[0].wake_time <= now:
            task = heapq.heappop(sleeping_tasks)
            ready_queue.append(task)
任务调度示例
Python
async def task_a():
    """Print a start marker, sleep one second, then print an end marker."""
    print("A start")
    await asyncio.sleep(1)
    print("A end")


async def task_b():
    """Print a start marker, sleep half a second, then print an end marker."""
    print("B start")
    await asyncio.sleep(0.5)
    print("B end")


async def main():
    """Run task_a() and task_b() concurrently and wait for both."""
    # Run both coroutines concurrently
    await asyncio.gather(task_a(), task_b())


# Output order:
# A start  (immediately)
# B start  (immediately, because A yielded control at its first await)
# B end    (after 0.5 s)
# A end    (after 1 s)
asyncio.run(main())
协程调度原理
协程状态机
每个协程都有状态转换:
Python
CORO_CREATED → CORO_RUNNING → CORO_SUSPENDED → CORO_RUNNING → CORO_CLOSED
Python
import asyncio
import inspect
class _Suspend:
    """Minimal awaitable that yields once to whoever is driving the coroutine."""

    def __await__(self):
        yield


async def demo():
    """Suspend once, then return the string "done"."""
    # asyncio.sleep() requires a running event loop, so it cannot be stepped
    # by hand; a bare awaitable lets us drive the coroutine with send().
    await _Suspend()
    return "done"


coro = demo()
print(inspect.getcoroutinestate(coro))  # CORO_CREATED
# Prime: run to the first suspension point
coro.send(None)
print(inspect.getcoroutinestate(coro))  # CORO_SUSPENDED
# Resume until completion; the return value arrives inside StopIteration
try:
    coro.send(None)
except StopIteration as stop:
    print(stop.value)  # done
print(inspect.getcoroutinestate(coro))  # CORO_CLOSED
Task封装
Python
async def coro():
    """Sleep for one second, then return the string "result"."""
    await asyncio.sleep(1)
    return "result"


async def main():
    """Wrap coro() in a Task, inspect its state, then await and return its result."""
    # create_task() needs a running event loop and `await` is only valid
    # inside `async def`, so the demo has to live in a coroutine.
    task = asyncio.create_task(coro())
    # The Task tracks the state of the coroutine it wraps
    print(task.done())       # False
    print(task.cancelled())  # False
    print(task.get_name())   # e.g. Task-2 (main() itself runs as Task-1)
    # Wait for the result
    result = await task
    print(task.done())       # True
    print(task.result())     # "result"
    return result


asyncio.run(main())
调度器实现
Python
async def scheduler_example():
    """Run three workers as Tasks that interleave by yielding after each step."""

    async def worker(n):
        """Print three steps for worker `n`, yielding to the loop after each."""
        for step in range(3):
            print(f"Worker {n}: step {step}")
            await asyncio.sleep(0)  # yield control to the event loop

    # Create the tasks; each is scheduled on the loop as soon as it is created
    tasks = [asyncio.create_task(worker(i)) for i in range(3)]
    # Wait for all tasks to finish
    await asyncio.gather(*tasks)


asyncio.run(scheduler_example())
底层实现原理
协程栈帧
Python
# A native coroutine is built on the generator machinery
async def foo():
    """Await bar() and return its result."""
    x = await bar()
    return x


# Roughly equivalent generator-based form
def foo_gen():
    """Generator counterpart of foo(): delegate to bar_gen() and return its result."""
    # `yield from` suspends this frame (its locals stay in the frame) while
    # bar_gen() runs, forwards send()/throw() from the caller to bar_gen(),
    # and evaluates to bar_gen()'s return value. No loop or explicit
    # exception plumbing is needed.
    x = yield from bar_gen()
    return x
awaitable协议
Python
class MyAwaitable:
    """Custom awaitable whose result is produced by an internal coroutine."""

    def __await__(self):
        """Return the iterator that `await` drives."""
        # __await__ must return an iterator. Returning the coroutine object
        # itself raises TypeError, so delegate to the coroutine's own
        # __await__().
        return self._async_method().__await__()

    async def _async_method(self):
        """Sleep briefly, then return the string "custom awaitable"."""
        await asyncio.sleep(0.1)
        return "custom awaitable"


async def main():
    """Await a MyAwaitable instance and print its result."""
    result = await MyAwaitable()
    print(result)  # custom awaitable
协程上下文切换
Python
import asyncio
import contextvars
# Context variable visible to coroutines
request_id = contextvars.ContextVar('request_id')


async def handler():
    """Print the request id stored in the current context."""
    print(f"Request ID: {request_id.get()}")


async def main():
    """Set request_id, then await handler(), which sees the same value."""
    # Set the value in the current context
    request_id.set("req-001")
    # A directly awaited coroutine runs in the caller's context, so
    # handler() sees the value set above.
    await handler()


asyncio.run(main())
性能分析
协程vs线程对比
Python
import asyncio
import threading
import time
# Thread version
def thread_worker(n):
    """Block the calling thread for 1 ms (simulated I/O) and return n."""
    time.sleep(0.001)  # simulated I/O
    return n


def thread_version(count=10000):
    """Start `count` threads that each run thread_worker(), then join them all."""
    threads = []
    for i in range(count):
        t = threading.Thread(target=thread_worker, args=(i,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()


# Coroutine version
async def coro_worker(n):
    """Suspend for 1 ms (simulated I/O) without blocking the thread; return n."""
    await asyncio.sleep(0.001)
    return n


async def coro_version(count=10000):
    """Run `count` coro_worker() coroutines concurrently; return results in order."""
    tasks = [coro_worker(i) for i in range(count)]
    return await asyncio.gather(*tasks)


# Rough memory picture (measure on your own platform):
# - coroutine version: each coroutine is a small heap object, on the order of
#   kilobytes apiece
# - thread version: each thread reserves its own stack, commonly 8 MB of
#   virtual address space by default on Linux
要点总结
- 协程是用户态轻量级并发单元,通过yield/await实现暂停与恢复
- async/await是语法糖,底层基于生成器实现
- 事件循环通过IO多路复用实现高效调度,核心是任务队列+事件监听
- Task是协程的调度封装,提供取消、状态查询等功能
- 协程适用于IO密集型场景,可轻松创建百万级并发
存放路径:articles/PYTHON/专家/并发与异步高级/asyncio协程原理.md
📝 发现内容有误?点击此处直接编辑