Python异步生成器
异步生成器结合了生成器和异步特性,用于高效处理异步数据流。
异步生成器基础
Python
import asyncio
# Async generator function
async def async_generator(count: int):
    """Yield the integers 0 .. count-1, awaiting a 0.1 s sleep before each.

    The sleep stands in for real asynchronous work (I/O, network calls).
    """
    for value in range(count):
        await asyncio.sleep(0.1)  # simulated async operation
        yield value
# Consume an async generator with ``async for``
async def consume_generator():
    """Print every value produced by ``async_generator(5)``."""
    source = async_generator(5)
    async for received in source:
        print(f"收到: {received}")


asyncio.run(consume_generator())
Python
# Compare with a synchronous generator
import asyncio


def sync_generator(count: int):
    """Yield the integers 0 .. count-1 synchronously."""
    for i in range(count):
        yield i


# A synchronous generator is consumed with a plain ``for`` loop ...
for v in sync_generator(5):
    print(v)


# ... while an async generator (``async_generator`` from the previous
# example) needs ``async for``. That is only legal inside a coroutine, so it
# is wrapped in one and driven by asyncio.run(); a bare module-level
# ``async for`` is a SyntaxError.
async def consume_async_generator():
    async for v in async_generator(5):
        print(v)


asyncio.run(consume_async_generator())
异步数据流处理
Python
import asyncio
async def async_file_lines(filepath: str):
    """Asynchronously yield each line of *filepath* with surrounding whitespace stripped.

    Uses the third-party ``aiofiles`` package. The import is inside the
    function, so it only runs when the generator is first iterated.
    """
    import aiofiles

    async with aiofiles.open(filepath, 'r') as handle:
        async for raw_line in handle:
            await asyncio.sleep(0.01)  # simulated per-line processing delay
            cleaned = raw_line.strip()
            yield cleaned
async def process_file_async(filepath: str):
    """Print the first 50 characters of every line of *filepath*."""
    lines = async_file_lines(filepath)
    async for text in lines:
        print(f"处理: {text[:50]}...")
Python
# 异步消息队列生成器
import asyncio
from collections import deque
class AsyncQueueGenerator:
    """In-memory FIFO queue exposed as an async generator.

    Producers call ``put`` and finally ``close``; a consumer iterates
    ``items()``. Instead of polling the queue on a timer, ``items()`` sleeps
    on an ``asyncio.Event`` that ``put`` and ``close`` set, so new items (and
    shutdown) are noticed immediately.
    """

    def __init__(self):
        self.queue = deque()
        self.closed = False
        # Created lazily from inside a coroutine so that, on Python < 3.10
        # where an Event captures a loop at construction, it is bound to the
        # loop that is actually running.
        self._wakeup = None

    def _event(self):
        """Return the wake-up event, creating it on first use."""
        if self._wakeup is None:
            self._wakeup = asyncio.Event()
        return self._wakeup

    async def put(self, item):
        """Append *item* to the queue and wake a waiting consumer."""
        self.queue.append(item)
        self._event().set()

    async def close(self):
        """Mark the generator closed; ``items()`` ends once the queue is drained."""
        self.closed = True
        self._event().set()

    async def items(self):
        """Yield queued items in FIFO order until closed and empty.

        Items must be added via ``put``: appending to ``self.queue`` directly
        does not wake a consumer that is already waiting.
        """
        wakeup = self._event()
        while True:
            while self.queue:
                yield self.queue.popleft()
            if self.closed:
                return
            # Nothing queued: sleep until put()/close() signal. There is no
            # await between the emptiness check and clear(), so a signal sent
            # by another task cannot slip in unnoticed.
            wakeup.clear()
            await wakeup.wait()
async def queue_demo():
    """Run a producer and a consumer concurrently against one AsyncQueueGenerator."""
    qg = AsyncQueueGenerator()

    async def producer():
        # Push five items 0.2 s apart, then close so the consumer's loop can end.
        for n in range(5):
            await qg.put(f"item_{n}")
            await asyncio.sleep(0.2)
        await qg.close()

    async def consumer():
        async for received in qg.items():
            print(f"消费: {received}")

    await asyncio.gather(producer(), consumer())


asyncio.run(queue_demo())
分页数据流
Python
import asyncio
async def async_paginate(url: str, page_size: int = 10):
    """Yield successive pages from *url*, starting at page 1, until a page is empty/None."""
    page_number = 1
    while True:
        await asyncio.sleep(0.2)  # simulated request latency
        records = await fetch_page(url, page_number, page_size)
        if not records:
            return
        yield records
        page_number += 1
async def fetch_page(url: str, page: int, size: int):
    """Fake API call (``url`` is ignored).

    Returns None for any page > 3; otherwise a list of *size* dicts whose
    ``"id"`` values run consecutively from ``(page - 1) * size``.
    """
    if page > 3:
        return None
    offset = (page - 1) * size
    return [{"id": offset + i} for i in range(size)]
async def process_pagination():
    """Walk every page of the fake API, printing each id and then the total count."""
    total_items = 0
    pages = async_paginate("https://api.example.com")
    async for page_data in pages:
        for record in page_data:
            print(f"处理ID: {record['id']}")
        total_items += len(page_data)
    print(f"总计: {total_items}")


asyncio.run(process_pagination())
异步生成器组合
Python
import asyncio
async def async_map(generator, transform):
    """Yield ``await transform(item)`` for every item of the async iterable *generator*."""
    async for element in generator:
        mapped = await transform(element)
        yield mapped
async def async_filter(generator, predicate):
    """Yield only the items of *generator* for which ``await predicate(item)`` is truthy."""
    async for element in generator:
        keep = await predicate(element)
        if not keep:
            continue
        yield element
async def async_take(generator, count: int):
    """Yield at most the first *count* items of the async iterable *generator*.

    Like ``itertools.islice(it, count)``, this never requests more items than
    it yields. The source is not advanced past the *count*-th item, so no
    extra upstream work is triggered. ``count <= 0`` yields nothing and does
    not touch the source. The source is not closed.
    """
    if count <= 0:
        return
    taken = 0
    async for item in generator:
        yield item
        taken += 1
        # Stop right after the last wanted item instead of fetching one more.
        if taken >= count:
            return
# Using the combinators together
async def combined_processing():
    """Demo: double each source value, keep those > 5, and print the first 3."""
    async def source():
        for i in range(10):
            await asyncio.sleep(0.05)
            yield i

    async def transform(x):
        await asyncio.sleep(0.02)
        return x * 2

    async def predicate(x):
        await asyncio.sleep(0.01)
        return x > 5

    # Stages apply innermost-first: map (x * 2) -> filter (> 5) -> take the
    # first 3. With the source 0..9 this prints 6, 8 and 10.
    async for value in async_take(
        async_filter(
            async_map(source(), transform),
            predicate
        ),
        3
    ):
        print(f"结果: {value}")


asyncio.run(combined_processing())
异步生成器管道
Python
import asyncio
from typing import AsyncGenerator, Callable
class AsyncPipeline:
    """Fluent builder that chains async-generator stages.

    ``map``/``filter``/``batch`` each wrap the current generator in a new
    stage and return ``self`` so calls can be chained. ``collect`` drives the
    chain and returns every item produced by the last stage.
    """

    def __init__(self, source: AsyncGenerator):
        # The most recently added stage (initially the raw source).
        self.current = source

    @staticmethod
    async def _resolve(value):
        """Return *value*, awaiting it first if it is awaitable.

        Lets stage callables be plain functions or coroutine functions.
        """
        import inspect

        if inspect.isawaitable(value):
            return await value
        return value

    def map(self, transform: Callable) -> 'AsyncPipeline':
        """Add a stage that yields ``transform(item)`` for each item."""
        async def mapped(upstream):
            async for item in upstream:
                yield await self._resolve(transform(item))

        # The upstream stage is passed in explicitly. Reading self.current
        # inside the generator body would be deferred until iteration, when
        # it points at the LAST stage (possibly this one), so the stage would
        # try to iterate itself.
        self.current = mapped(self.current)
        return self

    def filter(self, predicate: Callable) -> 'AsyncPipeline':
        """Add a stage that keeps only items for which ``predicate(item)`` is truthy."""
        async def filtered(upstream):
            async for item in upstream:
                if await self._resolve(predicate(item)):
                    yield item

        self.current = filtered(self.current)  # bind upstream now (see map)
        return self

    def batch(self, size: int) -> 'AsyncPipeline':
        """Add a stage that groups items into lists of *size*; the last list may be shorter."""
        async def batched(upstream):
            pending = []
            async for item in upstream:
                pending.append(item)
                if len(pending) >= size:
                    yield pending
                    pending = []
            if pending:
                yield pending

        self.current = batched(self.current)  # bind upstream now (see map)
        return self

    async def collect(self) -> list:
        """Run the pipeline to exhaustion and return all final items as a list."""
        return [item async for item in self.current]
async def pipeline_demo():
    """Demo: run 0..19 through map(x * 2) -> filter(> 10) -> batch(5) and print the batches."""
    async def source():
        for i in range(20):
            await asyncio.sleep(0.02)
            yield i

    # Stage callables are coroutine functions because the stages ``await``
    # whatever the callable returns.
    async def double(x):
        return x * 2

    async def greater_than_ten(x):
        return x > 10

    # Parentheses are required to split a chained call across several lines.
    result = await (
        AsyncPipeline(source())
        .map(double)
        .filter(greater_than_ten)
        .batch(5)
        .collect()
    )
    print(f"结果: {result}")


asyncio.run(pipeline_demo())
异步生成器异常处理
Python
import asyncio
async def safe_generator(gen):
    """Re-yield items from *gen*, reporting errors instead of propagating them.

    * An ``Exception`` raised by *gen* itself is printed as ``生成器错误: ...``
      and ends the iteration quietly.
    * An ``Exception`` thrown into this wrapper at its ``yield`` (for example
      via ``athrow``) is printed as ``处理错误: ...``, and iteration continues
      with the next item from *gen*.

    Only ``Exception`` subclasses are caught, so ``GeneratorExit`` from
    ``aclose()`` still propagates.
    """
    try:
        async for element in gen:
            try:
                yield element
            except Exception as exc:
                print(f"处理错误: {exc}")
    except Exception as exc:
        print(f"生成器错误: {exc}")
async def failing_generator():
    """Yield 0, 1 and 2 (0.1 s apart), then raise ValueError instead of yielding 3."""
    for step in range(5):
        await asyncio.sleep(0.1)
        if step == 3:
            raise ValueError("模拟错误")
        yield step
async def safe_processing():
    """Print every value that safe_generator lets through from failing_generator."""
    guarded = safe_generator(failing_generator())
    async for value in guarded:
        print(f"值: {value}")


asyncio.run(safe_processing())
异步生成器资源管理
Python
import asyncio
async def managed_generator():
    """Yield five processed results while holding a single resource.

    The resource is acquired before the ``try``, so a failed acquisition has
    nothing to release. Once acquired, the ``finally`` releases it on normal
    exhaustion, on an error, or when the generator is closed early.
    """
    print("初始化资源")
    resource = await acquire_resource()
    try:
        for index in range(5):
            await asyncio.sleep(0.1)
            outcome = await process_with_resource(resource, index)
            yield outcome
    finally:
        print("释放资源")
        await release_resource(resource)
async def acquire_resource():
    """Simulate acquiring a resource (0.1 s delay); returns the string ``"async_resource"``."""
    await asyncio.sleep(0.1)
    handle = "async_resource"
    return handle
async def process_with_resource(resource, item):
    """Simulate work on *item* (0.05 s delay); returns ``"<resource>_processed_<item>"``."""
    await asyncio.sleep(0.05)
    label = f"{resource}_processed_{item}"
    return label
async def release_resource(resource):
    """Simulate releasing *resource* (0.1 s delay) and print a confirmation."""
    await asyncio.sleep(0.1)
    message = f"释放: {resource}"
    print(message)
async def use_managed():
    """Consume managed_generator to exhaustion, printing each result."""
    results = managed_generator()
    async for result in results:
        print(result)


asyncio.run(use_managed())
异步生成器与asyncio.gather
Python
import asyncio
async def concurrent_processing():
    """Drain two async generators concurrently and print what each produced."""
    async def gen_a():
        for n in range(3):
            await asyncio.sleep(0.1)
            yield f"a_{n}"

    async def gen_b():
        for n in range(3):
            await asyncio.sleep(0.15)
            yield f"b_{n}"

    async def drain(agen, sink):
        # Append everything *agen* yields to *sink*.
        async for produced in agen:
            sink.append(produced)

    collected_a, collected_b = [], []
    # gather runs both drains concurrently, so the total time is governed by
    # the slower generator rather than the sum of both.
    await asyncio.gather(
        drain(gen_a(), collected_a),
        drain(gen_b(), collected_b),
    )
    print(f"a结果: {collected_a}")
    print(f"b结果: {collected_b}")


asyncio.run(concurrent_processing())
异步生成器迭代器适配
Python
import asyncio
class AsyncGenIterator:
    """Re-iterable async iterator backed by an async generator function.

    Every ``__aiter__`` call (i.e. every ``async for``) starts a fresh
    generator by calling ``gen_func()``; ``__anext__`` delegates to it.
    """

    def __init__(self, gen_func):
        self.gen_func = gen_func
        self.gen = None

    def __aiter__(self):
        # Restart from the beginning on each new iteration.
        self.gen = self.gen_func()
        return self

    async def __anext__(self):
        # StopAsyncIteration from the underlying generator propagates
        # unchanged, which is what ends the ``async for`` loop.
        return await self.gen.__anext__()
async def iterator_demo():
    """Print the values of a 3-item async generator iterated through AsyncGenIterator."""
    async def my_gen():
        for n in range(3):
            await asyncio.sleep(0.1)
            yield n

    wrapped = AsyncGenIterator(my_gen)
    async for value in wrapped:
        print(value)


asyncio.run(iterator_demo())
实际应用示例
Python
import asyncio
class AsyncEventStream:
    """Async event stream backed by a single ``asyncio.Queue``.

    All subscribers share the one queue, so each event is delivered to
    exactly one subscriber. ``close()`` enqueues a stop marker behind any
    pending events. Streams therefore deliver everything published before
    the close and then stop promptly, instead of dropping queued events or
    waiting out the 0.5 s poll timeout.
    """

    # Private stop marker placed on the queue by close().
    _STOP = object()

    def __init__(self):
        self.events = asyncio.Queue()
        self.subscribers = []  # NOTE(review): not used anywhere in this class
        self.running = True

    async def publish(self, event):
        """Put *event* on the shared queue."""
        await self.events.put(event)

    async def subscribe(self):
        """Return an async generator yielding events until the stream is closed."""
        async def stream():
            # Keep going while running, and after close() until the queue is
            # drained, so events published before close() are not lost.
            while self.running or not self.events.empty():
                try:
                    # The timeout still lets a stream notice ``running`` being
                    # set to False directly, without close().
                    event = await asyncio.wait_for(
                        self.events.get(),
                        timeout=0.5
                    )
                except asyncio.TimeoutError:
                    continue
                if event is self._STOP:
                    # Re-post the marker so other subscribers also wake and stop.
                    self.events.put_nowait(self._STOP)
                    return
                yield event
        return stream()

    async def close(self):
        """Stop the stream after all already-published events are delivered."""
        self.running = False
        await self.events.put(self._STOP)
async def event_system_demo():
    """Publish five events from one task while another task prints them."""
    stream = AsyncEventStream()

    async def producer():
        for n in range(5):
            await stream.publish({"type": "data", "value": n})
            await asyncio.sleep(0.2)
        await stream.close()

    async def consumer():
        # subscribe() is a coroutine that returns the async generator.
        events = await stream.subscribe()
        async for event in events:
            print(f"收到事件: {event}")

    await asyncio.gather(producer(), consumer())


asyncio.run(event_system_demo())
异步生成器性能优化
Python
import asyncio
async def chunked_generator(gen, chunk_size: int = 10):
    """Re-yield the items of the async iterable *gen*, pulling them in chunks.

    Up to *chunk_size* items are fetched into a buffer before any of them is
    yielded. This does NOT reduce the number of awaits: every item is still
    fetched with its own ``await``, one after another, so a slow source is
    not sped up.

    The source is never polled again once it has signalled exhaustion.

    Raises:
        ValueError: on first iteration, if ``chunk_size < 1``.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    iterator = gen.__aiter__()  # also accepts async iterables, not just iterators
    exhausted = False
    while not exhausted:
        chunk = []
        while len(chunk) < chunk_size:
            try:
                chunk.append(await iterator.__anext__())
            except StopAsyncIteration:
                exhausted = True
                break
        for item in chunk:
            yield item
async def optimized_demo():
    """Count the items of a 100-item source re-yielded through chunked_generator.

    NOTE: this snippet only defines the coroutine; run it with
    ``asyncio.run(optimized_demo())``.
    """
    async def slow_gen():
        for i in range(100):
            await asyncio.sleep(0.01)
            yield i

    # Chunking does not reduce the number of awaits: chunked_generator still
    # awaits __anext__ once per item (100 times here), each including the
    # 0.01 s sleep in slow_gen, so the run takes roughly as long as
    # iterating slow_gen() directly.
    count = 0
    async for _ in chunked_generator(slow_gen(), 10):
        count += 1
    print(f"处理: {count}项")
要点总结
- 异步生成器使用 `async def` 和 `yield` 定义
- 使用 `async for` 消费异步生成器
- 管道模式可以组合多个处理阶段
- 在生成器中使用 `try/finally` 管理资源
- 使用 `asyncio.gather` 并发处理多个生成器
📝 发现内容有误?点击此处直接编辑