全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

队列与消息查看

查看队列消息数量、消费者状态是排查消息堆积、消费异常的基础操作。

定义

队列查看包含两个层面:统计信息(消息总数、未确认消息数、消费者数量)与消息详情(单条消息内容、属性、路由信息)。Web 控制台提供可视化界面,Java API 提供编程方式获取队列状态。

Web 控制台查看

1. 队列列表

登录 http://localhost:15672,点击 Queues 标签,显示:

  • Name:队列名称
  • Messages:消息总数(含 Ready、Unacked)
    • Ready:待消费消息数
    • Unacked:已投递未确认消息数
  • Consumers:消费者数量
  • Memory:内存占用
  • Persisted:持久化消息数

2. 查看单条消息

点击队列名称进入详情页:

  1. 展开 Get messages 区域
  2. 填写获取数量(默认 1 条)
  3. 点击 Get Message(s) 按钮
  4. 显示消息内容(Payload)、属性(Headers)、路由信息

通过控制台查看消息会从队列中移除(默认 ackmode=ack_requeue_false),生产环境慎用。

若需查看不移除消息,选择 ackmode=nack_requeue_true 或设置 requeue=true。

3. 队列指标图表

队列详情页顶部显示实时图表:

  • 消息发布/消费速率(msg/s)
  • 消息总量趋势
  • 消费者数量变化

Java API 查看队列状态

获取队列统计信息

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;

public class QueueStats {
    private static final String QUEUE_NAME = "business_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 被动声明队列,仅获取统计信息,不创建队列
            boolean passive = true;
            DeclareOk result = channel.queueDeclarePassive(QUEUE_NAME);

            int messageCount = result.getMessageCount();
            int consumerCount = result.getConsumerCount();

            System.out.println("队列: " + QUEUE_NAME);
            System.out.println("消息总数: " + messageCount);
            System.out.println("消费者数量: " + consumerCount);
        }
    }
}

查看消息内容(不移除)

Java
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Map;

public class PeekMessage {
    private static final String QUEUE_NAME = "business_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 创建临时独占队列用于接收消息副本
            String tempQueue = channel.queueDeclare().getQueue();

            // 设置消费者,自动确认后重新入队
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                          Envelope envelope,
                                          AMQP.BasicProperties properties,
                                          byte[] body) throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("消息内容: " + message);
                    System.out.println("deliveryTag: " + envelope.getDeliveryTag());
                    System.out.println("routingKey: " + envelope.getRoutingKey());
                    System.out.println("exchange: " + envelope.getExchange());

                    // 打印消息属性
                    Map<String, Object> headers = properties.getHeaders();
                    if (headers != null) {
                        System.out.println("Headers: " + headers);
                    }

                    // 立即拒绝并重新入队(查看但不消费)
                    channel.basicReject(envelope.getDeliveryTag(), true);

                    // 仅查看一条后取消消费
                    channel.basicCancel(consumerTag);
                }
            });

            Thread.sleep(2000);
        }
    }
}

批量获取队列状态

Java
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Map;

public class BatchQueueStats {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            String[] queues = {"queue_a", "queue_b", "queue_c"};

            for (String queue : queues) {
                try {
                    // 被动声明,队列不存在会抛异常
                    DeclareOk result = channel.queueDeclarePassive(queue);
                    System.out.printf("队列: %s | 消息: %d | 消费者: %d%n",
                            queue,
                            result.getMessageCount(),
                            result.getConsumerCount());
                } catch (IOException e) {
                    System.out.println("队列不存在: " + queue);
                }
            }
        }
    }
}

命令行查看

Bash
# 列出所有队列及消息数
rabbitmqctl list_queues name messages consumers

# 查看指定虚拟主机的队列
rabbitmqctl list_queues -p /vhost_app name messages consumers

# 查看队列详细信息(含内存、磁盘、持久化状态)
rabbitmqctl list_queues name messages consumers memory durable

注意事项

queueDeclarePassive 仅获取统计信息,队列不存在时会抛出 IOException

Web 控制台 Get messages 操作默认会从队列移除消息,查看前需确认 ackmode 设置。

频繁调用 queueDeclarePassive 会增加管理接口开销,建议缓存统计结果。

消息内容为二进制,需确认编码格式(通常 UTF-8)才能正确解析。

要点总结

  • 队列统计:channel.queueDeclarePassive(queue) 返回 messageCount 和 consumerCount
  • Web 控制台:Queues 页面显示消息总数、Ready/Unacked 分布、消费者数量
  • 查看消息详情:控制台 Get messages 功能可获取消息内容和属性
  • 命令行:rabbitmqctl list_queues name messages consumers 快速查看统计信息
  • 控制台查看可能移除消息,需确认 ackmode 设置避免误消费

📝 发现内容有误?点击此处直接编辑

← 上一篇 管理插件启用
下一篇 → Direct 交换机
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库