全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

预取数量配置

basicQos 用于设置 prefetch count,限制消费者未确认消息的最大数量,防止单个消费者被大量消息压垮。

核心概念

  • prefetch count:消费者未 ACK 的消息上限
  • 达到上限后,RabbitMQ 不再投递新消息给该消费者
  • 消费者 ACK 后,释放配额,允许接收新消息
  • 实现负载均衡,避免快速消费者堆积消息、慢消费者空闲

Java 示例:设置预取数量

Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class PrefetchCountExample {

    private static final String QUEUE = "prefetch.queue";
    private static final String EXCHANGE = "prefetch.exchange";
    private static final int PREFETCH_COUNT = 10;

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE, com.rabbitmq.client.BuiltinExchangeType.DIRECT, true);
            channel.queueDeclare(QUEUE, true, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, "prefetch.key");

            // 设置预取数量:同一时刻最多 10 条未确认消息
            channel.basicQos(PREFETCH_COUNT);

            // 关闭自动确认,启用手动确认
            channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("收到消息: " + message);

                    try {
                        // 模拟业务处理
                        processMessage(message);
                        // 业务处理成功,确认消息
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } catch (Exception e) {
                        // 业务处理失败,拒绝消息,不重新入队
                        channel.basicNack(envelope.getDeliveryTag(), false, false);
                    }
                }
            });

            System.out.println("消费者已启动,prefetch count: " + PREFETCH_COUNT);
        }
    }

    private static void processMessage(String message) {
        // 模拟业务处理耗时
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

QoS 级别说明

Java
// 全局 QoS(作用于整个 Channel)
channel.basicQos(10);

// 非全局 QoS(作用于单个消费者)
channel.basicQos(0, 10, false);

// global=true:作用于整个 Channel 所有消费者
channel.basicQos(0, 10, true);
参数说明
prefetchSize预取消息总大小(字节),0 表示不限制
prefetchCount预取消息数量
globaltrue=Channel级别,false=Consumer级别

生产环境建议 prefetchCount 设置为消费者线程数的 1~2 倍,避免过小导致吞吐不足,或过大导致内存压力。

prefetch count 与性能的关系

prefetch count 值效果
0不限制,RabbitMQ 全量推送(默认行为)
1严格串行处理,吞吐量最低
10-50推荐范围,平衡吞吐与负载均衡
100+高吞吐场景,注意消费者内存压力

注意事项

  • prefetch count 必须配合手动确认(autoAck=false)使用,否则无效
  • 设置为 0 表示不限制预取数量
  • Channel 级别和 Consumer 级别的 QoS 可以共存
  • prefetchSize 通常设置为 0(按数量限制),不推荐按字节限制

要点总结

  • basicQos(prefetchCount) 限制消费者未 ACK 消息的上限
  • 配合手动确认使用,实现负载均衡和流量控制
  • 推荐值:消费者线程数的 1~2 倍
  • global=true 作用于 Channel 级别,global=false 作用于 Consumer 级别
  • prefetch count=0 表示不限制,慎用(可能导致消费者内存溢出)

📝 发现内容有误?点击此处直接编辑

← 上一篇 消费速率控制
下一篇 → 仲裁队列 Quorum Queue
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库