全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python异步任务调度

asyncio提供了多种任务调度方式,灵活运用可以高效管理并发任务。

Task基础

创建Task

Python
import asyncio

async def work(name, delay):
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"{name} result"

async def main():
    # 方式1:create_task(推荐)
    task = asyncio.create_task(work("task1", 1))

    # 方式2:ensure_future
    task2 = asyncio.ensure_future(work("task2", 1))

    # 等待任务完成
    result = await task
    print(result)

asyncio.run(main())

Task状态与控制

Python
async def task_control():
    task = asyncio.create_task(work("demo", 2))

    print(task.done())      # False
    print(task.cancelled()) # False
    print(task.get_name())  # Task-1

    # 设置任务名称
    task.set_name("MyTask")

    # 等待结果
    result = await task
    print(task.done())      # True
    print(task.result())    # "demo result"

任务取消

Python
async def cancel_example():
    task = asyncio.create_task(work("cancelable", 10))

    await asyncio.sleep(1)
    task.cancel()  # 发送取消请求

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

async def cancelable_work():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        print("Cleanup before cancel")
        raise  # 必须重新抛出或处理

gather并发调度

基本用法

Python
async def gather_basic():
    # 并发执行多个协程
    results = await asyncio.gather(
        work("A", 1),
        work("B", 2),
        work("C", 1)
    )
    print(results)  # ['A result', 'B result', 'C result']

异常处理

Python
async def failing_work(name):
    if name == "B":
        raise ValueError(f"{name} failed")
    return f"{name} result"

async def gather_exception():
    # 默认:第一个异常会传播,其他任务继续
    try:
        await asyncio.gather(
            work("A", 1),
            failing_work("B"),
            work("C", 1)
        )
    except ValueError as e:
        print(f"Caught: {e}")

async def gather_return_exceptions():
    # return_exceptions=True:异常作为结果返回
    results = await asyncio.gather(
        work("A", 1),
        failing_work("B"),
        work("C", 1),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f"Error: {r}")
        else:
            print(f"Success: {r}")

动态任务列表

Python
async def gather_dynamic():
    tasks = [work(f"Task-{i}", i % 3 + 1) for i in range(10)]
    results = await asyncio.gather(*tasks)
    return results

wait调度

基本用法

Python
async def wait_basic():
    tasks = [
        asyncio.create_task(work(f"T{i}", i))
        for i in range(5)
    ]

    done, pending = await asyncio.wait(tasks)

    print(f"Completed: {len(done)}")
    for task in done:
        print(task.result())

等待策略

Python
async def wait_strategies():
    tasks = [
        asyncio.create_task(work(f"T{i}", i + 1))
        for i in range(5)
    ]

    # FIRST_COMPLETED: 任意一个完成就返回
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )

    # FIRST_EXCEPTION: 第一个异常就返回
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_EXCEPTION
    )

    # ALL_COMPLETED: 全部完成(默认)
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.ALL_COMPLETED
    )

超时控制

Python
async def wait_timeout():
    tasks = [
        asyncio.create_task(work(f"T{i}", i + 1))
        for i in range(5)
    ]

    try:
        done, pending = await asyncio.wait(
            tasks,
            timeout=2.0  # 最多等待2秒
        )
        print(f"Done: {len(done)}, Pending: {len(pending)}")

        # 取消未完成的任务
        for task in pending:
            task.cancel()
    except asyncio.TimeoutError:
        print("Timeout!")

as_completed迭代

按完成顺序迭代

Python
async def as_completed_example():
    tasks = [
        work(f"T{i}", 5 - i)  # T4最快完成
        for i in range(5)
    ]

    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"Got result: {result}")
    # 输出顺序:T4, T3, T2, T1, T0

带超时迭代

Python
async def as_completed_timeout():
    tasks = [work(f"T{i}", i + 1) for i in range(5)]

    for coro in asyncio.as_completed(tasks, timeout=3.0):
        try:
            result = await coro
            print(result)
        except asyncio.TimeoutError:
            print("Iteration timeout")
            break

TaskGroup (Python 3.11+)

基本用法

Python
async def taskgroup_example():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(work("A", 1))
        task2 = tg.create_task(work("B", 2))
        task3 = tg.create_task(work("C", 1))

    # 退出上下文时所有任务已完成
    print(task1.result(), task2.result(), task3.result())

异常处理

Python
async def taskgroup_exception():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(work("A", 1))
            tg.create_task(failing_work("B"))  # 会失败
            tg.create_task(work("C", 1))
    except ExceptionGroup as eg:
        for exc in eg.exceptions:
            print(f"Caught: {exc}")

wait_for超时等待

单任务超时

Python
async def waitfor_example():
    try:
        result = await asyncio.wait_for(
            work("slow", 10),
            timeout=2.0
        )
    except asyncio.TimeoutError:
        print("Task timed out")

超时后取消任务

Python
async def waitfor_cancel():
    task = asyncio.create_task(work("cancelable", 10))

    try:
        result = await asyncio.wait_for(task, timeout=1.0)
    except asyncio.TimeoutError:
        print("Timeout, task cancelled automatically")
        # wait_for会自动取消任务

高级调度模式

竞争模式

Python
async def race():
    # 等待第一个完成的结果
    done, pending = await asyncio.wait(
        [
            asyncio.create_task(fetch_from_server1()),
            asyncio.create_task(fetch_from_server2()),
        ],
        return_when=asyncio.FIRST_COMPLETED
    )

    # 取消其他任务
    for task in pending:
        task.cancel()

    # 获取第一个结果
    result = done.pop().result()
    return result

超时重试模式

Python
async def retry_with_timeout(coro, max_retries=3, timeout=5.0):
    for attempt in range(max_retries):
        try:
            return await asyncio.wait_for(coro(), timeout=timeout)
        except asyncio.TimeoutError:
            if attempt == max_retries - 1:
                raise
            print(f"Retry {attempt + 1}/{max_retries}")
            await asyncio.sleep(1)

限流调度

Python
async def rate_limited(tasks, rate_limit=10):
    "限制每秒启动的任务数"
    semaphore = asyncio.Semaphore(rate_limit)

    async def limited_task(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(
        *[limited_task(task) for task in tasks]
    )

批处理调度

Python
async def batch_process(items, batch_size=10):
    results = []

    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(
            *[process_item(item) for item in batch]
        )
        results.extend(batch_results)

        # 批次间休息
        await asyncio.sleep(0.1)

    return results

任务组合模式

链式任务

Python
async def chain_tasks():
    # 顺序执行链
    result1 = await step1()
    result2 = await step2(result1)
    result3 = await step3(result2)
    return result3

# 并行链
async def parallel_chains():
    results = await asyncio.gather(
        chain_tasks(),  # 链1
        chain_tasks(),  # 链2
        chain_tasks(),  # 链3
    )
    return results

条件分支

Python
async def conditional_execution():
    task_a = asyncio.create_task(fetch_option_a())
    task_b = asyncio.create_task(fetch_option_b())

    done, _ = await asyncio.wait(
        [task_a, task_b],
        return_when=asyncio.FIRST_COMPLETED
    )

    first_result = done.pop().result()

    if first_result['type'] == 'A':
        # 取消另一个
        task_b.cancel()
        return await process_a(first_result)
    else:
        task_a.cancel()
        return await process_b(first_result)

注意:gather适合等待所有结果,wait适合需要部分完成的场景,TaskGroup适合需要自动异常处理的场景。

要点总结

  • Task是协程的调度封装,用create_task创建,支持取消和状态查询
  • gather并发执行多个协程,返回结果列表,return_exceptions=True可捕获异常
  • wait支持三种返回策略:FIRST_COMPLETED、FIRST_EXCEPTION、ALL_COMPLETED
  • as_completed按完成顺序迭代,适合流式处理
  • TaskGroup(Python 3.11+)提供结构化并发,自动处理异常和取消
  • 使用信号量、批处理控制并发粒度,避免资源耗尽

存放路径articles/PYTHON/专家/并发与异步高级/异步任务调度.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python异步IO实现
下一篇 → Python异步同步机制
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库