全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python多进程并发模式

Python 多进程通过在多个独立进程中运行代码来绕过 GIL 限制,适合 CPU 密集型任务。本文介绍常用的多进程架构设计方法。

multiprocessing 基础

Python
import multiprocessing
import os

def worker(task_id: int):
    """Report which process handles *task_id* and return the id doubled."""
    pid = os.getpid()
    print(f"进程 {pid} 处理任务 {task_id}")
    doubled = task_id * 2
    return doubled

if __name__ == '__main__':
    # A four-worker pool maps `worker` over the task ids 0..9.
    task_ids = range(10)
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(worker, task_ids)
        print(f"结果: {results}")
Python
import multiprocessing

# Process 类
def task(name: str):
    """Print a line naming the task being executed."""
    message = f"进程执行: {name}"
    print(message)

if __name__ == '__main__':
    # One Process per task name; start them all before waiting on any.
    names = [f"task_{i}" for i in range(4)]
    processes = [multiprocessing.Process(target=task, args=(name,)) for name in names]

    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()  # block until this child has finished

进程池 Pool

Python
import multiprocessing
import time

def cpu_task(n: int) -> int:
    """Return the sum of ``i * i`` for ``i`` in ``range(n)``.

    The pure-Python iteration is intentionally naive so the call stays
    CPU-bound, which is what the multiprocessing examples demonstrate.
    """
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    tasks = [10**6] * 4

    # Option 1: map - blocks until every task's result is available.
    with multiprocessing.Pool() as pool:
        results = pool.map(cpu_task, tasks)
        print(f"map结果: {results}")

    # Option 2: apply - runs one call and blocks for its result.
    with multiprocessing.Pool() as pool:
        result = pool.apply(cpu_task, (10**6,))
        print(f"apply结果: {result}")

    # Option 3: map_async - returns an AsyncResult immediately.
    with multiprocessing.Pool() as pool:
        pending = pool.map_async(cpu_task, tasks)
        # other work could be done here while the pool computes
        print("等待结果...")
        results = pending.get()
        print(f"async结果: {results}")
Python
import multiprocessing

def process_with_callback(result):
    """Success callback for ``apply_async``: print the result it receives."""
    text = f"回调收到: {result}"
    print(text)

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        # Asynchronous execution with a success callback.
        # Without error_callback, an exception raised inside cpu_task would be
        # lost silently, because the AsyncResult is never .get()-ed here.
        pool.apply_async(
            cpu_task,
            (10**6,),
            callback=process_with_callback,
            error_callback=lambda exc: print(f"任务失败: {exc!r}"),
        )

        # No more submissions; wait for the submitted task to finish.
        pool.close()
        pool.join()

进程间通信

Python
import multiprocessing

# Queue 通信
def producer(queue: multiprocessing.Queue):
    """Send 0..9 through *queue*, followed by a ``None`` end-of-stream sentinel."""
    payload = [*range(10), None]
    for item in payload:
        queue.put(item)
        if item is None:
            break  # the sentinel is sent but not reported
        print(f"生产: {item}")

def consumer(queue: multiprocessing.Queue):
    """Print items taken from *queue* until the ``None`` sentinel arrives."""
    item = queue.get()
    while item is not None:
        print(f"消费: {item}")
        item = queue.get()

if __name__ == '__main__':
    queue = multiprocessing.Queue()

    # One producer and one consumer share the same queue.
    workers = [
        multiprocessing.Process(target=producer, args=(queue,)),
        multiprocessing.Process(target=consumer, args=(queue,)),
    ]

    for proc in workers:
        proc.start()
    for proc in workers:
        proc.join()
Python
import multiprocessing

# Pipe 通信(双向)
def sender(conn):
    """Send a greeting over *conn*, print the reply, then close *conn*."""
    outgoing = "来自sender的消息"
    conn.send(outgoing)
    reply = conn.recv()
    print(f"sender收到: {reply}")
    conn.close()

def receiver(conn):
    """Print the message received on *conn*, answer it, then close *conn*."""
    incoming = conn.recv()
    print(f"receiver收到: {incoming}")
    answer = "来自receiver的回复"
    conn.send(answer)
    conn.close()

if __name__ == '__main__':
    # Pipe() is duplex by default: each end can both send and receive.
    parent_conn, child_conn = multiprocessing.Pipe()

    peers = [
        multiprocessing.Process(target=sender, args=(parent_conn,)),
        multiprocessing.Process(target=receiver, args=(child_conn,)),
    ]
    for proc in peers:
        proc.start()
    for proc in peers:
        proc.join()

共享内存

Python
import multiprocessing

# Value 和 Array(共享内存)
def worker_with_shared(value: multiprocessing.Value, arr: multiprocessing.Array):
    """Increment a shared counter and every element of a shared array by one.

    Expects the synchronized wrappers returned by ``multiprocessing.Value`` /
    ``multiprocessing.Array`` with the default ``lock=True``, which provide
    ``get_lock()``. ``+= 1`` is a read-modify-write. If it runs without
    holding the lock, concurrent workers can read the same old value and
    lose increments.
    """
    with value.get_lock():
        value.value += 1

    # Hold the array's lock for the whole loop so this worker's increments
    # cannot interleave with (and be overwritten by) another worker's.
    with arr.get_lock():
        for i in range(len(arr)):
            arr[i] += 1

if __name__ == '__main__':
    # Shared integer ('i' is the C int typecode)
    shared_value = multiprocessing.Value('i', 0)

    # Shared array of five C ints, all starting at zero
    shared_array = multiprocessing.Array('i', [0] * 5)

    procs = [
        multiprocessing.Process(target=worker_with_shared, args=(shared_value, shared_array))
        for _ in range(4)
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()

    print(f"共享值: {shared_value.value}")
    print(f"共享数组: {list(shared_array)}")
Python
import multiprocessing

# Manager(更灵活的共享)
def worker_with_manager(shared_dict, shared_list, lock=None):
    """Bump ``shared_dict['count']`` and append this process's name to *shared_list*.

    ``shared_dict.get(...)`` followed by item assignment is two separate
    operations. Concurrent workers can both read the same old count, and one
    increment is then lost. Pass a lock (e.g. ``manager.Lock()``) to make the
    update atomic. With ``lock=None`` the update runs unsynchronized, as before.
    """
    from contextlib import nullcontext

    guard = lock if lock is not None else nullcontext()
    with guard:
        shared_dict['count'] = shared_dict.get('count', 0) + 1
    shared_list.append(multiprocessing.current_process().name)

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()
        # Serializes the read-modify-write of 'count' across the workers.
        lock = manager.Lock()

        processes = [
            multiprocessing.Process(
                target=worker_with_manager,
                args=(shared_dict, shared_list, lock)
            )
            for _ in range(4)
        ]

        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(f"共享字典: {shared_dict}")
        print(f"共享列表: {list(shared_list)}")

并行计算模式

Python
import multiprocessing
import math

def _process_chunk(chunk):
    """Return ``sqrt(x) * x`` for every x in *chunk* (executed in a pool worker)."""
    return [math.sqrt(x) * x for x in chunk]


def parallel_chunk_processing(data: list, chunk_size: int = 100):
    """Split *data* into chunks and process the chunks in parallel with a Pool.

    The per-chunk worker must be a module-level function. Pool pickles the
    callable by its qualified name, and a function nested inside this one
    cannot be pickled ("Can't pickle local object").

    Args:
        data: sequence of non-negative numbers (``math.sqrt`` rejects negatives).
        chunk_size: number of items sent to a worker per task; must be >= 1.

    Returns:
        Flat list of results in the same order as *data*.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not data:
        return []  # nothing to do; avoid spinning up a pool

    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

    with multiprocessing.Pool() as pool:
        results = pool.map(_process_chunk, chunks)

    # Flatten the per-chunk lists back into one list.
    return [item for chunk in results for item in chunk]

if __name__ == '__main__':
    numbers = list(range(1000))
    result = parallel_chunk_processing(numbers)
    print(f"处理完成: {len(result)}项")
Python
import multiprocessing

class ParallelProcessor:
    """Parallel processor: runs map-style calls on a freshly created Pool.

    Each method builds its own Pool of ``num_workers`` processes, and the
    ``with`` block closes it when the call ends (for ``imap``, when
    iteration ends).
    """

    def __init__(self, num_workers: int = None):
        # Falsy values (None or 0) fall back to one worker per CPU.
        if not num_workers:
            num_workers = multiprocessing.cpu_count()
        self.num_workers = num_workers

    def map(self, func, data: list) -> list:
        """Apply *func* to every item of *data* in parallel; order is preserved."""
        pool = multiprocessing.Pool(self.num_workers)
        with pool:
            results = pool.map(func, data)
        return results

    def starmap(self, func, data: list) -> list:
        """Like ``map``, but each item is unpacked as ``func(*item)``."""
        with multiprocessing.Pool(self.num_workers) as workers:
            outcome = workers.starmap(func, data)
        return outcome

    def imap(self, func, data: list):
        """Lazily yield ``func(item)`` results in order while the pool is alive."""
        with multiprocessing.Pool(self.num_workers) as workers:
            yield from workers.imap(func, data)

# 使用
def transform(x: int) -> int:
    """Return *x* raised to the second power."""
    return pow(x, 2)

if __name__ == '__main__':
    processor = ParallelProcessor()
    squares = processor.map(transform, range(10))
    print(squares)

生产者-消费者模式

Python
import multiprocessing
import time

def producer(queue: multiprocessing.Queue, count: int):
    """Put ``count`` items named ``item_0``, ``item_1``, ... on *queue*.

    Pauses 0.1 s after each item, then finishes with a single ``"DONE"``
    end-of-stream sentinel for the consumers.
    """
    for item in (f"item_{i}" for i in range(count)):
        queue.put(item)
        print(f"生产: {item}")
        time.sleep(0.1)
    queue.put("DONE")  # end-of-stream sentinel

def consumer(queue: multiprocessing.Queue, consumer_id: int):
    """Process items from *queue* until the ``"DONE"`` sentinel is seen.

    The sentinel is put back before returning, so every other consumer that
    shares the queue also receives it and stops.
    """
    while True:
        item = queue.get()
        if item != "DONE":
            print(f"消费者{consumer_id} 处理: {item}")
            time.sleep(0.2)
            continue
        queue.put("DONE")  # hand the sentinel on to the next consumer
        return

if __name__ == '__main__':
    # Bounded queue: put() blocks while 10 items are already waiting.
    queue = multiprocessing.Queue(maxsize=10)

    # One producer feeding three consumers
    producer_p = multiprocessing.Process(target=producer, args=(queue, 20))
    consumer_ps = [
        multiprocessing.Process(target=consumer, args=(queue, cid))
        for cid in range(3)
    ]

    producer_p.start()
    for proc in consumer_ps:
        proc.start()

    producer_p.join()
    for proc in consumer_ps:
        proc.join()

进程同步

Python
import multiprocessing
import time

def worker_with_lock(lock: multiprocessing.Lock, counter: multiprocessing.Value):
    """Increment ``counter.value`` 1000 times, holding *lock* around each increment."""
    remaining = 1000
    while remaining:
        lock.acquire()
        try:
            counter.value += 1
        finally:
            lock.release()
        remaining -= 1

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    counter = multiprocessing.Value('i', 0)

    workers = [
        multiprocessing.Process(target=worker_with_lock, args=(lock, counter))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    # 4 workers x 1000 locked increments each
    print(f"计数器: {counter.value}")  # expected: 4000
Python
import multiprocessing
import time

# Semaphore: limit how many processes use a resource concurrently
def limited_resource_worker(sem: multiprocessing.Semaphore, worker_id: int,
                            hold_seconds: float = 1.0):
    """Hold one slot of *sem* while simulating work on a limited resource.

    Args:
        sem: semaphore bounding how many workers may be inside at once.
        worker_id: identifier printed in the progress messages.
        hold_seconds: how long to keep the slot. The default of 1.0 is the
            value that was previously hard-coded.
    """
    with sem:  # waits for a free slot; released on exit, even on error
        print(f"Worker {worker_id} 获取资源")
        time.sleep(hold_seconds)
        print(f"Worker {worker_id} 释放资源")

if __name__ == '__main__':
    # At most two processes may hold the resource at the same time.
    sem = multiprocessing.Semaphore(2)

    workers = [
        multiprocessing.Process(target=limited_resource_worker, args=(sem, wid))
        for wid in range(6)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

进程池高级用法

Python
import multiprocessing

def init_worker():
    """Pool initializer: print the PID of the worker process being set up."""
    me = multiprocessing.current_process()
    print(f"进程 {me.pid} 初始化")

def task_with_state(x):
    """Return *x* multiplied by two."""
    result = x * 2
    return result

if __name__ == '__main__':
    # `initializer` runs once in each worker process when it starts.
    pool_options = dict(processes=4, initializer=init_worker)
    with multiprocessing.Pool(**pool_options) as pool:
        doubled = pool.map(task_with_state, range(10))
        print(doubled)
Python
import multiprocessing
import time

def long_task(n, delay=2):
    """Simulate a slow job: sleep for *delay* seconds, then return ``n * 10``.

    Args:
        n: value to scale.
        delay: seconds to sleep first. The default of 2 is the value that
            was previously hard-coded.
    """
    time.sleep(delay)
    return n * 10

if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        pending = [pool.apply_async(long_task, (i,)) for i in range(8)]

        # Non-blocking status check right after submission
        for idx, handle in enumerate(pending):
            done = handle.ready()
            print(f"任务{idx}: ready={done}")
            if done:
                print(f"  结果: {handle.get()}")

        # Block until every task has produced its result
        results = [handle.get() for handle in pending]
        print(f"全部结果: {results}")

错误处理

Python
import multiprocessing

def failing_task(x):
    """Return ``x * 2``; raise ValueError for ``x == 5`` to simulate a failure."""
    if x == 5:
        raise ValueError("模拟错误")
    return x * 2


def safe_task(x):
    """Exception-safe wrapper around ``failing_task``.

    Returns the task's result, or the string ``"error: <message>"`` if it
    raised. It is defined at module level, not under the ``__main__``
    guard, because spawn/forkserver workers import this module without
    running the guarded block. They must still be able to find the function
    when unpickling the task.
    """
    try:
        return failing_task(x)
    except Exception as e:  # deliberate: turn any task failure into a value
        return f"error: {e}"


if __name__ == '__main__':
    with multiprocessing.Pool(4) as pool:
        try:
            results = pool.map(failing_task, range(10))
        except ValueError as e:
            # pool.map re-raises a worker's exception in the parent process.
            print(f"捕获错误: {e}")

    # Exception-safe mapping: failures become values instead of aborting map.
    with multiprocessing.Pool(4) as pool:
        results = pool.map(safe_task, range(10))
        print(results)

多进程最佳实践

Python
import multiprocessing
import signal

def setup_process():
    """Pool initializer: ignore SIGINT in the worker so the main process handles Ctrl+C."""
    ignore = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore)

def graceful_shutdown():
    """Print a shutdown notice and exit the current process with status 0."""
    print("优雅关闭...")
    raise SystemExit(0)

if __name__ == '__main__':
    # Required for frozen Windows executables; has no effect otherwise.
    multiprocessing.freeze_support()

    with multiprocessing.Pool(processes=4, initializer=setup_process) as pool:
        try:
            results = pool.map(cpu_task, range(10))
            print(results)
        except (KeyboardInterrupt, Exception) as exc:
            # Abort outstanding work on Ctrl+C or any task failure.
            pool.terminate()
            if isinstance(exc, KeyboardInterrupt):
                print("用户中断")
            else:
                print(f"错误: {exc}")
        else:
            # Normal completion: stop accepting work and wait for workers.
            pool.close()
            pool.join()

要点总结

  1. Pool适合批量任务并行处理
  2. Queue/Pipe用于进程间通信
  3. Value/Array实现共享内存;`+=` 等读-改-写操作需在 get_lock() 或 Lock 保护下执行,否则会丢失更新
  4. Lock/Semaphore保护共享资源
  5. 必须在 if __name__ == '__main__' 保护下创建进程(Windows/macOS 默认的 spawn 启动方式会在子进程中重新导入主模块);交给进程池执行的函数也应定义在模块顶层,以便被 pickle

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python自定义元类
下一篇 → Python异步上下文管理器
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库