全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python异步生成器

异步生成器结合了生成器和异步特性,用于高效处理异步数据流。

异步生成器基础

Python
import asyncio

# 异步生成器函数
async def async_generator(count: int):
    "异步生成器"
    for i in range(count):
        await asyncio.sleep(0.1)  # 模拟异步操作
        yield i

# 使用 async for 消费
async def consume_generator():
    async for value in async_generator(5):
        print(f"收到: {value}")

asyncio.run(consume_generator())
Python
# 对比同步生成器
def sync_generator(count: int):
    for i in range(count):
        yield i

# 同步用 for,异步用 async for
for v in sync_generator(5):
    print(v)

async for v in async_generator(5):
    print(v)

异步数据流处理

Python
import asyncio

async def async_file_lines(filepath: str):
    "异步读取文件行"
    import aiofiles

    async with aiofiles.open(filepath, 'r') as f:
        async for line in f:
            await asyncio.sleep(0.01)  # 模拟处理延迟
            yield line.strip()

async def process_file_async(filepath: str):
    "异步处理文件"
    async for line in async_file_lines(filepath):
        print(f"处理: {line[:50]}...")
Python
# 异步消息队列生成器
import asyncio
from collections import deque

class AsyncQueueGenerator:
    "基于队列的异步生成器"

    def __init__(self):
        self.queue = deque()
        self.closed = False

    async def put(self, item):
        "添加数据"
        self.queue.append(item)

    async def close(self):
        "关闭生成器"
        self.closed = True

    async def items(self):
        "异步生成器"
        while not self.closed or self.queue:
            if self.queue:
                item = self.queue.popleft()
                yield item
            else:
                await asyncio.sleep(0.1)

async def queue_demo():
    qg = AsyncQueueGenerator()

    # 生产者任务
    async def producer():
        for i in range(5):
            await qg.put(f"item_{i}")
            await asyncio.sleep(0.2)
        await qg.close()

    # 消费者任务
    async def consumer():
        async for item in qg.items():
            print(f"消费: {item}")

    await asyncio.gather(producer(), consumer())

asyncio.run(queue_demo())

分页数据流

Python
import asyncio

async def async_paginate(url: str, page_size: int = 10):
    "异步分页生成器"
    page = 1

    while True:
        # 模拟API请求
        await asyncio.sleep(0.2)
        data = await fetch_page(url, page, page_size)

        if not data:
            break

        yield data
        page += 1

async def fetch_page(url: str, page: int, size: int):
    "模拟API"
    if page > 3:
        return None
    return [{"id": i + (page-1)*size} for i in range(size)]

async def process_pagination():
    "处理分页数据"
    total_items = 0

    async for page_data in async_paginate("https://api.example.com"):
        for item in page_data:
            print(f"处理ID: {item['id']}")
            total_items += 1

    print(f"总计: {total_items}")

asyncio.run(process_pagination())

异步生成器组合

Python
import asyncio

async def async_map(generator, transform):
    "异步映射"
    async for item in generator:
        yield await transform(item)

async def async_filter(generator, predicate):
    "异步过滤"
    async for item in generator:
        if await predicate(item):
            yield item

async def async_take(generator, count: int):
    "取前N个"
    taken = 0
    async for item in generator:
        if taken >= count:
            break
        yield item
        taken += 1

# 使用组合
async def combined_processing():
    async def source():
        for i in range(10):
            await asyncio.sleep(0.05)
            yield i

    async def transform(x):
        await asyncio.sleep(0.02)
        return x * 2

    async def predicate(x):
        await asyncio.sleep(0.01)
        return x > 5

    # 组合:取前5个 → 过滤>5 → 映射*2
    async for value in async_take(
        async_filter(
            async_map(source(), transform),
            predicate
        ),
        3
    ):
        print(f"结果: {value}")

asyncio.run(combined_processing())

异步生成器管道

Python
import asyncio
from typing import AsyncGenerator, Callable

class AsyncPipeline:
    "异步生成器管道"

    def __init__(self, source: AsyncGenerator):
        self.current = source

    def map(self, transform: Callable) -> 'AsyncPipeline':
        "添加映射阶段"
        async def mapped():
            async for item in self.current:
                yield await transform(item)
        self.current = mapped()
        return self

    def filter(self, predicate: Callable) -> 'AsyncPipeline':
        "添加过滤阶段"
        async def filtered():
            async for item in self.current:
                if await predicate(item):
                    yield item
        self.current = filtered()
        return self

    def batch(self, size: int) -> 'AsyncPipeline':
        "添加批处理阶段"
        async def batched():
            batch = []
            async for item in self.current:
                batch.append(item)
                if len(batch) >= size:
                    yield batch
                    batch = []
            if batch:
                yield batch
        self.current = batched()
        return self

    async def collect(self) -> list:
        "收集结果"
        result = []
        async for item in self.current:
            result.append(item)
        return result

async def pipeline_demo():
    async def source():
        for i in range(20):
            await asyncio.sleep(0.02)
            yield i

    result = await AsyncPipeline(source())
        .map(lambda x: x * 2)
        .filter(lambda x: x > 10)
        .batch(5)
        .collect()

    print(f"结果: {result}")

asyncio.run(pipeline_demo())

异步生成器异常处理

Python
import asyncio

async def safe_generator(gen):
    "安全的异步生成器包装"
    try:
        async for item in gen:
            try:
                yield item
            except Exception as e:
                print(f"处理错误: {e}")
                continue
    except Exception as e:
        print(f"生成器错误: {e}")

async def failing_generator():
    "模拟会失败的生成器"
    for i in range(5):
        await asyncio.sleep(0.1)
        if i == 3:
            raise ValueError("模拟错误")
        yield i

async def safe_processing():
    async for value in safe_generator(failing_generator()):
        print(f"值: {value}")

asyncio.run(safe_processing())

异步生成器资源管理

Python
import asyncio

async def managed_generator():
    "带资源管理的异步生成器"
    print("初始化资源")
    resource = await acquire_resource()

    try:
        for i in range(5):
            await asyncio.sleep(0.1)
            yield await process_with_resource(resource, i)
    finally:
        print("释放资源")
        await release_resource(resource)

async def acquire_resource():
    await asyncio.sleep(0.1)
    return "async_resource"

async def process_with_resource(resource, item):
    await asyncio.sleep(0.05)
    return f"{resource}_processed_{item}"

async def release_resource(resource):
    await asyncio.sleep(0.1)
    print(f"释放: {resource}")

async def use_managed():
    async for result in managed_generator():
        print(result)

asyncio.run(use_managed())

异步生成器与asyncio.gather

Python
import asyncio

async def concurrent_processing():
    "并发处理多个生成器"

    async def gen_a():
        for i in range(3):
            await asyncio.sleep(0.1)
            yield f"a_{i}"

    async def gen_b():
        for i in range(3):
            await asyncio.sleep(0.15)
            yield f"b_{i}"

    # 并发收集
    results_a = []
    results_b = []

    async def collect(gen, results):
        async for item in gen:
            results.append(item)

    await asyncio.gather(
        collect(gen_a(), results_a),
        collect(gen_b(), results_b)
    )

    print(f"a结果: {results_a}")
    print(f"b结果: {results_b}")

asyncio.run(concurrent_processing())

异步生成器迭代器适配

Python
import asyncio

class AsyncGenIterator:
    "异步生成器迭代器"

    def __init__(self, gen_func):
        self.gen_func = gen_func
        self.gen = None

    def __aiter__(self):
        self.gen = self.gen_func()
        return self

    async def __anext__(self):
        try:
            return await self.gen.__anext__()
        except StopAsyncIteration:
            raise

async def iterator_demo():
    async def my_gen():
        for i in range(3):
            await asyncio.sleep(0.1)
            yield i

    async for value in AsyncGenIterator(my_gen):
        print(value)

asyncio.run(iterator_demo())

实际应用示例

Python
import asyncio

class AsyncEventStream:
    "异步事件流"

    def __init__(self):
        self.events = asyncio.Queue()
        self.subscribers = []
        self.running = True

    async def publish(self, event):
        "发布事件"
        await self.events.put(event)

    async def subscribe(self):
        "订阅事件流"
        async def stream():
            while self.running:
                try:
                    event = await asyncio.wait_for(
                        self.events.get(),
                        timeout=0.5
                    )
                    yield event
                except asyncio.TimeoutError:
                    continue
        return stream()

    async def close(self):
        "关闭流"
        self.running = False

async def event_system_demo():
    stream = AsyncEventStream()

    async def producer():
        for i in range(5):
            await stream.publish({"type": "data", "value": i})
            await asyncio.sleep(0.2)
        await stream.close()

    async def consumer():
        async for event in await stream.subscribe():
            print(f"收到事件: {event}")

    await asyncio.gather(producer(), consumer())

asyncio.run(event_system_demo())

异步生成器性能优化

Python
import asyncio

async def chunked_generator(gen, chunk_size: int = 10):
    "分块生成器,减少await次数"

    async def collect_chunk():
        chunk = []
        try:
            for _ in range(chunk_size):
                item = await gen.__anext__()
                chunk.append(item)
        except StopAsyncIteration:
            pass
        return chunk

    while True:
        chunk = await collect_chunk()
        if not chunk:
            break
        for item in chunk:
            yield item

async def optimized_demo():
    async def slow_gen():
        for i in range(100):
            await asyncio.sleep(0.01)
            yield i

    # 原始:100次await
    # 优化:100/10=10次批量await
    count = 0
    async for item in chunked_generator(slow_gen(), 10):
        count += 1
    print(f"处理: {count}项")

要点总结

  1. 异步生成器使用async defyield
  2. **async for**消费异步生成器
  3. 管道模式组合多个处理阶段
  4. 资源管理在生成器中使用try/finally
  5. 并发处理使用asyncio.gather处理多个生成器

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python异步上下文管理器
下一篇 → Python asyncio协程原理
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库