流
Redis Stream是Redis 5.0引入的消息队列数据结构,支持消息持久化、消费组、消息确认机制,是Redis专业消息队列的实现。
Stream原理
设计特点
Bash
Stream特点:
- 消息持久化存储
- 每条消息有唯一ID(时间戳-序号)
- 支持消费组(Consumer Group)
- 支持消息确认(ACK)
- 支持阻塞读取
- 消息有序不可修改
消息ID结构
Bash
消息ID格式:<milliseconds>-<sequence_number>
示例:
1700000000-0 # 时间戳1700000000,序号0
1700000000-1 # 同一时间戳的第2条消息
1700000001-0 # 下一毫秒的消息
特点:
- 时间戳保证大致有序
- 序号保证同毫秒内有序
- 自动生成,用户无需指定
消息存储结构
Bash
Stream内部结构:
- 基于Radix Tree(基数树)
- 高效内存使用
- 支持范围查询
- O(log N)查找效率
消息内容:
- 多个field-value对
- 类似Hash结构
- 每条消息可存储多个字段
Stream核心命令
XADD添加消息
Bash
# 基本语法
XADD key [MAXLEN ~ count] * field value [field value ...]
# 添加消息(自动生成ID)
XADD stream:orders * orderId 1001 status pending
# 返回: "1700000000-0"
# 添加多条消息
XADD stream:logs * level error message "Connection failed" timestamp 1700000000
# 限制Stream长度
XADD stream:orders MAXLEN ~ 1000 * orderId 1002
# 保留约1000条消息,自动删除旧消息
# 手动指定ID
XADD stream:orders 1700000001-0 orderId 1003
XREAD读取消息
Bash
# 基本语法
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# 从指定ID开始读取
XREAD COUNT 10 STREAMS stream:orders 0
# 从ID=0开始读取10条
# 从最新消息开始读取
XREAD STREAMS stream:orders $
# $表示只获取新消息
# 阻塞读取(等待新消息)
XREAD COUNT 10 BLOCK 5000 STREAMS stream:orders $
# 等待5秒,有新消息返回
# 读取多个Stream
XREAD STREAMS stream:orders stream:logs 0 0
XRANGE/XREVRANGE范围读取
Bash
# 基本语法
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
# 获取全部消息
XRANGE stream:orders - +
# - 表示最小ID,+ 表示最大ID
# 获取指定范围
XRANGE stream:orders 1700000000-0 1700000001-0
# 获取指定数量
XRANGE stream:orders - + COUNT 10
# 倒序获取(最新消息)
XREVRANGE stream:orders + - COUNT 10
XLEN获取消息数量
Bash
# 获取Stream长度
XLEN stream:orders
# 返回: 100
XDEL删除消息
Bash
# 删除指定消息
XDEL stream:orders 1700000000-0
# 删除不影响消费组状态
# 消息ID仍被消费组记录
XTRIM裁剪Stream
Bash
# 基本语法
XTRIM key MAXLEN [~] count
# 精确裁剪(保留指定数量)
XTRIM stream:orders MAXLEN 1000
# 近似裁剪(约保留指定数量)
XTRIM stream:orders MAXLEN ~ 1000
# ~ 表示近似,效率更高
消费组命令
XGROUP创建消费组
Bash
# 基本语法
XGROUP CREATE key groupname ID [MKSTREAM]
# 创建消费组(从最新消息开始)
XGROUP CREATE stream:orders group1 $
# $表示从新消息开始消费
# 创建消费组(从头开始消费)
XGROUP CREATE stream:orders group2 0
# 创建Stream和消费组(MKSTREAM)
XGROUP CREATE stream:new group1 0 MKSTREAM
XREADGROUP消费组读取
Bash
# 基本语法
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
# 消费组读取
XREADGROUP GROUP group1 consumer1 COUNT 10 STREAMS stream:orders >
# > 表示获取未分配的新消息
# 阻塞读取
XREADGROUP GROUP group1 consumer1 BLOCK 5000 STREAMS stream:orders >
# 不需要ACK(自动确认)
XREADGROUP GROUP group1 consumer1 NOACK STREAMS stream:orders >
XACK消息确认
Bash
# 确认消息已处理
XACK stream:orders group1 1700000000-0
# 确认多条消息
XACK stream:orders group1 1700000000-0 1700000000-1
XPENDING查看待处理消息
Bash
# 查看待处理消息摘要
XPENDING stream:orders group1
# 查看待处理消息详情
XPENDING stream:orders group1 - + 10
# 查看指定消费者的待处理消息
XPENDING stream:orders group1 - + 10 consumer1
XCLAIM转移消息
Bash
# 基本语法
XCLAIM key group consumer min-idle-time ID [ID ...]
# 将消息转移给其他消费者
XCLAIM stream:orders group1 consumer2 60000 1700000000-0
# 空闲超过60秒的消息转移给consumer2
XINFO查看信息
Bash
# 查看Stream信息
XINFO STREAM stream:orders
# 查看消费组信息
XINFO GROUPS stream:orders
# 查看消费者信息
XINFO CONSUMERS stream:orders group1
命令速查表
| 命令 | 说明 | 示例 |
|---|---|---|
| XADD | 添加消息 | XADD k * f v |
| XREAD | 读取消息 | XREAD STREAMS k 0 |
| XRANGE | 范围读取 | XRANGE k - + |
| XLEN | 消息数量 | XLEN k |
| XDEL | 删除消息 | XDEL k id |
| XTRIM | 裁剪Stream | XTRIM k MAXLEN ~1000 |
| XGROUP | 创建组 | XGROUP CREATE k g 0 |
| XREADGROUP | 组消费 | XREADGROUP GROUP g c ... |
| XACK | 确认消息 | XACK k g id |
| XPENDING | 待处理 | XPENDING k g |
| XCLAIM | 转移消息 | XCLAIM k g c 60000 id |
| XINFO | 查看信息 | XINFO STREAM k |
应用场景
1. 消息队列
Bash
# 生产者发送消息
XADD queue:orders * orderId 1001 userId 1000 amount 99.9
# 消费者消费消息
# 创建消费组
XGROUP CREATE queue:orders processors 0 MKSTREAM
# 消费者1
XREADGROUP GROUP processors consumer1 BLOCK 5000 STREAMS queue:orders >
# 处理消息
process_order(message)
# 确认消息
XACK queue:orders processors message_id
2. 事件流
Bash
# 记录系统事件
XADD stream:events * type login userId 1000 timestamp 1700000000
XADD stream:events * type logout userId 1000 timestamp 1700000001
# 分析消费组消费事件
XREADGROUP GROUP analytics consumer1 STREAMS stream:events >
3. 日志收集
Bash
# 应用日志写入Stream
XADD stream:app:logs * level error message "Database connection failed" stacktrace "..."
# 日志聚合服务消费
XREADGROUP GROUP aggregator consumer1 COUNT 100 STREAMS stream:app:logs >
4. 实时数据流
Python
# 实时数据写入
XADD stream:metrics * cpu 75.5 memory 80.2 disk 45.0
# 监控服务读取
XREAD COUNT 1 BLOCK 1000 STREAMS stream:metrics $
5. 活动追踪
Python
# 用户活动记录
XADD stream:activity:user:1000 * action click page "/product/500" timestamp 1700000000
# 分析最近活动
XRANGE stream:activity:user:1000 - + COUNT 50
消费组详解
消费组原理
Bash
消费组机制:
1. 消息分配给消费者
2. 每条消息只分配给一个消费者
3. 消费者处理后确认(ACK)
4. 未确认的消息可被转移
状态:
- 已分配:消息分配给消费者
- 已确认(ACK):处理完成
- 待处理(PENDING):已分配但未确认
消费者状态
Bash
# 查看消费者信息
XINFO CONSUMERS stream:orders group1
# 返回:
# name: consumer1
# pending: 5(待处理消息数)
# idle: 1000(空闲时间ms)
消息流转
Bash
消息状态流转:
新消息 → 分配给消费者 → 消费者处理 → ACK确认
↓
PENDING列表(未确认)
↓
超时后可XCLAIM转移
消息可靠性
ACK机制
text
def consume_message():
# 获取消息
message = redis.xreadgroup(
group="group1",
consumer="consumer1",
streams={"stream:orders": ">"}
)
try:
# 处理消息
process(message)
# 确认消息
redis.xack("stream:orders", "group1", message["id"])
except Exception:
# 处理失败,消息留在PENDING列表
# 可稍后重试或转移给其他消费者
pass
死信处理
text
def check_pending_messages():
# 检查待处理消息
pending = redis.xpending("stream:orders", "group1")
for msg_id in pending:
# 检查空闲时间
if idle_time > 60000: # 超过60秒
# 转移给其他消费者或入死信队列
redis.xclaim("stream:orders", "group1", "consumer2", 60000, msg_id)
性能优化
MAXLEN限制长度
text
# 限制Stream长度避免无限增长
XADD stream:orders MAXLEN ~ 10000 * orderId 1001
# 近似裁剪效率更高
# 精确裁剪开销大
消费组优化
text
# 合理消费者数量
# 建议:消费组消费者数 = 处理能力需要
# 批量消费
XREADGROUP GROUP g c COUNT 100 STREAMS s >
定期清理
text
# 清理已确认的旧消息
XTRIM stream:orders MAXLEN ~ 10000
# 清理空闲消费者
XGROUP DELCONSUMER stream:orders group1 consumer_inactive
与其他消息队列对比
| 特性 | Redis Stream | List队列 | Pub/Sub | 专业MQ |
|---|---|---|---|---|
| 持久化 | 支持 | 支持 | 不支持 | 支持 |
| 消费组 | 支持 | 不支持 | 不支持 | 支持 |
| ACK | 支持 | 手动实现 | 不支持 | 支持 |
| 消息ID | 有序ID | 无 | 无 | 多种 |
| 复杂度 | 中 | 低 | 低 | 高 |
| 适用 | 中小规模 | 简单队列 | 实时推送 | 大规模 |
要点总结
- Stream是Redis 5.0专业消息队列,支持持久化和消费组
- XADD添加消息,自动生成有序ID(时间戳-序号)
- XREAD/XRANGE读取消息,支持阻塞读取
- XGROUP CREATE创建消费组,消费组实现消息分发
- XREADGROUP消费组读取,>获取未分配的新消息
- XACK确认消息,XPENDING查看待处理消息
- XCLAIM转移超时未确认的消息给其他消费者
- MAXLEN ~ 限制Stream长度,避免无限增长
- 应用场景:消息队列、事件流、日志收集、实时数据
- ACK机制保证消息可靠,处理失败可重试
- 中小规模用Stream,大规模用专业MQ(RabbitMQ/Kafka)
📝 发现内容有误?点击此处直接编辑