全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python异步IO实现

asyncio提供了完整的异步IO支持,包括网络编程和文件操作。

异步网络编程

TCP客户端

Python
import asyncio

async def tcp_client():
    reader, writer = await asyncio.open_connection(
        'example.com', 80
    )

    # 发送数据
    writer.write(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
    await writer.drain()

    # 接收数据
    data = await reader.read(1024)
    print(data.decode())

    # 关闭连接
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_client())

TCP服务器

Python
async def handle_client(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"Connected: {addr}")

    while True:
        data = await reader.read(1024)
        if not data:
            break

        # 回显
        writer.write(data)
        await writer.drain()

    print(f"Disconnected: {addr}")
    writer.close()
    await writer.wait_closed()

async def tcp_server():
    server = await asyncio.start_server(
        handle_client, '0.0.0.0', 8888
    )

    async with server:
        await server.serve_forever()

asyncio.run(tcp_server())

UDP编程

Python
async def udp_echo_server():
    # 创建UDP端点
    transport, protocol = await asyncio.get_event_loop().create_datagram_endpoint(
        lambda: UdpProtocol(),
        local_addr=('0.0.0.0', 9999)
    )

    try:
        await asyncio.Future()  # 永远运行
    finally:
        transport.close()

class UdpProtocol(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        print(f"Received from {addr}: {data}")
        self.transport.sendto(data, addr)  # 回显

async def udp_client():
    transport, protocol = await asyncio.get_event_loop().create_datagram_endpoint(
        lambda: UdpProtocol(),
        remote_addr=('127.0.0.1', 9999)
    )

    transport.sendto(b"Hello UDP")
    await asyncio.sleep(1)
    transport.close()

HTTP异步客户端

aiohttp基础

Python
import aiohttp
import asyncio

async def fetch_url():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com/data') as resp:
            return await resp.json()

async def post_data():
    async with aiohttp.ClientSession() as session:
        async with session.post(
            'https://api.example.com/submit',
            json={'key': 'value'}
        ) as resp:
            return await resp.text()

asyncio.run(fetch_url())

并发请求

Python
async def fetch_multiple(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def fetch_one(session, url):
    async with session.get(url) as resp:
        return await resp.text()

# 超时控制
async def fetch_with_timeout():
    timeout = aiohttp.ClientTimeout(total=10)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get('https://example.com') as resp:
            return await resp.text()

连接池管理

Python
async def connection_pool():
    connector = aiohttp.TCPConnector(
        limit=100,        # 总连接数限制
        limit_per_host=10,  # 每个host的连接数限制
        ttl_dns_cache=300   # DNS缓存时间
    )

    async with aiohttp.ClientSession(connector=connector) as session:
        pass

异步文件操作

aiofiles库

Python
import aiofiles

async def read_file():
    async with aiofiles.open('data.txt', 'r') as f:
        content = await f.read()
    return content

async def write_file():
    async with aiofiles.open('output.txt', 'w') as f:
        await f.write('Hello async file!')

async def read_lines():
    async with aiofiles.open('large.txt', 'r') as f:
        async for line in f:
            print(line.strip())

async def append_file():
    async with aiofiles.open('log.txt', 'a') as f:
        await f.write('New log entry\n')

异步文件操作封装

Python
import aiofiles.os as aios
import os

async def file_operations():
    # 创建目录
    await aios.makedirs('output/subdir', exist_ok=True)

    # 检查文件是否存在
    exists = await aios.path.exists('data.txt')

    # 获取文件信息
    stat = await aios.stat('data.txt')
    size = stat.st_size

    # 列出目录内容
    entries = await aios.listdir('output')

    # 重命名
    await aios.rename('old.txt', 'new.txt')

    # 删除
    await aios.remove('temp.txt')

    await aios.rmdir('empty_dir')

异步流处理

StreamReader/StreamWriter

Python
async def stream_copy():
    reader, writer = await asyncio.open_connection('source.com', 80)

    with open('output.bin', 'wb') as f:
        while True:
            chunk = await reader.read(4096)
            if not chunk:
                break
            f.write(chunk)

    writer.close()
    await writer.wait_closed()

async def stream_write():
    reader, writer = await asyncio.open_connection('dest.com', 80)

    for data in generate_data():
        writer.write(data)
        await writer.drain()  # 确保缓冲区已刷新

    writer.close()
    await writer.wait_closed()

管道操作

Python
async def pipe_example():
    # 创建管道
    r_fd, w_fd = os.pipe()

    reader = asyncio.StreamReader()
    reader_protocol = asyncio.StreamReaderProtocol(reader)

    loop = asyncio.get_event_loop()
    transport, _ = await loop.connect_read_pipe(
        lambda: reader_protocol, os.fdopen(r_fd, 'rb')
    )

    writer = asyncio.StreamWriter(
        transport, reader_protocol, reader, loop
    )

    # 写入管道
    writer.write(b"Hello pipe")
    await writer.drain()

异步数据库操作

asyncpg (PostgreSQL)

Python
import asyncpg

async def db_operations():
    # 创建连接池
    pool = await asyncpg.create_pool(
        host='localhost',
        user='user',
        password='password',
        database='mydb',
        min_size=5,
        max_size=20
    )

    async with pool.acquire() as conn:
        # 查询
        rows = await conn.fetch('SELECT * FROM users WHERE id = $1', 1)

        # 单行查询
        row = await conn.fetchrow('SELECT * FROM users LIMIT 1')

        # 插入
        await conn.execute(
            'INSERT INTO users(name, email) VALUES($1, $2)',
            'Alice', 'alice@example.com'
        )

    await pool.close()

aiomysql

Python
import aiomysql

async def mysql_operations():
    pool = await aiomysql.create_pool(
        host='localhost',
        user='root',
        password='password',
        db='testdb',
        minsize=5,
        maxsize=20
    )

    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT * FROM users")
            rows = await cur.fetchall()

    pool.close()
    await pool.wait_closed()

异步Redis操作

aioredis

Python
import redis.asyncio as redis

async def redis_operations():
    client = redis.Redis(host='localhost', port=6379, db=0)

    # 字符串操作
    await client.set('key', 'value', ex=3600)  # 1小时过期
    value = await client.get('key')

    # 哈希操作
    await client.hset('user:1', mapping={'name': 'Alice', 'age': 30})
    user = await client.hgetall('user:1')

    # 列表操作
    await client.lpush('queue', 'task1', 'task2')
    task = await client.rpop('queue')

    # 发布订阅
    pubsub = client.pubsub()
    await pubsub.subscribe('channel')
    async for message in pubsub.listen():
        if message['type'] == 'message':
            print(message['data'])

    await client.close()

异步上下文管理器

自定义异步上下文

Python
class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource...")
        await asyncio.sleep(0.1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource...")
        await asyncio.sleep(0.1)
        return False

async def use_resource():
    async with AsyncResource() as r:
        print("Using resource")

异步迭代器

Python
class AsyncRange:
    def __init__(self, count):
        self.count = count

    def __aiter__(self):
        self.i = 0
        return self

    async def __anext__(self):
        if self.i >= self.count:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.i += 1
        return self.i

async def iterate_async():
    async for i in AsyncRange(5):
        print(i)

性能优化技巧

批量操作

Python
async def batch_requests(urls, batch_size=10):
    results = []
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i+batch_size]
        batch_results = await asyncio.gather(*[
            fetch_one(url) for url in batch
        ])
        results.extend(batch_results)
    return results

信号量限制并发

Python
async def limited_concurrency(urls, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(url):
        async with semaphore:
            return await fetch_one(url)

    return await asyncio.gather(*[
        fetch_with_limit(url) for url in urls
    ])

注意:异步IO适合IO密集型场景,CPU密集型操作应使用线程池或进程池。

要点总结

  • TCP使用open_connection/start_server,UDP使用create_datagram_endpoint
  • HTTP客户端推荐aiohttp,支持连接池、超时、会话管理
  • 文件操作使用aiofiles,支持读写、目录操作、流式处理
  • 数据库使用异步驱动:asyncpg、aiomysql、motor(MongoDB)
  • 使用信号量控制并发数,批量操作提高吞吐量

存放路径articles/PYTHON/专家/并发与异步高级/异步IO实现.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python分布式任务队列
下一篇 → Python异步任务调度
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库