Python多进程并发模式
Python多进程可以绕过GIL限制，适合CPU密集型任务。本文帮助你掌握多进程架构的设计方法。
multiprocessing 基础
Python
import multiprocessing
import os
def worker(task_id: int):
    """Report which process is handling *task_id* and return twice its value."""
    pid = os.getpid()
    print(f"进程 {pid} 处理任务 {task_id}")
    doubled = task_id * 2
    return doubled
if __name__ == '__main__':
    # Fan ten task ids out over a pool of four worker processes.
    task_ids = range(10)
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(worker, task_ids)
    print(f"结果: {results}")
Python
import multiprocessing
# Process 类
def task(name: str):
    """Print a line announcing that the task called *name* is running."""
    message = f"进程执行: {name}"
    print(message)
if __name__ == '__main__':
    # Create four Process objects, start them all, then wait for each one.
    processes = []
    for idx in range(4):
        processes.append(
            multiprocessing.Process(target=task, args=(f"task_{idx}",))
        )
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()  # block until this child exits
进程池 Pool
Python
import multiprocessing
import time
def cpu_task(n: int) -> int:
    """CPU-bound work: return the sum of k*k for k in range(n)."""
    return sum(k * k for k in range(n))
if __name__ == '__main__':
    tasks = [10**6] * 4

    # Approach 1: map -- blocks until every result is ready.
    with multiprocessing.Pool() as pool:
        map_results = pool.map(cpu_task, tasks)
    print(f"map结果: {map_results}")

    # Approach 2: apply -- runs one call in a worker and waits for it.
    with multiprocessing.Pool() as pool:
        single = pool.apply(cpu_task, (10**6,))
    print(f"apply结果: {single}")

    # Approach 3: map_async -- returns an AsyncResult immediately.
    with multiprocessing.Pool() as pool:
        pending = pool.map_async(cpu_task, tasks)
        # The main process is free to do other work here.
        print("等待结果...")
        async_results = pending.get()
    print(f"async结果: {async_results}")
Python
import multiprocessing
def process_with_callback(result):
    """Callback handed to ``apply_async``: print the result it receives."""
    line = f"回调收到: {result}"
    print(line)
def cpu_task(n: int) -> int:
    """CPU-bound work: return the sum of i*i for i in range(n).

    Defined in this snippet so the example runs on its own. It must be a
    module-level function so that pool workers can unpickle it by name.
    """
    return sum(i * i for i in range(n))


def _report_error(exc):
    """error_callback for apply_async: print the exception raised by the task."""
    print(f"任务失败: {exc!r}")


if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        # Asynchronous execution with a result callback. error_callback
        # makes a failing task visible instead of silently dropping it,
        # because the AsyncResult returned here is never queried.
        pool.apply_async(
            cpu_task,
            (10**6,),
            callback=process_with_callback,
            error_callback=_report_error,
        )
        pool.close()
        pool.join()
进程间通信
Python
import multiprocessing
# Queue 通信
def producer(queue: multiprocessing.Queue):
    """Put the integers 0..9 on *queue*, then a ``None`` end-of-stream marker."""
    for value in range(10):
        queue.put(value)
        print(f"生产: {value}")
    queue.put(None)  # sentinel: tells the consumer to stop
def consumer(queue: multiprocessing.Queue):
    """Print every item taken from *queue* until the ``None`` sentinel arrives."""
    item = queue.get()
    while item is not None:
        print(f"消费: {item}")
        item = queue.get()
if __name__ == '__main__':
    # One queue shared by a producer process and a consumer process.
    queue = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=producer, args=(queue,)),
        multiprocessing.Process(target=consumer, args=(queue,)),
    ]
    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
Python
import multiprocessing
# Pipe 通信(双向)
def sender(conn):
    """Send a greeting over *conn*, print the reply received, then close *conn*."""
    conn.send("来自sender的消息")
    reply = conn.recv()
    print(f"sender收到: {reply}")
    conn.close()
def receiver(conn):
    """Print the first message received on *conn*, answer it, then close *conn*."""
    incoming = conn.recv()
    print(f"receiver收到: {incoming}")
    conn.send("来自receiver的回复")
    conn.close()
if __name__ == '__main__':
    # Pipe() is duplex by default: each end can both send and receive.
    end_a, end_b = multiprocessing.Pipe()
    procs = [
        multiprocessing.Process(target=sender, args=(end_a,)),
        multiprocessing.Process(target=receiver, args=(end_b,)),
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
共享内存
Python
import multiprocessing
# Value 和 Array(共享内存)
def worker_with_shared(value: multiprocessing.Value, arr: multiprocessing.Array):
    """Increment a shared counter and every element of a shared array by one.

    Both ``value.value += 1`` and ``arr[i] += 1`` are read-modify-write
    sequences, not atomic operations. Each is therefore performed while
    holding the shared object's own lock (``get_lock()``); without it,
    concurrent workers can overwrite each other's updates.

    Args:
        value: synchronized shared scalar (e.g. ``multiprocessing.Value('i', 0)``).
        arr: synchronized shared array (e.g. ``multiprocessing.Array('i', ...)``).
    """
    with value.get_lock():
        value.value += 1
    # Hold the array lock for the whole loop so one worker's pass is not
    # interleaved with another's.
    with arr.get_lock():
        for i in range(len(arr)):
            arr[i] += 1
if __name__ == '__main__':
    # Shared C int (typecode 'i'), starting at 0.
    shared_value = multiprocessing.Value('i', 0)
    # Shared array of five C ints, all zero.
    shared_array = multiprocessing.Array('i', [0] * 5)
    processes = []
    for _ in range(4):
        processes.append(multiprocessing.Process(
            target=worker_with_shared,
            args=(shared_value, shared_array),
        ))
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    print(f"共享值: {shared_value.value}")
    print(f"共享数组: {list(shared_array)}")
Python
import multiprocessing
# Manager(更灵活的共享)
def worker_with_manager(shared_dict, shared_list, lock=None):
    """Bump ``shared_dict['count']`` and append this process's name to *shared_list*.

    Args:
        shared_dict: dict-like object (in this example a Manager dict proxy).
        shared_list: list-like object (in this example a Manager list proxy).
        lock: optional lock held around the counter update. The ``get`` and
            the assignment are two separate calls, i.e. a read-modify-write,
            so concurrent workers can lose increments unless they share a
            lock. Defaults to None (no locking) so existing two-argument
            callers keep working.
    """
    def bump_count():
        shared_dict['count'] = shared_dict.get('count', 0) + 1

    if lock is None:
        bump_count()
    else:
        with lock:
            bump_count()
    # A single append call -- not a read-modify-write -- so it needs no lock.
    shared_list.append(multiprocessing.current_process().name)


if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()
        # One lock shared by all workers to serialise the counter update.
        count_lock = manager.Lock()
        processes = [
            multiprocessing.Process(
                target=worker_with_manager,
                args=(shared_dict, shared_list, count_lock),
            )
            for _ in range(4)
        ]
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        # Read the proxies while the manager is still running.
        print(f"共享字典: {shared_dict}")
        print(f"共享列表: {list(shared_list)}")
并行计算模式
Python
import multiprocessing
import math
def parallel_chunk_processing(data: list, chunk_size: int = 100):
"分块并行处理"
def process_chunk(chunk):
return [math.sqrt(x) * x for x in chunk]
# 分块
chunks = [
data[i:i + chunk_size]
for i in range(0, len(data), chunk_size)
]
with multiprocessing.Pool() as pool:
results = pool.map(process_chunk, chunks)
# 合并结果
return [item for chunk in results for item in chunk]
if __name__ == '__main__':
    numbers = list(range(1000))
    processed = parallel_chunk_processing(numbers)
    print(f"处理完成: {len(processed)}项")
Python
import multiprocessing
class ParallelProcessor:
    """Run map-style operations on a process pool created for each call.

    Every method opens its own ``multiprocessing.Pool`` with
    ``self.num_workers`` processes and shuts it down when the work is done
    (for ``imap``: when the generator is exhausted or closed).
    """

    def __init__(self, num_workers: int = None):
        # Any falsy value (None, 0) falls back to one worker per CPU.
        if num_workers:
            self.num_workers = num_workers
        else:
            self.num_workers = multiprocessing.cpu_count()

    def _new_pool(self):
        """Create a pool sized to ``self.num_workers``."""
        return multiprocessing.Pool(self.num_workers)

    def map(self, func, data: list) -> list:
        """Return ``[func(x) for x in data]`` computed in parallel, order preserved."""
        with self._new_pool() as pool:
            mapped = pool.map(func, data)
        return mapped

    def starmap(self, func, data: list) -> list:
        """Like ``map``, but each element of *data* is unpacked as ``func(*args)``."""
        with self._new_pool() as pool:
            mapped = pool.starmap(func, data)
        return mapped

    def imap(self, func, data: list):
        """Lazily yield ``func(x)`` for each x in *data*, in input order."""
        with self._new_pool() as pool:
            yield from pool.imap(func, data)
# 使用
def transform(x: int) -> int:
    """Return the square of *x*."""
    squared = pow(x, 2)
    return squared
if __name__ == '__main__':
    squares = ParallelProcessor().map(transform, range(10))
    print(squares)
生产者-消费者模式
Python
import multiprocessing
import time
def producer(queue: multiprocessing.Queue, count: int):
    """Put *count* items named ``item_<i>`` on *queue*, 0.1 s apart, then "DONE"."""
    labels = (f"item_{i}" for i in range(count))
    for label in labels:
        queue.put(label)
        print(f"生产: {label}")
        time.sleep(0.1)
    queue.put("DONE")  # stop sentinel for the consumers
def consumer(queue: multiprocessing.Queue, consumer_id: int):
    """Handle items from *queue* until the "DONE" sentinel is seen.

    The sentinel is put back before returning, so every other consumer
    sharing the queue also receives it and stops.
    """
    item = queue.get()
    while item != "DONE":
        print(f"消费者{consumer_id} 处理: {item}")
        time.sleep(0.2)
        item = queue.get()
    queue.put("DONE")  # hand the stop signal on to the next consumer
if __name__ == '__main__':
    # Bounded queue: put() blocks once 10 items are waiting.
    queue = multiprocessing.Queue(maxsize=10)
    # One producer feeding three consumers.
    producer_p = multiprocessing.Process(target=producer, args=(queue, 20))
    consumer_ps = [
        multiprocessing.Process(target=consumer, args=(queue, cid))
        for cid in range(3)
    ]
    everyone = [producer_p, *consumer_ps]
    for proc in everyone:
        proc.start()
    for proc in everyone:
        proc.join()
进程同步
Python
import multiprocessing
import time
def worker_with_lock(lock: multiprocessing.Lock, counter: multiprocessing.Value):
    """Add 1000 to ``counter.value``, holding *lock* around every increment."""
    remaining = 1000
    while remaining:
        lock.acquire()
        try:
            counter.value += 1
        finally:
            lock.release()
        remaining -= 1
if __name__ == '__main__':
    lock = multiprocessing.Lock()
    counter = multiprocessing.Value('i', 0)
    processes = []
    for _ in range(4):
        processes.append(
            multiprocessing.Process(target=worker_with_lock, args=(lock, counter))
        )
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    # 4 workers x 1000 increments each.
    print(f"计数器: {counter.value}")  # expected: 4000
Python
import multiprocessing
# Semaphore 信号量
def limited_resource_worker(sem: multiprocessing.Semaphore, worker_id: int,
                            hold_seconds: float = 1.0):
    """Hold one slot of *sem* for *hold_seconds* while using a limited resource.

    Args:
        sem: semaphore bounding how many workers may hold the resource at once.
        worker_id: identifier used only in the printed messages.
        hold_seconds: how long to keep the slot (default 1.0, as before).
    """
    # Imported locally so the function works without a module-level
    # ``import time``; previously time.sleep raised NameError in each child.
    import time

    with sem:  # blocks until a slot is free; released on exit, even on error
        print(f"Worker {worker_id} 获取资源")
        time.sleep(hold_seconds)
        print(f"Worker {worker_id} 释放资源")
if __name__ == '__main__':
    # At most two workers may hold the resource at the same time.
    sem = multiprocessing.Semaphore(2)
    processes = []
    for worker_id in range(6):
        proc = multiprocessing.Process(
            target=limited_resource_worker,
            args=(sem, worker_id),
        )
        processes.append(proc)
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
进程池高级用法
Python
import multiprocessing
def init_worker():
    """Pool initializer: announce the PID of the worker process starting up."""
    me = multiprocessing.current_process()
    print(f"进程 {me.pid} 初始化")
def task_with_state(x):
    """Pool task: return *x* multiplied by 2."""
    doubled = x * 2
    return doubled
if __name__ == '__main__':
    # initializer runs once in each worker process as it starts.
    pool_options = dict(processes=4, initializer=init_worker)
    with multiprocessing.Pool(**pool_options) as pool:
        doubled = pool.map(task_with_state, range(10))
    print(doubled)
Python
import multiprocessing
def long_task(n, delay: float = 2.0):
    """Simulate a slow task: sleep *delay* seconds, then return ``n * 10``.

    Args:
        n: value to scale.
        delay: seconds to sleep before returning (default 2.0, as before).
    """
    # Imported locally so the function works without a module-level
    # ``import time``; previously time.sleep raised NameError in each worker.
    import time

    time.sleep(delay)
    return n * 10
if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        async_results = [pool.apply_async(long_task, (i,)) for i in range(8)]
        # Poll each task's completion status without blocking.
        for idx, pending in enumerate(async_results):
            is_ready = pending.ready()
            print(f"任务{idx}: ready={is_ready}")
            if is_ready:
                print(f" 结果: {pending.get()}")
        # Block until every task has finished.
        results = [pending.get() for pending in async_results]
        print(f"全部结果: {results}")
错误处理
Python
import multiprocessing
def failing_task(x):
    """Return ``x * 2``; raise ValueError when x == 5 to simulate a failure."""
    if x == 5:
        raise ValueError("模拟错误")
    return x * 2


def safe_task(x):
    """Exception-safe wrapper around failing_task.

    Returns the normal result, or the string ``"error: <message>"`` if the
    call raised, so one bad item does not abort the whole ``pool.map``.

    Defined at module level (not inside the ``__main__`` guard) because the
    pool pickles the function by name. Under the spawn/forkserver start
    methods, workers re-import this module without running the guarded code,
    so a function defined there cannot be found.
    """
    try:
        return failing_task(x)
    except Exception as e:
        return f"error: {e}"


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        try:
            # An exception raised in a worker is re-raised here in the parent.
            pool.map(failing_task, range(10))
        except ValueError as e:
            print(f"捕获错误: {e}")

    # Exception-safe mapping: every item yields a value.
    with multiprocessing.Pool(4) as pool:
        results = pool.map(safe_task, range(10))
    print(results)
多进程最佳实践
Python
import multiprocessing
import signal
def setup_process():
    """Pool initializer: make this worker process ignore SIGINT (Ctrl-C).

    Handling the interrupt is left to the parent process.
    """
    ignore = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore)
def graceful_shutdown():
    """Print a shutdown notice and exit the current process with status 0."""
    from sys import exit as _exit
    print("优雅关闭...")
    _exit(0)
def cpu_task(n: int) -> int:
    """CPU-bound work: return the sum of i*i for i in range(n).

    Defined in this snippet so the example runs on its own. It must be a
    module-level function so that pool workers can unpickle it by name.
    """
    return sum(i * i for i in range(n))


if __name__ == '__main__':
    # Needed when the script is frozen into a Windows executable; it has no
    # effect elsewhere.
    multiprocessing.freeze_support()
    with multiprocessing.Pool(
        processes=4,
        initializer=setup_process
    ) as pool:
        try:
            results = pool.map(cpu_task, range(10))
        except KeyboardInterrupt:
            pool.terminate()
            print("用户中断")
        except Exception as e:
            pool.terminate()
            print(f"错误: {e}")
        else:
            print(results)
            pool.close()
            pool.join()
要点总结
- Pool适合批量任务并行处理；提交给进程池的函数必须定义在模块顶层（不能是嵌套函数，也不能定义在 `if __name__ == '__main__':` 块内），才能被 pickle 传给子进程
- Queue/Pipe用于进程间通信
- Value/Array实现共享内存；`+=` 等读-改-写操作不是原子操作，需在 `get_lock()` 或 Lock 保护下执行
- Lock/Semaphore保护共享资源
- 必须在 `if __name__ == '__main__':` 保护下创建进程（spawn/forkserver 启动方式会在子进程中重新导入主模块）
📝 发现内容有误?点击此处直接编辑