Python异步IO实现
asyncio提供了完整的异步网络IO支持(TCP、UDP、管道等);文件操作本身没有原生异步接口,需要借助aiofiles等第三方库。
异步网络编程
TCP客户端
Python
import asyncio
async def tcp_client():
    """Send a minimal HTTP GET to example.com:80 and print the first reply chunk.

    The connection is closed even when sending or receiving fails.
    """
    reader, writer = await asyncio.open_connection(
        'example.com', 80
    )
    try:
        # Send the request and wait until the write buffer has been flushed.
        writer.write(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
        await writer.drain()

        # Read at most 1024 bytes of the response.
        data = await reader.read(1024)
        # A truncated read may cut a multi-byte sequence in half, so decode
        # leniently instead of raising UnicodeDecodeError.
        print(data.decode(errors="replace"))
    finally:
        # Always release the socket, including on error paths.
        writer.close()
        await writer.wait_closed()
asyncio.run(tcp_client())
TCP服务器
Python
async def handle_client(reader, writer):
    """Echo every chunk received from one client back to that client.

    Args:
        reader: stream the client data is read from (up to 1024 bytes per read).
        writer: stream the echoed data is written to.

    The writer is closed when the peer disconnects and also when reading or
    writing raises; the exception itself is propagated to the caller.
    """
    addr = writer.get_extra_info('peername')
    print(f"Connected: {addr}")
    try:
        while True:
            data = await reader.read(1024)
            if not data:
                # An empty read means the peer closed its side.
                break
            # Echo the data back and wait for the buffer to drain.
            writer.write(data)
            await writer.drain()
    finally:
        print(f"Disconnected: {addr}")
        writer.close()
        await writer.wait_closed()
async def tcp_server():
    """Start the echo server on 0.0.0.0:8888 and serve until cancelled."""
    listener = await asyncio.start_server(
        handle_client, host='0.0.0.0', port=8888
    )
    # Leaving the context manager closes the server.
    async with listener:
        await listener.serve_forever()
asyncio.run(tcp_server())
UDP编程
Python
async def udp_echo_server():
    """Run a UDP echo endpoint on 0.0.0.0:9999 until the task is cancelled."""
    # Inside a coroutine the running loop is the one to use;
    # get_running_loop() is the preferred accessor over get_event_loop().
    loop = asyncio.get_running_loop()
    # Create the UDP endpoint.
    transport, _protocol = await loop.create_datagram_endpoint(
        UdpProtocol,
        local_addr=('0.0.0.0', 9999)
    )
    try:
        # A future that is never resolved keeps the server alive.
        await loop.create_future()
    finally:
        transport.close()
class UdpProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that prints each datagram and sends it back."""

    def connection_made(self, transport):
        """Remember the transport so received datagrams can be answered."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Print the datagram, then echo it to the address it came from."""
        print(f"Received from {addr}: {data}")
        # Echo the payload back to the sender.
        self.transport.sendto(data, addr)
async def udp_client():
    """Send one datagram to 127.0.0.1:9999 and print any reply received within 1s."""

    class _PrintReplies(asyncio.DatagramProtocol):
        """Client-side protocol: print replies without sending them back.

        Reusing an echoing protocol on the client would bounce every reply
        straight back to an echo server, so both sides would keep re-sending
        the same datagram until the transport is closed.
        """

        def datagram_received(self, data, addr):
            print(f"Received from {addr}: {data}")

    loop = asyncio.get_running_loop()
    transport, _protocol = await loop.create_datagram_endpoint(
        _PrintReplies,
        remote_addr=('127.0.0.1', 9999)
    )
    try:
        transport.sendto(b"Hello UDP")
        # Leave the endpoint open for one second so a reply can arrive.
        await asyncio.sleep(1)
    finally:
        transport.close()
HTTP异步客户端
aiohttp基础
Python
import aiohttp
import asyncio
async def fetch_url():
    """GET https://api.example.com/data and return the parsed JSON body."""
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as response:
            payload = await response.json()
            return payload
async def post_data():
    """POST a small JSON document to the submit endpoint and return the reply text."""
    body = {'key': 'value'}
    async with aiohttp.ClientSession() as session:
        async with session.post('https://api.example.com/submit', json=body) as response:
            reply = await response.text()
            return reply
asyncio.run(fetch_url())
并发请求
Python
async def fetch_multiple(urls):
    """Fetch all URLs concurrently over one shared session.

    Returns the list produced by asyncio.gather; because of
    return_exceptions=True, a failed request appears as the exception
    object in its slot instead of being raised.
    """
    async with aiohttp.ClientSession() as session:
        pending = (fetch_one(session, target) for target in urls)
        outcomes = await asyncio.gather(*pending, return_exceptions=True)
        return outcomes
async def fetch_one(session, url):
    """GET ``url`` using the given session and return the response body as text."""
    async with session.get(url) as response:
        body = await response.text()
    return body
# 超时控制
async def fetch_with_timeout():
    """GET https://example.com with a 10-second total timeout and return the text."""
    ten_seconds = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=ten_seconds) as session:
        async with session.get('https://example.com') as response:
            page = await response.text()
            return page
连接池管理
Python
async def connection_pool():
    """Open a session backed by a connector with explicit pool limits."""
    pool_settings = {
        'limit': 100,          # cap on total connections
        'limit_per_host': 10,  # cap on connections per host
        'ttl_dns_cache': 300,  # DNS cache lifetime
    }
    connector = aiohttp.TCPConnector(**pool_settings)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Placeholder: requests made here would go through the connector.
        pass
异步文件操作
aiofiles库
Python
import aiofiles
async def read_file():
    """Return the full contents of data.txt, read in text mode."""
    async with aiofiles.open('data.txt', 'r') as handle:
        return await handle.read()
async def write_file():
    """Write a fixed greeting to output.txt, opened in 'w' mode."""
    greeting = 'Hello async file!'
    async with aiofiles.open('output.txt', 'w') as handle:
        await handle.write(greeting)
async def read_lines():
    """Print large.txt line by line with surrounding whitespace stripped."""
    async with aiofiles.open('large.txt', 'r') as handle:
        # Iterating the file yields one line at a time.
        async for raw_line in handle:
            stripped = raw_line.strip()
            print(stripped)
async def append_file():
    """Append one log line to log.txt, opened in 'a' mode."""
    entry = 'New log entry\n'
    async with aiofiles.open('log.txt', 'a') as handle:
        await handle.write(entry)
异步文件操作封装
Python
import aiofiles.os as aios
import os
async def file_operations():
    """Run a fixed sequence of filesystem calls through aiofiles.os."""
    # Create the directory tree.
    await aios.makedirs('output/subdir', exist_ok=True)

    # Check whether a file exists.
    found = await aios.path.exists('data.txt')

    # Read file metadata and take the size from it.
    info = await aios.stat('data.txt')
    byte_count = info.st_size

    # List the directory contents.
    names = await aios.listdir('output')

    # Rename a file.
    await aios.rename('old.txt', 'new.txt')

    # Delete a file, then a directory.
    await aios.remove('temp.txt')
    await aios.rmdir('empty_dir')
异步流处理
StreamReader/StreamWriter
Python
async def stream_copy():
    """Copy everything received from source.com:80 into output.bin.

    Data is read in 4096-byte chunks until the peer closes the connection.
    The connection is closed even if reading or writing fails.
    """
    reader, writer = await asyncio.open_connection('source.com', 80)
    loop = asyncio.get_running_loop()
    try:
        with open('output.bin', 'wb') as f:
            while True:
                chunk = await reader.read(4096)
                if not chunk:
                    # An empty read signals end of stream.
                    break
                # f.write is a blocking call; run it in the default executor
                # so the event loop stays responsive during disk writes.
                await loop.run_in_executor(None, f.write, chunk)
    finally:
        writer.close()
        await writer.wait_closed()
async def stream_write():
    """Send every item produced by generate_data() to dest.com:80.

    The connection is closed even if a write fails.
    """
    _reader, writer = await asyncio.open_connection('dest.com', 80)
    try:
        for data in generate_data():
            writer.write(data)
            # Wait for the buffer to flush so a slow peer applies backpressure.
            await writer.drain()
    finally:
        writer.close()
        await writer.wait_closed()
管道操作
Python
async def pipe_example():
    """Write a message into an OS pipe and read it back through asyncio streams.

    The read end is attached with connect_read_pipe and the write end with
    connect_write_pipe. A transport for the read end cannot be written to,
    so the StreamWriter must be built on the write-end transport.

    Returns:
        The bytes read back from the pipe.
    """
    # Create the pipe.
    r_fd, w_fd = os.pipe()
    loop = asyncio.get_running_loop()

    # Read side: StreamReader fed by the read end of the pipe.
    reader = asyncio.StreamReader()
    read_transport, _ = await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader), os.fdopen(r_fd, 'rb')
    )
    try:
        # Write side: FlowControlMixin supplies the drain() support that
        # StreamWriter expects from its protocol.
        write_transport, write_protocol = await loop.connect_write_pipe(
            asyncio.streams.FlowControlMixin, os.fdopen(w_fd, 'wb')
        )
        writer = asyncio.StreamWriter(
            write_transport, write_protocol, None, loop
        )
        try:
            # Write into the pipe.
            writer.write(b"Hello pipe")
            await writer.drain()
            return await reader.read(1024)
        finally:
            writer.close()
    finally:
        read_transport.close()
        # Yield once so the transports' scheduled close callbacks can run.
        await asyncio.sleep(0)
异步数据库操作
asyncpg (PostgreSQL)
Python
import asyncpg
async def db_operations():
    """Run a few example queries against PostgreSQL through an asyncpg pool.

    The pool is closed even if one of the queries raises.
    """
    # Create the connection pool.
    pool = await asyncpg.create_pool(
        host='localhost',
        user='user',
        password='password',
        database='mydb',
        min_size=5,
        max_size=20
    )
    try:
        async with pool.acquire() as conn:
            # Multi-row query; the value is passed as the $1 parameter.
            rows = await conn.fetch('SELECT * FROM users WHERE id = $1', 1)
            # Single-row query.
            row = await conn.fetchrow('SELECT * FROM users LIMIT 1')
            # Insert; values are passed as parameters, not formatted into SQL.
            await conn.execute(
                'INSERT INTO users(name, email) VALUES($1, $2)',
                'Alice', 'alice@example.com'
            )
    finally:
        await pool.close()
aiomysql
Python
import aiomysql
async def mysql_operations():
    """Run an example SELECT against MySQL through an aiomysql pool.

    The pool is closed even if the query raises.
    """
    pool = await aiomysql.create_pool(
        host='localhost',
        user='root',
        password='password',
        db='testdb',
        minsize=5,
        maxsize=20
    )
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT * FROM users")
                rows = await cur.fetchall()
    finally:
        # close() starts the shutdown; wait_closed() waits for it to finish.
        pool.close()
        await pool.wait_closed()
异步Redis操作
redis.asyncio(原aioredis,现已合并入redis-py)
Python
import redis.asyncio as redis
async def redis_operations():
    """Demonstrate string, hash, list and pub/sub commands with redis.asyncio.

    The pub/sub loop below runs until it is interrupted by an exception or
    by task cancellation. The cleanup is placed in ``finally`` blocks so the
    pubsub and the client are still closed in that case.
    """
    client = redis.Redis(host='localhost', port=6379, db=0)
    try:
        # String commands.
        await client.set('key', 'value', ex=3600)  # expires after one hour
        value = await client.get('key')

        # Hash commands.
        await client.hset('user:1', mapping={'name': 'Alice', 'age': 30})
        user = await client.hgetall('user:1')

        # List commands.
        await client.lpush('queue', 'task1', 'task2')
        task = await client.rpop('queue')

        # Publish/subscribe.
        pubsub = client.pubsub()
        try:
            await pubsub.subscribe('channel')
            async for message in pubsub.listen():
                # Skip subscribe confirmations and other non-data events.
                if message['type'] == 'message':
                    print(message['data'])
        finally:
            await pubsub.close()
    finally:
        # NOTE(review): newer redis-py releases prefer aclose() over close();
        # close() is kept here to match the original call.
        await client.close()
异步上下文管理器
自定义异步上下文
Python
class AsyncResource:
    """Example async context manager with simulated acquire/release delays."""

    async def __aenter__(self):
        """Announce acquisition, pause briefly, and return this instance."""
        print("Acquiring resource...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Announce release and pause briefly.

        Returns False, so an exception raised inside the ``async with``
        body is not suppressed.
        """
        print("Releasing resource...")
        await asyncio.sleep(0.1)
        suppress_exception = False
        return suppress_exception
async def use_resource():
    """Enter an AsyncResource and print a message while it is held."""
    async with AsyncResource() as resource:
        print("Using resource")
异步迭代器
Python
class AsyncRange:
    """Async iterator yielding 1, 2, ..., count with a short pause per item."""

    def __init__(self, count):
        """Store how many values to produce."""
        self.count = count

    def __aiter__(self):
        """Reset the counter and return this object as its own iterator."""
        self.i = 0
        return self

    async def __anext__(self):
        """Return the next value, or raise StopAsyncIteration when exhausted."""
        if self.i < self.count:
            await asyncio.sleep(0.1)
            # The counter is advanced before being returned, so the first
            # value produced is 1.
            self.i += 1
            return self.i
        raise StopAsyncIteration
async def iterate_async():
    """Print each value produced by AsyncRange(5)."""
    async for value in AsyncRange(5):
        print(value)
性能优化技巧
批量操作
Python
async def batch_requests(urls, batch_size=10):
    """Fetch ``urls`` in consecutive batches of at most ``batch_size``.

    Each batch runs concurrently, and the next batch starts only after the
    previous one has finished. One session is shared by all requests and is
    passed to fetch_one, which takes ``(session, url)``.

    Args:
        urls: sequence of URLs to fetch.
        batch_size: maximum number of concurrent requests per batch.

    Returns:
        A list of response bodies in the same order as ``urls``.
    """
    results = []
    async with aiohttp.ClientSession() as session:
        for start in range(0, len(urls), batch_size):
            batch = urls[start:start + batch_size]
            batch_results = await asyncio.gather(
                *(fetch_one(session, url) for url in batch)
            )
            results.extend(batch_results)
    return results
信号量限制并发
Python
async def limited_concurrency(urls, max_concurrent=5):
    """Fetch all ``urls`` with at most ``max_concurrent`` requests in flight.

    One session is shared by all requests and is passed to fetch_one, which
    takes ``(session, url)``.

    Args:
        urls: iterable of URLs to fetch.
        max_concurrent: upper bound on simultaneous requests.

    Returns:
        A list of response bodies in the same order as ``urls``.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:

        async def fetch_with_limit(url):
            # The semaphore is held for the whole request, which is what
            # bounds the number of concurrent fetches.
            async with semaphore:
                return await fetch_one(session, url)

        return await asyncio.gather(
            *(fetch_with_limit(url) for url in urls)
        )
注意:异步IO适合IO密集型场景;CPU密集型操作应交给进程池(受GIL限制,线程池无法并行执行CPU密集型的Python代码),阻塞式IO调用则可以放入线程池执行。
要点总结
- TCP使用open_connection/start_server,UDP使用create_datagram_endpoint
- HTTP客户端推荐aiohttp,支持连接池、超时、会话管理
- 文件操作使用aiofiles,支持读写、目录操作、流式处理
- 数据库使用异步驱动:asyncpg、aiomysql、motor(MongoDB)
- 使用信号量控制并发数,批量操作提高吞吐量
存放路径:articles/PYTHON/专家/并发与异步高级/异步IO实现.md
📝 发现内容有误?点击此处直接编辑