Python异步同步机制
asyncio提供了完整的同步原语,用于协调多个协程的执行顺序和资源共享。
异步锁 (Lock)
基本用法
Python
import asyncio
class AsyncCounter:
def __init__(self):
self.value = 0
self._lock = asyncio.Lock()
async def increment(self):
async with self._lock:
old = self.value
await asyncio.sleep(0) # 模拟IO
self.value = old + 1
async def race_condition():
counter = AsyncCounter()
# 无锁情况下可能丢失更新
tasks = [counter.increment() for _ in range(1000)]
await asyncio.gather(*tasks)
print(counter.value) # 正确:1000
asyncio.run(race_condition())
手动控制
Python
async def lock_manual():
lock = asyncio.Lock()
await lock.acquire()
try:
# 临界区代码
await do_something()
finally:
lock.release()
# 或使用上下文管理器
async with lock:
await do_something()
公平锁与非公平锁
Python
async def lock_ordering():
lock = asyncio.Lock()
order = []
async def worker(name):
await lock.acquire()
order.append(name)
await asyncio.sleep(0.1)
lock.release()
# asyncio.Lock是公平锁,按请求顺序获取
tasks = [
worker("A"),
worker("B"),
worker("C")
]
await asyncio.gather(*tasks)
print(order) # ['A', 'B', 'C']
事件 (Event)
基本用法
Python
async def event_example():
event = asyncio.Event()
async def waiter(name):
print(f"{name} waiting...")
await event.wait()
print(f"{name} received event!")
async def setter():
await asyncio.sleep(1)
print("Setting event!")
event.set()
await asyncio.gather(
waiter("W1"),
waiter("W2"),
setter()
)
等待与清除
Python
async def event_operations():
event = asyncio.Event()
# 检查是否已设置
print(event.is_set()) # False
# 设置事件
event.set()
print(event.is_set()) # True
# 清除事件
event.clear()
print(event.is_set()) # False
# 等待(会阻塞直到set)
await event.wait()
生产者-消费者模式
Python
async def producer_consumer():
data_ready = asyncio.Event()
data = None
async def producer():
nonlocal data
await asyncio.sleep(1)
data = "produced data"
data_ready.set()
async def consumer():
await data_ready.wait()
print(f"Got: {data}")
data_ready.clear()
await asyncio.gather(producer(), consumer())
条件变量 (Condition)
基本用法
Python
async def condition_example():
condition = asyncio.Condition()
shared_data = []
async def producer():
async with condition:
shared_data.append("item")
condition.notify() # 通知一个等待者
# condition.notify_all() # 通知所有等待者
async def consumer():
async with condition:
while not shared_data:
await condition.wait() # 释放锁并等待
item = shared_data.pop()
print(f"Consumed: {item}")
await asyncio.gather(producer(), consumer())
有界缓冲区
Python
class AsyncBoundedBuffer:
def __init__(self, capacity):
self.buffer = []
self.capacity = capacity
self.not_full = asyncio.Condition()
self.not_empty = asyncio.Condition()
async def put(self, item):
async with self.not_full:
while len(self.buffer) >= self.capacity:
await self.not_full.wait()
self.buffer.append(item)
async with self.not_empty:
self.not_empty.notify()
async def get(self):
async with self.not_empty:
while not self.buffer:
await self.not_empty.wait()
item = self.buffer.pop(0)
async with self.not_full:
self.not_full.notify()
return item
信号量 (Semaphore)
基本用法
Python
async def semaphore_example():
sem = asyncio.Semaphore(3) # 最多3个并发
async def worker(n):
async with sem:
print(f"Worker {n} started")
await asyncio.sleep(1)
print(f"Worker {n} finished")
tasks = [worker(i) for i in range(10)]
await asyncio.gather(*tasks)
# 同时最多3个worker在执行
限制并发请求
Python
import aiohttp
async def limited_fetch(urls, max_concurrent=5):
sem = asyncio.Semaphore(max_concurrent)
async def fetch(url):
async with sem:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.text()
return await asyncio.gather(*[fetch(url) for url in urls])
有界信号量
Python
async def bounded_semaphore():
# 有界信号量:release超过初始值会抛出异常
sem = asyncio.BoundedSemaphore(2)
async with sem:
pass
# sem.release() # 如果已释放到最大值会报ValueError
异步队列 (Queue)
基本用法
Python
async def queue_example():
queue = asyncio.Queue()
async def producer():
for i in range(5):
await queue.put(f"item-{i}")
print(f"Produced: item-{i}")
await asyncio.sleep(0.1)
async def consumer():
for _ in range(5):
item = await queue.get()
print(f"Consumed: {item}")
queue.task_done()
await asyncio.gather(producer(), consumer())
优先级队列
Python
async def priority_queue():
queue = asyncio.PriorityQueue()
await queue.put((2, "low priority"))
await queue.put((1, "high priority"))
await queue.put((3, "lowest priority"))
while not queue.empty():
priority, item = await queue.get()
print(f"{priority}: {item}")
# 输出: 1: high priority, 2: low priority, 3: lowest priority
LIFO队列
Python
async def lifo_queue():
queue = asyncio.LifoQueue()
await queue.put("first")
await queue.put("second")
await queue.put("third")
while not queue.empty():
item = await queue.get()
print(item) # third, second, first
工作队列模式
Python
async def worker_pool():
queue = asyncio.Queue()
num_workers = 3
async def worker(name):
while True:
item = await queue.get()
try:
await process_item(item)
finally:
queue.task_done()
# 启动workers
workers = [
asyncio.create_task(worker(f"Worker-{i}"))
for i in range(num_workers)
]
# 添加任务
for i in range(10):
await queue.put(f"Task-{i}")
# 等待队列清空
await queue.join()
# 取消workers
for w in workers:
w.cancel()
队列超时操作
Python
async def queue_timeout():
queue = asyncio.Queue()
try:
# 等待获取,带超时
item = await asyncio.wait_for(queue.get(), timeout=1.0)
except asyncio.TimeoutError:
print("Queue get timeout")
# 非阻塞获取
try:
item = queue.get_nowait()
except asyncio.QueueEmpty:
print("Queue is empty")
# 非阻塞放入
try:
queue.put_nowait("item")
except asyncio.QueueFull:
print("Queue is full")
屏障 (Barrier)
Python 3.11+
Python
async def barrier_example():
barrier = asyncio.Barrier(3) # 需要3个协程同时到达
async def worker(name):
print(f"{name} phase 1")
await barrier.wait() # 等待所有worker完成phase 1
print(f"{name} phase 2")
await barrier.wait() # 等待所有worker完成phase 2
print(f"{name} phase 3")
await asyncio.gather(
worker("A"),
worker("B"),
worker("C")
)
同步原语对比
| 原语 | 用途 | 特点 |
|---|---|---|
| Lock | 互斥访问 | 只有一个协程可持有 |
| Event | 事件通知 | 一次通知多个等待者 |
| Condition | 条件等待 | 需配合Lock使用 |
| Semaphore | 并发限制 | 可多个协程同时持有 |
| Queue | 任务分发 | 生产者-消费者模式 |
| Barrier | 阶段同步 | 等待所有协程到达 |
死锁避免
死锁场景
Python
async def deadlock_example():
lock_a = asyncio.Lock()
lock_b = asyncio.Lock()
async def task1():
async with lock_a:
await asyncio.sleep(0.1)
async with lock_b: # 死锁!
pass
async def task2():
async with lock_b:
await asyncio.sleep(0.1)
async with lock_a: # 死锁!
pass
# 死锁发生
await asyncio.gather(task1(), task2())
避免死锁
Python
async def avoid_deadlock():
lock_a = asyncio.Lock()
lock_b = asyncio.Lock()
async def task1():
# 方案1:统一锁顺序
async with lock_a:
async with lock_b:
pass
async def task2():
async with lock_a: # 保持相同顺序
async with lock_b:
pass
# 方案2:使用超时
async def with_timeout():
try:
await asyncio.wait_for(task1(), timeout=1.0)
except asyncio.TimeoutError:
print("Potential deadlock detected")
注意:同步原语必须使用
async with或await,不能混用同步版本(threading.Lock等)。
要点总结
- Lock保护临界区,确保同一时刻只有一个协程访问共享资源
- Event用于事件通知,一次set唤醒所有等待者
- Condition用于复杂条件等待,需配合Lock使用
- Semaphore限制并发数量,适用于连接池、限流场景
- Queue实现生产者-消费者模式,支持PriorityQueue和LifoQueue
- 避免死锁:统一锁顺序、设置超时、减少锁持有时间
存放路径:articles/PYTHON/专家/并发与异步高级/异步同步机制.md
📝 发现内容有误?点击此处直接编辑