生产者与消费者模型
消息队列的核心交互模型由两方组成:生产者(发送消息)和消费者(处理消息)。
角色定义
- 生产者(Producer):创建消息并发送到消息队列的组件
- 消费者(Consumer):从消息队列中拉取消息并执行处理的组件
- 消息队列(Broker):中转站,负责消息存储与路由
交互流程
XML
生产者 消息队列 消费者
│ │ │
├─ 发送消息 ───────────>│ │
│ ├─ 消息入队 │
│ ├────────── 推送/拉取 ───>│
│ │ ├─ 处理消息
│ │<───────── 确认(Ack) ────┤
│ ├─ 移除消息 │
└─ 无需等待返回 │ │
关键步骤:
- 生产者建立连接,声明队列,发送消息
- 消息队列接收并持久化消息
- 消费者监听队列,消息到达后触发消费
- 消费者处理完成后发送 Ack 确认
- 消息队列收到 Ack 后移除消息
Java 完整示例
Maven 依赖:
Java
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
生产者:
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class TaskProducer {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明持久化队列,重启后消息不丢失
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 1; i <= 5; i++) {
String message = "Task-" + i;
// 设置消息持久化
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println("发送: " + message);
}
System.out.println("所有任务已发送完毕");
}
}
}
消费者:
text
import com.rabbitmq.client.*;
public class TaskConsumer {
private static final String QUEUE_NAME = "task_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 队列已由生产者声明,此处直接消费
channel.basicQos(1); // 公平分发:未确认不推送新消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("收到: " + message);
// 模拟处理耗时
doWork(message.length());
// 手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("处理完成: " + message);
};
// autoAck=false 表示手动确认
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
System.out.println("消费者已启动,等待任务...");
}
private static void doWork(int dots) throws InterruptedException {
for (int i = 0; i < dots; i++) {
Thread.sleep(500);
}
}
}
关键参数说明
| 参数 | 生产者侧 | 消费者侧 |
|---|---|---|
| queueDeclare | 声明队列,控制持久化(durable) | 可不声明,直接消费 |
| basicPublish | 指定 Exchange、RoutingKey、消息属性 | 无 |
| basicQos | 无 | 控制预取数量(prefetchCount) |
| basicAck | 无 | 手动确认消息 |
basicQos(1) 实现公平分发,避免单个消费者堆积大量未处理消息。
消息持久化需同时设置:队列 durable=true + 消息 deliveryMode=2。
消费者未发送 Ack 时,消息在连接断开后会重新入队。
要点总结
- 生产者负责创建和发送消息,消费者负责拉取和处理消息
- 标准流程:发送 -> 入队 -> 消费 -> Ack确认 -> 移除
- 手动确认(autoAck=false)是可靠消费的基础
- basicQos 控制预取数量,实现公平分发与负载平衡
- 消息可靠性依赖队列持久化和消息持久化的双重保障
📝 发现内容有误?点击此处直接编辑