全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

消费者基础接收

消费者负责订阅队列并处理消息,下面梳理基础接收流程与核心 API。

Maven 依赖

XML
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

基础接收流程

Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
import java.io.IOException;

public class BasicConsumerExample {
    private static final String QUEUE_NAME = "demo_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 创建消费者
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("收到消息: " + message);
            
            // 手动确认消息
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        // 开始消费
        boolean autoAck = false;  // 关闭自动确认
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
        
        System.out.println("等待接收消息...");
    }
}

核心 API 说明

queueDeclare 队列声明

Java
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数说明
queue队列名称
durable是否持久化,true 表示服务器重启后保留
exclusive是否排他,true 表示仅当前连接可见
autoDelete是否自动删除,true 表示无消费者时删除
arguments队列扩展参数,通常为 null

basicConsume 消费订阅

Java
channel.basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
参数说明
queue队列名称
autoAck是否自动确认,false 表示需手动 ack
deliverCallback消息到达时的回调函数
cancelCallback消费者被取消时的回调函数

消息确认机制

Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;

public class AckConsumerExample {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println("处理消息: " + message);
                
                try {
                    // 模拟业务处理
                    processMessage(message);
                    // 处理成功,确认消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                    // 处理失败,拒绝消息并重新入队
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                }
            };
            
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        }
    }

    private static void processMessage(String message) {
        // 业务逻辑
    }
}

注意事项

autoAck 设为 false 时必须手动调用 basicAck 或 basicNack,否则消息会变为 unacked 状态且不会被重新投递。

basicAck 的第二个参数 multiple 为 true 时会确认当前 deliveryTag 之前所有未确认消息。

basicNack 第三个参数 requeue 为 true 时消息会重新入队,为 false 时消息被丢弃或进入死信队列。

消费者回调函数中的异常不会自动 nack 消息,需显式调用 basicNack 处理失败场景。

要点总结

  • 消费者接收消息需经历:创建连接 -> 创建信道 -> 声明队列 -> 注册回调 -> 开始消费。
  • basicConsume 用于订阅队列,autoAck 决定是否需要手动确认消息。
  • 手动确认模式下必须调用 basicAck(成功)或 basicNack(失败)处理消息。
  • basicNack 的 requeue 参数控制失败消息是否重新入队。
  • 推荐使用手动确认模式保障消息可靠性,避免消息丢失。

📝 发现内容有误?点击此处直接编辑

← 上一篇 消息属性与编码
下一篇 → 生产者基础发送
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库