Python 进程池 ProcessPoolExecutor
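ProcessPoolExecutor 自动管理进程池,适合 CPU 密集型任务的并行处理。注意:在使用 spawn 启动方式的平台(Windows、macOS 默认)上,子进程会重新导入主模块,因此创建进程池的代码必须放在 `if __name__ == "__main__":` 保护块中,否则会抛出 RuntimeError。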
基本使用
Python
from concurrent.futures import ProcessPoolExecutor
import time
def cpu_task(n):
    """Sum the integers 0..n-1 with an explicit pure-Python loop.

    The loop is deliberately written out (rather than ``sum(range(n))``)
    so that it keeps a CPU core busy, which is what makes it a useful
    ProcessPoolExecutor demo.

    Args:
        n: Number of integers to add; 0 (or a negative value) yields 0.

    Returns:
        The sum 0 + 1 + ... + (n - 1).
    """
    total = 0
    for i in range(n):
        total += i
    return total


# Pool creation must be guarded: with the "spawn" start method (the default on
# Windows and macOS) every worker re-imports this module, and unguarded
# top-level pool creation would try to start new processes while the child is
# still bootstrapping, which raises RuntimeError.
if __name__ == "__main__":
    # Create the process pool
    with ProcessPoolExecutor(max_workers=4) as executor:
        future = executor.submit(cpu_task, 10000000)
        result = future.result()
    print(result)
submit 提交任务
Python
from concurrent.futures import ProcessPoolExecutor
def process(n):
    """Return the square of *n*; this is the unit of work submitted to the pool."""
    return n ** 2


# Guard pool creation so that worker processes re-importing this module
# (spawn start method) do not create pools of their own.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit() schedules one call and immediately returns a Future.
        futures = [executor.submit(process, n) for n in [1, 2, 3, 4, 5]]
        # Iterating in submission order keeps the output deterministic;
        # result() blocks until that particular task has finished.
        for future in futures:
            print(future.result())
map 批量处理
Python
from concurrent.futures import ProcessPoolExecutor
def factorial(n):
    """Return n! computed iteratively.

    Any n below 2 (including 0 and negative values) returns 1.
    """
    if n < 2:
        return 1
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result


numbers = [5, 10, 15, 20, 25]

# Guard pool creation so that spawned workers re-importing this module do not
# start pools of their own.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map() yields results in the same order as the inputs.
        results = executor.map(factorial, numbers)
        print(list(results))
并行计算示例
Python
from concurrent.futures import ProcessPoolExecutor
import math
def is_prime(n):
    """Return True if *n* is prime, using trial division up to sqrt(n).

    Values below 2 (0, 1, negatives) are not prime.
    """
    if n < 2:
        return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return False
    return True


# Check many numbers for primality in parallel
numbers = range(100000, 101000)

# Guard pool creation so that spawned workers re-importing this module do not
# start pools of their own.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map() preserves input order, so zip() pairs each number with its own
        # result. Each is_prime call is tiny, so chunksize batches many items
        # per IPC round-trip instead of sending them one at a time.
        flags = executor.map(is_prime, numbers, chunksize=100)
        primes = [n for n, is_p in zip(numbers, flags) if is_p]
    print(f"质数数量: {len(primes)}")
Future 回调
Python
from concurrent.futures import ProcessPoolExecutor
def compute(n):
    """Return the sum of the integers 0..n-1."""
    return sum(range(n))


def done_callback(future):
    """Print the result of a finished Future.

    concurrent.futures invokes done callbacks in a thread belonging to the
    process that registered them (the parent here), not in the worker.
    future.result() re-raises the task's exception if the task failed.
    """
    print(f"计算完成,结果: {future.result()}")


# Guard pool creation so that spawned workers re-importing this module do not
# start pools of their own.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(compute, 1000000)
        # If the future has already finished, the callback runs immediately.
        future.add_done_callback(done_callback)
as_completed 按完成顺序获取
Python
from concurrent.futures import ProcessPoolExecutor, as_completed
def task(n):
    """Sleep for n % 3 seconds, then return n squared.

    The varying sleep makes tasks finish out of submission order, which is
    what as_completed() is meant to demonstrate.
    """
    import time  # this snippet does not import time at the top
    time.sleep(n % 3)  # different durations
    return n ** 2


# Guard pool creation so that spawned workers re-importing this module do not
# start pools of their own.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=3) as executor:
        # Map each Future back to its input so we can report which task finished.
        futures = {executor.submit(task, i): i for i in range(10)}
        # as_completed() yields futures as they finish, not in submission order.
        for future in as_completed(futures):
            n = futures[future]
            print(f"任务 {n} 完成: {future.result()}")
异常处理
Python
from concurrent.futures import ProcessPoolExecutor
def risky_task(n):
    """Return n * 2, but raise ValueError for n == 3.

    The ValueError demonstrates how worker exceptions propagate to the parent.
    """
    if n == 3:
        raise ValueError("不允许数字3")
    return n * 2


# Guard pool creation so that spawned workers re-importing this module do not
# start pools of their own.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(risky_task, i) for i in range(5)]
        for future in futures:
            # result() re-raises the worker's exception in the parent with its
            # original type, so it can be caught as narrowly as in serial code.
            try:
                print(future.result())
            except ValueError as e:
                print(f"异常: {e}")
性能对比
Python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import time
def cpu_bound(_=None, n=10000000):
    """Sum range(n) as a CPU-bound benchmark workload.

    Args:
        _: Ignored. executor.map() passes one item per call, so the function
            must accept (and discard) a positional argument. The original
            zero-argument signature made every mapped call raise TypeError.
        n: Size of the range to sum; the default keeps the original workload.

    Returns:
        sum(range(n)).
    """
    return sum(range(n))


# Guard pool creation so that spawned workers re-importing this module do not
# start pools (or re-run the benchmark) themselves.
if __name__ == "__main__":
    # Multithreading (limited by the GIL)
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Consuming the results makes any task exception surface here instead
        # of being silently discarded with the unused map() iterator.
        list(executor.map(cpu_bound, range(4)))
    print(f"线程池: {time.perf_counter() - start:.2f}s")

    # Multiprocessing (bypasses the GIL); the timing includes worker startup.
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=4) as executor:
        list(executor.map(cpu_bound, range(4)))
    print(f"进程池: {time.perf_counter() - start:.2f}s")
chunksize 参数
Python
from concurrent.futures import ProcessPoolExecutor
def process(n):
    """Return *n* doubled."""
    return n * 2


# With large inputs, setting chunksize improves throughput
data = range(100000)

# Guard pool creation so that spawned workers re-importing this module do not
# start pools of their own.
if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as executor:
        # chunksize=1000 ships the 100000 items to workers in 100 chunks
        # instead of 100000 individual tasks, cutting IPC overhead.
        results = list(executor.map(process, data, chunksize=1000))
    print(results[:5])
进程池与线程池对比
| 特性 | ProcessPoolExecutor | ThreadPoolExecutor |
|---|---|---|
| 绕过 GIL | 是 | 否 |
| 内存占用 | 高 | 低 |
| 启动开销 | 大 | 小 |
| 适用任务 | CPU 密集型 | I/O 密集型 |
| 数据共享 | 需 IPC | 直接共享 |
要点总结
- ProcessPoolExecutor 绕过 GIL,实现真正并行
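- `max_workers` 通常设为 CPU 核心数
- `submit()` 提交单个任务,返回 `Future`
- `map()` 批量处理,按输入顺序返回结果迭代器
- `chunksize` 在数据量大时可将任务分批发送,减少进程间通信次数,从而提升性能
- 适合 CPU 密集型任务的并行计算
- 进程间通信开销较大,注意传输的数据量;任务函数及其参数、返回值都必须能被 pickle 序列化
- 在 spawn 启动方式下(Windows、macOS 默认),创建进程池的代码必须放在 `if __name__ == "__main__":` 中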
📝 发现内容有误?点击此处直接编辑