Python 线程安全数据结构
queue 模块提供线程安全的队列,内置锁机制,适合多线程数据交换。
Queue 基础队列
Python
import threading
import queue
q = queue.Queue(maxsize=10) # 最大容量10
def producer():
for i in range(5):
q.put(i) # 放入元素
print(f"生产: {i}")
def consumer():
while True:
item = q.get() # 取出元素
print(f"消费: {item}")
q.task_done() # 标记任务完成
if item == 4:
break
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()
阻塞与非阻塞操作
Python
import queue
q = queue.Queue(maxsize=2)
# 阻塞操作
q.put(1) # 阻塞直到有空位
q.put(2) # 阻塞直到有空位
q.put(3, timeout=2) # 等待2秒,超时抛 queue.Full
# 非阻塞操作
q.put_nowait(4) # 无空位立即抛 queue.Full
# 获取元素
item = q.get() # 阻塞直到有元素
item = q.get(timeout=2) # 等待2秒,超时抛 queue.Empty
item = q.get_nowait() # 无元素立即抛 queue.Empty
LifoQueue 后进先出
Python
import queue
q = queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 3(最后放入的)
print(q.get()) # 2
print(q.get()) # 1
PriorityQueue 优先级队列
Python
import queue
q = queue.PriorityQueue()
# 元素格式:(优先级, 数据)
q.put((3, '低优先级'))
q.put((1, '高优先级'))
q.put((2, '中优先级'))
print(q.get()) # (1, '高优先级')
print(q.get()) # (2, '中优先级')
print(q.get()) # (3, '低优先级')
自定义对象优先级:
Python
import queue
class Task:
def __init__(self, priority, name):
self.priority = priority
self.name = name
def __lt__(self, other):
return self.priority < other.priority
q = queue.PriorityQueue()
q.put(Task(3, '低'))
q.put(Task(1, '高'))
q.put(Task(2, '中'))
print(q.get().name) # 高
print(q.get().name) # 中
print(q.get().name) # 低
队列状态查询
Python
import queue
q = queue.Queue(maxsize=5)
for i in range(3):
q.put(i)
print(q.qsize()) # 3(当前元素数)
print(q.empty()) # False
print(q.full()) # False
q.put(4)
q.put(5)
print(q.full()) # True
任务完成追踪
Python
import queue
import threading
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f"处理: {item}")
q.task_done() # 标记完成
if item == 'stop':
break
t = threading.Thread(target=worker)
t.start()
for item in ['a', 'b', 'c', 'stop']:
q.put(item)
q.join() # 等待所有任务完成
print("所有任务已完成")
SimpleQueue 简单队列
Python
import queue
q = queue.SimpleQueue() # 无界、无 task_done 功能
q.put(1)
q.put(2)
print(q.get()) # 1
生产者-消费者模式
Python
import threading
import queue
import time
q = queue.Queue(maxsize=5)
def producer():
for i in range(10):
q.put(i)
print(f"生产: {i}")
time.sleep(0.1)
def consumer(name):
while True:
item = q.get()
print(f"{name} 消费: {item}")
q.task_done()
time.sleep(0.2)
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer, args=('消费者A',))
t3 = threading.Thread(target=consumer, args=('消费者B',))
for t in [t1, t2, t3]:
t.start()
t1.join()
q.join() # 等待队列清空
队列类型对比
| 队列类型 | 特点 | 适用场景 |
|---|---|---|
| Queue | FIFO | 任务队列 |
| LifoQueue | LIFO | 栈式处理 |
| PriorityQueue | 按优先级 | 任务调度 |
| SimpleQueue | 无界简单 | 轻量通信 |
要点总结
- Queue 内置锁机制,线程安全
put()放入,get()取出,默认阻塞put_nowait()、get_nowait()非阻塞版本task_done()标记任务完成,join()等待清空- LifoQueue 后进先出,PriorityQueue 按优先级
- 生产者-消费者模式是经典应用
- 队列简化多线程数据交换,避免手动同步
📝 发现内容有误?点击此处直接编辑