工作队列基础
工作队列模式通过单个队列与多个消费者实现任务分发,下面梳理核心概念与基础实现。
Maven 依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
工作队列模型
Java
生产者 -> 交换机 -> 队列 <- 消费者1
<- 消费者2
<- 消费者3
工作队列模式核心特点:
- 多个消费者共享同一队列
- 消息按顺序分发给可用消费者
- 适用于耗时任务的分发处理
生产者实现
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class WorkQueueProducerExample {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明持久化队列
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 发送多个任务消息
for (int i = 1; i <= 10; i++) {
String message = "Task " + i;
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(StandardCharsets.UTF_8));
System.out.println("已发送: " + message);
}
}
}
}
消费者实现
text
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
public class WorkQueueConsumerExample {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列(与生产者保持一致)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 限制每次只处理一条未确认消息
channel.basicQos(1);
System.out.println("等待任务...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("收到任务: " + message);
try {
doWork(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
System.out.println("任务完成: " + message);
} catch (Exception e) {
System.err.println("任务失败: " + message);
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
}
private static void doWork(String task) throws InterruptedException {
// 模拟任务处理耗时
Thread.sleep(1000);
}
}
核心概念说明
| 概念 | 说明 |
|---|---|
| 队列共享 | 多个消费者实例订阅同一队列 |
| 消息分发 | 消息按顺序投递给可用消费者 |
| 手动确认 | autoAck=false 确保消息处理成功后才确认 |
| 持久化 | 队列和消息持久化保障服务重启后不丢失 |
注意事项
工作队列模式默认使用轮询分发,消息均匀分配给每个消费者。
必须关闭自动确认(autoAck=false),否则消息处理失败时会丢失。
持久化队列和消息需同时设置,仅设置一方无法保障数据持久性。
多个消费者进程可并行启动,共享队列自动实现负载均衡。
要点总结
- 工作队列模式通过单队列多消费者实现任务分发。
- 生产者发送消息至队列,多个消费者共同订阅处理。
- 手动确认机制保障消息可靠性,避免处理失败导致丢失。
- 持久化设置(队列+消息)保障服务重启后数据不丢失。
- 多个消费者进程自动实现负载均衡,提升任务处理吞吐量。
📝 发现内容有误?点击此处直接编辑