Python分布式任务队列
Celery是Python最流行的分布式任务队列框架,支持异步任务执行、定时调度和任务监控。
核心架构
Celery架构由三部分组成:
- Producer:任务生产者,通常是Web应用
- Broker:消息代理,推荐Redis或RabbitMQ
- Worker:任务消费者,执行具体任务
基础配置
安装依赖
Bash
pip install celery redis
创建Celery应用
Python
# tasks.py
from celery import Celery
app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Shanghai',
enable_utc=True,
)
任务定义
普通任务
Python
@app.task
def add(x, y):
return x + y
@app.task(bind=True)
def process_data(self, data):
# bind=True可访问self.request获取任务上下文
task_id = self.request.id
return f"Processed: {data}, task_id: {task_id}"
重试机制
Python
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def unreliable_task(self):
try:
# 可能失败的操作
call_external_api()
except Exception as exc:
raise self.retry(exc=exc)
任务调用
异步调用
Python
# 发送任务到队列
result = add.delay(4, 5)
# 应用异步调用
result = add.apply_async(args=[4, 5])
# 延迟执行
result = add.apply_async(args=[4, 5], countdown=10)
# 指定ETA执行
from datetime import datetime, timedelta
result = add.apply_async(
args=[4, 5],
eta=datetime.utcnow() + timedelta(minutes=5)
)
获取结果
Python
result = add.delay(4, 5)
# 阻塞等待结果(带超时)
value = result.get(timeout=10)
# 检查状态
if result.ready():
print(result.result)
print(result.status) # SUCCESS, FAILURE, PENDING
任务签名与链式调用
签名(Signature)
Python
from celery import signature
# 创建任务签名
s = add.s(2, 3) # 偏参数
s = add.s(2) # 部分参数
# 执行签名
result = s.delay()
链式任务
Python
# 链式调用:前一个任务的输出作为后一个任务的输入
from celery import chain
workflow = chain(add.s(2, 3), add.s(4))
result = workflow.apply_async() # (2+3)+4 = 9
# 简写形式
result = (add.s(2, 3) | add.s(4)).apply_async()
任务组
Python
from celery import group
# 并行执行多个任务
job = group(add.s(i, i) for i in range(10))
result = job.apply_async()
# 获取所有结果
values = result.get() # [0, 2, 4, 6, 8, ...]
任务和弦(Chord)
Python
from celery import chord
# 先并行执行,再汇总
callback = sum_results.s()
header = [add.s(i, i) for i in range(10)]
result = chord(header)(callback)
定时任务
Celery Beat配置
Python
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-every-minute': {
'task': 'tasks.add',
'schedule': 60.0, # 每60秒
'args': (4, 5)
},
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (10, 20),
},
}
启动Beat进程
Bash
celery -A tasks beat --loglevel=info
Worker管理
启动Worker
Bash
# 基本启动
celery -A tasks worker --loglevel=info
# 指定并发数
celery -A tasks worker --concurrency=4
# 指定队列
celery -A tasks worker -Q queue1,queue2
# 后台运行
celery -A tasks worker --detach --loglevel=info
任务路由
Python
app.conf.task_routes = {
'tasks.compute_*': {'queue': 'compute'},
'tasks.io_*': {'queue': 'io'},
}
# 启动特定队列的Worker
celery -A tasks worker -Q compute --concurrency=2
celery -A tasks worker -Q io --concurrency=10
监控与管理
Flower监控面板
Bash
pip install flower
celery -A tasks flower --port=5555
访问 http://localhost:5555 查看任务状态、Worker状态、任务历史等。
命令行管理
Bash
# 查看活跃任务
celery -A tasks inspect active
# 查看注册任务
celery -A tasks inspect registered
# 撤销任务
celery -A tasks revoke <task_id>
# 终止正在执行的任务
celery -A tasks revoke <task_id> --terminate
最佳实践
任务设计原则
Python
# 任务要幂等
@app.task
def process_order(order_id):
order = Order.get(order_id)
if order.processed:
return "Already processed"
# 处理订单
order.processed = True
order.save()
# 避免大任务
@app.task
def bad_task():
# 不要这样
process_million_records()
@app.task
def good_task():
# 拆分小任务
for batch in get_batches():
process_batch.delay(batch)
错误处理
Python
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3})
def task_with_auto_retry(self):
# 自动重试所有异常
pass
@app.task(bind=True)
def task_with_fallback(self):
try:
return risky_operation()
except SpecificError:
return fallback_value()
注意:生产环境必须配置结果后端的过期时间,避免Redis内存溢出。
要点总结
- Celery架构:Producer → Broker → Worker,Broker推荐Redis/RabbitMQ
- 任务定义使用
@app.task装饰器,支持重试、绑定、超时配置 - 异步调用使用
delay()或apply_async(),链式任务使用|操作符 - 定时任务使用Celery Beat,通过
beat_schedule配置调度规则 - 生产环境使用Flower监控,任务设计要幂等、细粒度、可重试
存放路径:articles/PYTHON/专家/并发与异步高级/分布式任务队列.md
📝 发现内容有误?点击此处直接编辑