全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

工作队列基础

工作队列模式通过单个队列与多个消费者实现任务分发,下面梳理核心概念与基础实现。

Maven 依赖

XML
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

工作队列模型

Java
生产者 -> 交换机 -> 队列 <- 消费者1
                    <- 消费者2
                    <- 消费者3

工作队列模式核心特点:

  • 多个消费者共享同一队列
  • 消息按顺序分发给可用消费者
  • 适用于耗时任务的分发处理

生产者实现

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class WorkQueueProducerExample {
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明持久化队列
            boolean durable = true;
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
            
            // 发送多个任务消息
            for (int i = 1; i <= 10; i++) {
                String message = "Task " + i;
                channel.basicPublish("", QUEUE_NAME, 
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes(StandardCharsets.UTF_8));
                System.out.println("已发送: " + message);
            }
        }
    }
}

消费者实现

text
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;

public class WorkQueueConsumerExample {
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        // 声明队列(与生产者保持一致)
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 限制每次只处理一条未确认消息
        channel.basicQos(1);
        
        System.out.println("等待任务...");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("收到任务: " + message);
            
            try {
                doWork(message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                System.out.println("任务完成: " + message);
            } catch (Exception e) {
                System.err.println("任务失败: " + message);
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            }
        };
        
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
    }

    private static void doWork(String task) throws InterruptedException {
        // 模拟任务处理耗时
        Thread.sleep(1000);
    }
}

核心概念说明

概念说明
队列共享多个消费者实例订阅同一队列
消息分发消息按顺序投递给可用消费者
手动确认autoAck=false 确保消息处理成功后才确认
持久化队列和消息持久化保障服务重启后不丢失

注意事项

工作队列模式默认使用轮询分发,消息均匀分配给每个消费者。

必须关闭自动确认(autoAck=false),否则消息处理失败时会丢失。

持久化队列和消息需同时设置,仅设置一方无法保障数据持久性。

多个消费者进程可并行启动,共享队列自动实现负载均衡。

要点总结

  • 工作队列模式通过单队列多消费者实现任务分发。
  • 生产者发送消息至队列,多个消费者共同订阅处理。
  • 手动确认机制保障消息可靠性,避免处理失败导致丢失。
  • 持久化设置(队列+消息)保障服务重启后数据不丢失。
  • 多个消费者进程自动实现负载均衡,提升任务处理吞吐量。

📝 发现内容有误?点击此处直接编辑

← 上一篇 任务处理耗时模拟
下一篇 → 消息分发策略
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库