Python异步任务调度
asyncio提供了多种任务调度方式,灵活运用可以高效管理并发任务。
Task基础
创建Task
Python
import asyncio
async def work(name, delay):
print(f"{name} started")
await asyncio.sleep(delay)
print(f"{name} finished")
return f"{name} result"
async def main():
# 方式1:create_task(推荐)
task = asyncio.create_task(work("task1", 1))
# 方式2:ensure_future
task2 = asyncio.ensure_future(work("task2", 1))
# 等待任务完成
result = await task
print(result)
asyncio.run(main())
Task状态与控制
Python
async def task_control():
task = asyncio.create_task(work("demo", 2))
print(task.done()) # False
print(task.cancelled()) # False
print(task.get_name()) # Task-1
# 设置任务名称
task.set_name("MyTask")
# 等待结果
result = await task
print(task.done()) # True
print(task.result()) # "demo result"
任务取消
Python
async def cancel_example():
task = asyncio.create_task(work("cancelable", 10))
await asyncio.sleep(1)
task.cancel() # 发送取消请求
try:
await task
except asyncio.CancelledError:
print("Task was cancelled")
async def cancelable_work():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
print("Cleanup before cancel")
raise # 必须重新抛出或处理
gather并发调度
基本用法
Python
async def gather_basic():
# 并发执行多个协程
results = await asyncio.gather(
work("A", 1),
work("B", 2),
work("C", 1)
)
print(results) # ['A result', 'B result', 'C result']
异常处理
Python
async def failing_work(name):
if name == "B":
raise ValueError(f"{name} failed")
return f"{name} result"
async def gather_exception():
# 默认:第一个异常会传播,其他任务继续
try:
await asyncio.gather(
work("A", 1),
failing_work("B"),
work("C", 1)
)
except ValueError as e:
print(f"Caught: {e}")
async def gather_return_exceptions():
# return_exceptions=True:异常作为结果返回
results = await asyncio.gather(
work("A", 1),
failing_work("B"),
work("C", 1),
return_exceptions=True
)
for r in results:
if isinstance(r, Exception):
print(f"Error: {r}")
else:
print(f"Success: {r}")
动态任务列表
Python
async def gather_dynamic():
tasks = [work(f"Task-{i}", i % 3 + 1) for i in range(10)]
results = await asyncio.gather(*tasks)
return results
wait调度
基本用法
Python
async def wait_basic():
tasks = [
asyncio.create_task(work(f"T{i}", i))
for i in range(5)
]
done, pending = await asyncio.wait(tasks)
print(f"Completed: {len(done)}")
for task in done:
print(task.result())
等待策略
Python
async def wait_strategies():
tasks = [
asyncio.create_task(work(f"T{i}", i + 1))
for i in range(5)
]
# FIRST_COMPLETED: 任意一个完成就返回
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# FIRST_EXCEPTION: 第一个异常就返回
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_EXCEPTION
)
# ALL_COMPLETED: 全部完成(默认)
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
超时控制
Python
async def wait_timeout():
tasks = [
asyncio.create_task(work(f"T{i}", i + 1))
for i in range(5)
]
try:
done, pending = await asyncio.wait(
tasks,
timeout=2.0 # 最多等待2秒
)
print(f"Done: {len(done)}, Pending: {len(pending)}")
# 取消未完成的任务
for task in pending:
task.cancel()
except asyncio.TimeoutError:
print("Timeout!")
as_completed迭代
按完成顺序迭代
Python
async def as_completed_example():
tasks = [
work(f"T{i}", 5 - i) # T4最快完成
for i in range(5)
]
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"Got result: {result}")
# 输出顺序:T4, T3, T2, T1, T0
带超时迭代
Python
async def as_completed_timeout():
tasks = [work(f"T{i}", i + 1) for i in range(5)]
for coro in asyncio.as_completed(tasks, timeout=3.0):
try:
result = await coro
print(result)
except asyncio.TimeoutError:
print("Iteration timeout")
break
TaskGroup (Python 3.11+)
基本用法
Python
async def taskgroup_example():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(work("A", 1))
task2 = tg.create_task(work("B", 2))
task3 = tg.create_task(work("C", 1))
# 退出上下文时所有任务已完成
print(task1.result(), task2.result(), task3.result())
异常处理
Python
async def taskgroup_exception():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(work("A", 1))
tg.create_task(failing_work("B")) # 会失败
tg.create_task(work("C", 1))
except ExceptionGroup as eg:
for exc in eg.exceptions:
print(f"Caught: {exc}")
wait_for超时等待
单任务超时
Python
async def waitfor_example():
try:
result = await asyncio.wait_for(
work("slow", 10),
timeout=2.0
)
except asyncio.TimeoutError:
print("Task timed out")
超时后取消任务
Python
async def waitfor_cancel():
task = asyncio.create_task(work("cancelable", 10))
try:
result = await asyncio.wait_for(task, timeout=1.0)
except asyncio.TimeoutError:
print("Timeout, task cancelled automatically")
# wait_for会自动取消任务
高级调度模式
竞争模式
Python
async def race():
# 等待第一个完成的结果
done, pending = await asyncio.wait(
[
asyncio.create_task(fetch_from_server1()),
asyncio.create_task(fetch_from_server2()),
],
return_when=asyncio.FIRST_COMPLETED
)
# 取消其他任务
for task in pending:
task.cancel()
# 获取第一个结果
result = done.pop().result()
return result
超时重试模式
Python
async def retry_with_timeout(coro, max_retries=3, timeout=5.0):
for attempt in range(max_retries):
try:
return await asyncio.wait_for(coro(), timeout=timeout)
except asyncio.TimeoutError:
if attempt == max_retries - 1:
raise
print(f"Retry {attempt + 1}/{max_retries}")
await asyncio.sleep(1)
限流调度
Python
async def rate_limited(tasks, rate_limit=10):
"限制每秒启动的任务数"
semaphore = asyncio.Semaphore(rate_limit)
async def limited_task(coro):
async with semaphore:
return await coro
return await asyncio.gather(
*[limited_task(task) for task in tasks]
)
批处理调度
Python
async def batch_process(items, batch_size=10):
results = []
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
batch_results = await asyncio.gather(
*[process_item(item) for item in batch]
)
results.extend(batch_results)
# 批次间休息
await asyncio.sleep(0.1)
return results
任务组合模式
链式任务
Python
async def chain_tasks():
# 顺序执行链
result1 = await step1()
result2 = await step2(result1)
result3 = await step3(result2)
return result3
# 并行链
async def parallel_chains():
results = await asyncio.gather(
chain_tasks(), # 链1
chain_tasks(), # 链2
chain_tasks(), # 链3
)
return results
条件分支
Python
async def conditional_execution():
task_a = asyncio.create_task(fetch_option_a())
task_b = asyncio.create_task(fetch_option_b())
done, _ = await asyncio.wait(
[task_a, task_b],
return_when=asyncio.FIRST_COMPLETED
)
first_result = done.pop().result()
if first_result['type'] == 'A':
# 取消另一个
task_b.cancel()
return await process_a(first_result)
else:
task_a.cancel()
return await process_b(first_result)
注意:gather适合等待所有结果,wait适合需要部分完成的场景,TaskGroup适合需要自动异常处理的场景。
要点总结
- Task是协程的调度封装,用
create_task创建,支持取消和状态查询 - gather并发执行多个协程,返回结果列表,
return_exceptions=True可捕获异常 - wait支持三种返回策略:FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED
- as_completed按完成顺序迭代,适合流式处理
- TaskGroup(Python 3.11+)提供结构化并发,自动处理异常和取消
- 使用信号量、批处理控制并发粒度,避免资源耗尽
存放路径:articles/PYTHON/专家/并发与异步高级/异步任务调度.md
📝 发现内容有误?点击此处直接编辑