全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-12 10 分钟 ✍️ juanwangdev

Redis 延迟队列

延迟队列用于处理需要延时执行的任务,如订单超时取消、定时提醒、延迟推送等场景。

延迟队列原理

核心设计

Bash
- 使用ZSet存储任务
- 任务执行时间作为score
- 时间戳越小越优先执行
- 循环检查到期任务并执行

任务数据结构

Bash
# 任务ID作为member,执行时间作为score
ZADD delay:queue 1700001000 "order:1001"

# 任务详细信息单独存储
SET task:order:1001 '{"orderId":1001,"action":"cancel"}'

ZSet实现延迟队列

添加延迟任务

Python
# 计算执行时间戳
execute_time = current_time + delay_seconds

# 添加到延迟队列
ZADD delay:queue execute_time "task_id"

# 存储任务详情
SET task:detail:task_id task_data

添加任务示例

Bash
def add_delay_task(task_id, task_data, delay_seconds):
    # 计算执行时间
    execute_time = int(time.time()) + delay_seconds

    # 添加到延迟队列
    redis.zadd("delay:queue", {task_id: execute_time})

    # 存储任务详情
    redis.set(f"task:detail:{task_id}", json.dumps(task_data))

获取到期任务

Python
# 获取当前时间戳之前的任务
ZRANGEBYSCORE delay:queue 0 {current_time}

# 获取到期任务数量
ZCOUNT delay:queue 0 {current_time}

执行到期任务

Python
def consume_delay_queue():
    current_time = int(time.time())

    # 获取到期任务
    tasks = redis.zrangebyscore("delay:queue", 0, current_time)

    for task_id in tasks:
        # 尝试获取任务(防止多消费者重复处理)
        if redis.zrem("delay:queue", task_id):
            # 获取任务详情
            task_data = redis.get(f"task:detail:{task_id}")

            # 执行任务
            try:
                execute_task(task_id, task_data)
                # 成功后删除任务详情
                redis.delete(f"task:detail:{task_id}")
            except:
                # 失败处理
                handle_failure(task_id, task_data)

消费者实现

单消费者循环

Python
def delay_queue_consumer():
    while True:
        current_time = int(time.time())

        # 获取到期任务
        tasks = redis.zrangebyscore("delay:queue", 0, current_time, start=0, num=10)

        for task_id in tasks:
            if redis.zrem("delay:queue", task_id):
                process_task(task_id)

        # 无任务时等待
        if not tasks:
            # 获取下一条任务的执行时间
            next_task = redis.zrange("delay:queue", 0, 0, withscores=True)
            if next_task:
                next_time = next_task[0][1]
                wait_time = max(0, next_time - current_time)
                time.sleep(min(wait_time, 1))
            else:
                time.sleep(1)

多消费者竞争

Python
def multi_consumer():
    while True:
        current_time = int(time.time())

        # 获取到期任务
        tasks = redis.zrangebyscore("delay:queue", 0, current_time)

        for task_id in tasks:
            # ZREM原子操作保证只有一个消费者获取
            if redis.zrem("delay:queue", task_id):
                process_task(task_id)
                break  # 处理一条后重新循环

        time.sleep(0.1)

ZREM原子操作保证任务不会被多个消费者重复获取。

任务失败重试

重试机制

Python
def process_task_with_retry(task_id, max_retry=3):
    task_data = redis.get(f"task:detail:{task_id}")
    retry_count = redis.get(f"task:retry:{task_id}") or 0

    try:
        execute_task(task_id, task_data)
        # 成功,清理数据
        redis.delete(f"task:detail:{task_id}")
        redis.delete(f"task:retry:{task_id}")
    except Exception as e:
        retry_count = int(retry_count) + 1

        if retry_count < max_retry:
            # 重试,延迟时间递增
            delay = retry_count * 60  # 第1次60秒,第2次120秒
            new_time = int(time.time()) + delay

            redis.zadd("delay:queue", {task_id: new_time})
            redis.set(f"task:retry:{task_id}", retry_count)
        else:
            # 超过最大重试次数
            handle_task_failure(task_id, task_data)
            redis.delete(f"task:detail:{task_id}")
            redis.delete(f"task:retry:{task_id}")

死信队列

Python
def handle_task_failure(task_id, task_data):
    # 超过重试次数,放入死信队列
    redis.lpush("delay:dead_letter", task_id)
    redis.set(f"dead:detail:{task_id}", task_data)

    # 告警
    alert(f"Task {task_id} failed after max retries")

应用场景详解

1. 订单超时取消

设计方案

Python
- 订单创建后30分钟未支付自动取消
- 创建订单时添加延迟任务
- 到期检查订单状态,未支付则取消

实现

Python
def create_order(order_id, user_id):
    # 创建订单
    db.insert_order(order_id, user_id, status="pending")

    # 添加30分钟后取消的延迟任务
    task_id = f"order_cancel:{order_id}"
    task_data = {"orderId": order_id, "action": "cancel"}

    add_delay_task(task_id, task_data, 1800)  # 30分钟

def cancel_unpaid_order(order_id):
    # 检查订单状态
    order = db.query_order(order_id)

    if order.status == "pending":
        # 未支付,取消订单
        db.update_order(order_id, status="cancelled")
        # 通知用户
        send_notification(user_id, "订单已取消")
    else:
        # 已支付,忽略
        pass

def pay_order(order_id):
    # 用户支付
    db.update_order(order_id, status="paid")
    # 删除延迟任务(已支付不需要取消)
    redis.zrem("delay:queue", f"order_cancel:{order_id}")

2. 定时提醒/通知

实现

Python
def schedule_reminder(user_id, message, remind_time):
    # remind_time为提醒时间戳
    task_id = f"reminder:{uuid()}"
    task_data = {"userId": user_id, "message": message}

    redis.zadd("delay:queue", {task_id: remind_time})
    redis.set(f"task:detail:{task_id}", json.dumps(task_data))

def send_reminder(task_data):
    # 发送提醒
    send_notification(task_data["userId"], task_data["message"])

3. 延迟推送

实现

Python
def schedule_push(user_id, content, delay_seconds):
    task_id = f"push:{uuid()}"
    task_data = {"userId": user_id, "content": content}

    execute_time = int(time.time()) + delay_seconds
    redis.zadd("delay:queue", {task_id: execute_time})
    redis.set(f"task:detail:{task_id}", json.dumps(task_data))

4. 定时数据同步

实现

Python
def schedule_sync(source, target, sync_time):
    task_id = f"sync:{source}:{target}"
    task_data = {"source": source, "target": target}

    redis.zadd("delay:queue", {task_id: sync_time})
    redis.set(f"task:detail:{task_id}", json.dumps(task_data))

性能优化

批量获取任务

Python
# 批量获取到期任务
tasks = redis.zrangebyscore("delay:queue", 0, current_time, start=0, num=100)

Pipeline优化

Python
# 使用Pipeline批量操作
pipe = redis.pipeline()
for task_id in tasks:
    pipe.zrem("delay:queue", task_id)
results = pipe.execute()

# 检查哪些任务成功获取
for i, task_id in enumerate(tasks):
    if results[i]:
        process_task(task_id)

任务分片

Python
# 多队列分片
for i in range(10):
    queue_key = f"delay:queue:{i}"
    # 各消费者处理不同队列

监控与告警

队列监控

text
# 队列积压监控
pending_count = redis.zcard("delay:queue")
if pending_count > 10000:
    alert("延迟队列积压过多")

# 任务延迟监控
current_time = int(time.time())
overdue_tasks = redis.zrangebyscore("delay:queue", 0, current_time - 60)
if overdue_tasks:
    alert(f"有{len(overdue_tasks)}个任务超期未处理")

告警设置

text
# 监控指标
metrics = {
    "pending_tasks": redis.zcard("delay:queue"),
    "overdue_tasks": len(redis.zrangebyscore("delay:queue", 0, current_time - 60)),
    "dead_letter_count": redis.llen("delay:dead_letter")
}

要点总结

  • ZSet实现延迟队列,执行时间作为score
  • ZRANGEBYSCORE获取到期任务,ZREM原子获取防止重复
  • 任务详情单独存储,任务ID作为ZSet的member
  • 单消费者循环:获取、处理、等待下一条任务
  • 多消费者竞争:ZREM保证只有一个消费者获取任务
  • 失败重试:递增延迟时间,超过次数入死信队列
  • 订单超时取消:创建订单时添加延迟任务,支付后删除任务
  • 定时提醒、延迟推送、定时同步适合延迟队列
  • 监控队列积压和超期任务,及时告警
  • Pipeline批量获取和删除,提高效率

📝 发现内容有误?点击此处直接编辑

← 上一篇 Redis 分布式锁
下一篇 → Redis 排行榜与社交功能
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库