全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python I/O性能优化

I/O操作是程序性能瓶颈的主要来源,优化I/O能显著提升整体性能。

文件I/O优化

使用缓冲读取

Python
# Inefficient: one read() call per character.
with open('large.txt', 'r') as f:
    # The loop ends when read(1) returns an empty string at end of file.
    while char := f.read(1):
        process(char)

# Efficient: read one fixed-size chunk per call.
BUFFER_SIZE = 8192  # 8KB
with open('large.txt', 'r') as f:
    # In text mode read(n) returns up to n characters, not n bytes.
    while chunk := f.read(BUFFER_SIZE):
        process_chunk(chunk)

# Efficient: iterate line by line (the file object buffers internally).
with open('large.txt', 'r') as f:
    for line in f:
        process_line(line)

二进制模式读取

Python
# Inefficient: text mode decodes the bytes into str while reading.
# NOTE(review): arbitrary binary content may not be valid in the default
# text encoding, in which case this read raises UnicodeDecodeError.
with open('large.bin', 'r') as f:
    content = f.read()

# Efficient: binary mode returns the raw bytes unchanged.
with open('large.bin', 'rb') as f:
    content = f.read()

# Binary mode is faster because no decoding step is performed.

批量写入

Python
# Less efficient: one write() call and one string concatenation per line.
# The file object is buffered by default, so the cost is Python-level call
# overhead rather than one system call per line.
with open('output.txt', 'w') as f:
    for line in lines:
        f.write(line + '\n')

# Efficient: build the whole payload once and hand it over in a single call.
# Every line keeps its trailing newline so the output is identical to the
# loop above (a plain '\n'.join would drop the final newline).
with open('output.txt', 'w') as f:
    f.write(''.join(line + '\n' for line in lines))

# Efficient: explicit buffer size; small writes are collected in the buffer
# before being passed on to the operating system.
with open('output.txt', 'w', buffering=8192) as f:
    for line in lines:
        f.write(line + '\n')

使用mmap内存映射

Python
import mmap

# Map a large file into memory instead of reading it all up front.
with open('large.bin', 'rb') as f:
    # Length 0 maps the entire file. The context manager closes the mapping
    # even if an exception is raised while it is in use.
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # Random access: slicing copies only the requested byte range.
        data = mm[1000:2000]

# Writable mapping. The file must already exist and be non-empty, because
# the mapping cannot grow the underlying file.
with open('output.bin', 'r+b') as f:
    with mmap.mmap(f.fileno(), 0) as mm:
        payload = b'new data'
        # Slice assignment requires the slice and the value to have exactly
        # the same length, so the slice is sized from the payload.
        mm[0:len(payload)] = payload

减少文件打开/关闭次数

Python
import os

# Inefficient: the file is reopened and closed once per chunk.
for chunk in chunks:
    with open('file.txt', 'a') as f:
        f.write(chunk)

# Efficient: open once and reuse the same handle for every chunk.
with open('file.txt', 'a') as f:
    for chunk in chunks:
        f.write(chunk)

网络I/O优化

连接池复用

Python
import requests
from requests.adapters import HTTPAdapter

# Inefficient: the module-level helper sets up a fresh connection per call.
for url in urls:
    response = requests.get(url)

# Efficient: a Session keeps connections alive and reuses them.
# Using the session as a context manager guarantees it is closed even if a
# request raises part-way through the loop.
with requests.Session() as session:
    # pool_connections: number of per-host pools to cache;
    # pool_maxsize: maximum number of connections kept in each pool.
    adapter = HTTPAdapter(pool_connections=10, pool_maxsize=20)
    session.mount('http://', adapter)
    session.mount('https://', adapter)

    for url in urls:
        response = session.get(url)  # reuses a pooled connection

异步HTTP请求

Python
import aiohttp
import asyncio

async def _fetch_text(session, url):
    """Fetch *url* with *session* and return the decoded response body."""
    # The context manager releases the connection once the body has been
    # read, including when the request or the read fails.
    async with session.get(url) as response:
        return await response.text()


async def fetch_batch(urls):
    """Fetch every URL concurrently and return the bodies as text.

    Args:
        urls: Iterable of URL strings.

    Returns:
        A list of response bodies, in the same order as *urls*.
    """
    async with aiohttp.ClientSession() as session:
        pending = (_fetch_text(session, url) for url in urls)
        # gather() returns results in argument order, not completion order.
        return await asyncio.gather(*pending)

# 异步并发请求,吞吐量提升显著

超时设置

Python
import requests

# Always set a timeout so a stalled server cannot block the caller forever.
response = requests.get(url, timeout=(3.05, 30))  # (connect, read) seconds

# aiohttp: the timeout is configured once on the session.
import aiohttp
timeout = aiohttp.ClientTimeout(total=30)  # budget for the whole request


async def fetch_with_timeout(url):
    """Fetch *url* and return its body, bounded by the session timeout.

    ``async with`` is only valid inside a coroutine, so the session is used
    inside a function rather than at module level.
    """
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as response:
            return await response.text()

数据压缩传输

Python
import requests
import gzip

# Send a gzip-compressed request body.
data = b'large data...'
compressed = gzip.compress(data)
# The header tells the receiver how the body is encoded.
# NOTE(review): the server must accept gzip-encoded request bodies — confirm.
headers = {'Content-Encoding': 'gzip'}
response = requests.post(url, data=compressed, headers=headers)

# Responses are decompressed automatically.
response = requests.get(url)
# requests transparently decodes gzip-encoded responses.

数据库I/O优化

连接池

Python
# SQLAlchemy connection pool
from sqlalchemy import create_engine, text

engine = create_engine(
    'postgresql://user:pass@localhost/db',
    pool_size=10,       # connections kept open in the pool
    max_overflow=20,    # extra connections allowed beyond pool_size
    pool_timeout=30,    # seconds to wait for a free connection
    pool_recycle=3600,  # replace connections older than this many seconds
)

# Borrow a connection from the pool; it is returned when the block exits.
with engine.connect() as conn:
    # Raw SQL must be wrapped in text(); SQLAlchemy 2.0 rejects a plain str
    # passed to Connection.execute().
    result = conn.execute(text('SELECT * FROM users'))

批量操作

Python
# Inefficient: one execute() call per record.
for record in records:
    conn.execute('INSERT INTO users VALUES (?, ?)', record)

# Efficient: hand the whole batch to a single call.
# NOTE(review): whether execute() accepts a list of parameter sets depends
# on the driver behind `conn`, which is not shown here — confirm.
conn.execute('INSERT INTO users VALUES (?, ?)', records)

# DB-API alternative: executemany() runs the statement once per record.
cursor.executemany('INSERT INTO users VALUES (?, ?)', records)

预编译语句

Python
# Inefficient: a new SQL string is built for every value.
# NOTE(review): concatenating values into SQL is also open to SQL injection.
cursor.execute('SELECT * FROM users WHERE id = ' + str(user_id))

# Efficient: parameterized query; the statement text stays constant.
cursor.execute('SELECT * FROM users WHERE id = ?', (user_id,))
# A constant statement can be prepared once and reused across executions.

事务批处理

Python
# Inefficient: one transaction (and one commit) per record.
# NOTE(review): `table` is a placeholder and a reserved SQL keyword —
# substitute the real table name before running this.
for record in records:
    conn.execute('INSERT INTO table VALUES (?)', record)
    conn.commit()  # commit after every insert

# Efficient: a single transaction around the whole batch.
conn.execute('BEGIN')
for record in records:
    conn.execute('INSERT INTO table VALUES (?)', record)
conn.commit()  # one commit for the whole batch

异步I/O综合

异步文件操作

Python
import aiofiles

async def async_read():
    """Read all of ``large.txt`` through aiofiles and return its contents."""
    async with aiofiles.open('large.txt', 'r') as source:
        return await source.read()

async def async_write():
    """Write the literal string ``'content'`` to ``output.txt`` via aiofiles."""
    payload = 'content'
    async with aiofiles.open('output.txt', 'w') as sink:
        await sink.write(payload)

async def async_stream():
    """Pass each line of ``large.txt`` to ``process_line`` as it is read."""
    async with aiofiles.open('large.txt', 'r') as source:
        async for text_line in source:
            process_line(text_line)

异步数据库

Python
import asyncpg

async def async_db():
    """Query all rows from ``users`` through an asyncpg connection pool.

    Returns:
        The list of records returned by the query.
    """
    # Entering the pool as a context manager closes it on exit, even when
    # acquiring a connection or running the query raises.
    async with asyncpg.create_pool(
        'postgresql://user:pass@localhost/db',
        min_size=5,
        max_size=20,
    ) as pool:
        async with pool.acquire() as conn:
            return await conn.fetch('SELECT * FROM users')

异步Redis

Python
import redis.asyncio as redis

async def async_redis():
    """Run three commands in one pipeline and return their results.

    Returns:
        The list produced by ``pipe.execute()``: one entry per queued
        command, in the order the commands were queued.
    """
    client = redis.Redis(host='localhost', port=6379)
    try:
        # Commands are queued on the pipeline and sent together by execute().
        async with client.pipeline() as pipe:
            pipe.set('key1', 'value1')
            pipe.set('key2', 'value2')
            pipe.get('key3')
            return await pipe.execute()
    finally:
        # Close the client even if the pipeline raised.
        # NOTE(review): recent redis-py releases prefer aclose() — confirm
        # against the installed version.
        await client.close()

I/O多路复用

select/poll/epoll

Python
import selectors

def server_with_selector():
    """Run a single-threaded echo server on localhost:8080.

    One selector watches the listening socket and every client socket. The
    callback stored as each registration's ``data`` is called with the
    socket when it becomes readable. The loop runs until interrupted.
    """
    # Local import: the surrounding snippet only imports ``selectors``.
    import socket

    sel = selectors.DefaultSelector()

    def accept(sock):
        """Accept a pending client and start watching it for reads."""
        conn, _addr = sock.accept()
        conn.setblocking(False)
        sel.register(conn, selectors.EVENT_READ, read)

    def read(conn):
        """Echo received bytes back; drop the client when it disconnects."""
        data = conn.recv(1024)
        if data:
            # NOTE(review): send() may write fewer bytes than requested on a
            # non-blocking socket; a production server would queue the rest.
            conn.send(data)
        else:
            # An empty read means the peer closed the connection.
            sel.unregister(conn)
            conn.close()

    sock = socket.socket()
    try:
        sock.bind(('localhost', 8080))
        sock.listen()
        sock.setblocking(False)
        sel.register(sock, selectors.EVENT_READ, accept)

        while True:
            # Blocks until at least one registered socket is ready.
            events = sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj)
    finally:
        # Release the selector and the listening socket on any exit path.
        sel.close()
        sock.close()

缓存策略

内存缓存

Python
from functools import lru_cache

@lru_cache(maxsize=1000)
def read_file_cached(path):
    """Return the text of *path*, caching results for up to 1000 paths.

    Repeated calls with the same path are answered from the cache without
    opening the file again, so later changes on disk are not observed.
    """
    with open(path, 'r') as source:
        text = source.read()
    return text

# 缓存文件内容,避免重复IO

Redis缓存

Python
import redis

r = redis.Redis()

def get_with_cache(key, fetch_func, ttl=3600):
    """Return the value for *key*, reading through the Redis cache.

    Args:
        key: Redis key to look up.
        fetch_func: Zero-argument callable that produces the value on a miss.
        ttl: Lifetime of the cached entry in seconds.

    Returns:
        The cached value on a hit (as returned by the Redis client),
        otherwise the fresh result of ``fetch_func()``.
    """
    cached = r.get(key)
    # Compare against None: an empty cached value (b'') is still a hit and
    # must not trigger another fetch.
    if cached is not None:
        return cached

    data = fetch_func()
    r.setex(key, ttl, data)
    return data

本地文件缓存

Python
import json
import hashlib
import os

CACHE_DIR = './cache'


def _cache_path(key):
    """Return the cache file path for *key*."""
    # MD5 is used only to derive a fixed-length, filesystem-safe file name,
    # not for any security purpose.
    return os.path.join(CACHE_DIR, hashlib.md5(key.encode()).hexdigest())


def cache_result(key, data):
    """Store *data* as JSON in the cache entry for *key*.

    Args:
        key: Cache key (str).
        data: JSON-serializable value.
    """
    # Create the cache directory on first use; open() would otherwise fail
    # with FileNotFoundError.
    os.makedirs(CACHE_DIR, exist_ok=True)
    with open(_cache_path(key), 'w') as f:
        json.dump(data, f)


def load_cached(key):
    """Return the cached value for *key*, or ``None`` when there is none."""
    # Open directly instead of checking existence first: this avoids a second
    # filesystem lookup and the race where the file vanishes in between.
    try:
        with open(_cache_path(key), 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return None

性能测试对比

| 操作 | 低效方式 | 高效方式 | 提升倍数 |
| --- | --- | --- | --- |
| 文件读取 | 逐字符 | 8KB缓冲 | ~100x |
| 字符串写入 | 逐行write | join批量 | ~10x |
| HTTP请求 | 每次新建连接 | Session复用 | ~5x |
| 数据库插入 | 逐条提交 | 批量事务 | ~20x |

注意:I/O优化要根据实际瓶颈针对性优化,使用cProfile定位热点。

要点总结

  • 文件读取使用缓冲区(8KB+),大文件用mmap内存映射
  • 网络请求用Session复用连接,异步并发提升吞吐
  • 数据库用连接池、批量操作、事务合并减少IO次数
  • 异步IO(asyncio+aiohttp+asyncpg)适合高并发场景
  • 使用缓存避免重复IO,lru_cache内存缓存或Redis分布式缓存
  • I/O多路复用适合单线程处理大量连接

存放路径articles/PYTHON/专家/性能优化/I/O性能优化.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python GIL深度解析
下一篇 → Python内存泄漏排查
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库