全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

消费速率控制

预取数量(prefetch count)决定了Broker在未收到ACK前可向消费者推送的最大消息数,合理控制可防止消费者过载并提升系统稳定性。

定义

basicQos方法用于设置Channel或Consumer级别的预取数量。当未确认消息数达到阈值时,Broker暂停向该消费者投递,直到收到ACK释放配额。

Maven依赖

XML
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

配置与示例

固定预取数量

Java
import com.rabbitmq.client.*;

public class FixedPrefetchConsumer {
    private static final String QUEUE_NAME = "fixed_prefetch_queue";
    // 固定预取数量
    private static final int PREFETCH_COUNT = 10;

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 设置固定预取数量
        channel.basicQos(PREFETCH_COUNT);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("处理消息: " + message);
            processMessage(message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };

        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }

    private static void processMessage(String message) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

自适应动态预取数量

Java
import com.rabbitmq.client.*;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class AdaptivePrefetchConsumer {
    private static final String QUEUE_NAME = "adaptive_prefetch_queue";
    
    // 自适应参数
    private static final int MIN_PREFETCH = 5;
    private static final int MAX_PREFETCH = 100;
    private static final int INITIAL_PREFETCH = 20;
    private static final long ADJUST_INTERVAL_MS = 5000; // 每5秒调整一次

    private volatile int currentPrefetch = INITIAL_PREFETCH;
    private final AtomicLong processStartTime = new AtomicLong(System.currentTimeMillis());
    private final AtomicInteger processedCount = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        AdaptivePrefetchConsumer consumer = new AdaptivePrefetchConsumer();
        consumer.start();
    }

    public void start() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        // 初始预取数量
        channel.basicQos(currentPrefetch);

        // 启动自适应调整线程
        Thread adjustThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(ADJUST_INTERVAL_MS);
                    adjustPrefetch(channel);
                } catch (Exception e) {
                    break;
                }
            }
        });
        adjustThread.setDaemon(true);
        adjustThread.start();

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            long tag = delivery.getEnvelope().getDeliveryTag();
            try {
                processMessage(message);
                processedCount.incrementAndGet();
                channel.basicAck(tag, false);
            } catch (Exception e) {
                channel.basicNack(tag, false, true);
            }
        };

        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
    }

    private synchronized void adjustPrefetch(Channel channel) throws Exception {
        long elapsed = System.currentTimeMillis() - processStartTime.get();
        int count = processedCount.getAndSet(0);
        
        // 计算每秒处理消息数(TPS)
        double tps = (count * 1000.0) / elapsed;
        
        int oldPrefetch = currentPrefetch;
        if (tps < 10) {
            // 处理能力不足,降低预取数量
            currentPrefetch = Math.max(MIN_PREFETCH, currentPrefetch / 2);
        } else if (tps > 50) {
            // 处理能力充足,提高预取数量
            currentPrefetch = Math.min(MAX_PREFETCH, currentPrefetch + 10);
        }

        if (currentPrefetch != oldPrefetch) {
            channel.basicQos(currentPrefetch);
            System.out.println("预取数量调整: " + oldPrefetch + " -> " + currentPrefetch + " (TPS=" + String.format("%.1f", tps) + ")");
        }
        
        processStartTime.set(System.currentTimeMillis());
    }

    private void processMessage(String message) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

basicQos可在Channel级别设置,若需为不同消费者设置不同预取值,应在创建每个消费者前单独调用。basicQos(prefetchSize, prefetchCount, global)global=false表示仅应用于当前消费者。

注意事项

  1. 预取数量过小:消费者频繁等待Broker推送消息,吞吐量下降。适合处理耗时较长的消息场景。

  2. 预取数量过大:大量消息堆积在消费者内存中,可能导致OOM或处理延迟放大。

  3. 动态调整频率:自适应调整不宜过频,建议3-10秒为一个统计周期,避免抖动。

  4. 多消费者场景:同一Channel上多个消费者共享预取配额,建议为每个消费者分配独立Channel。

  5. prefetchSize参数basicQos第一个参数prefetchSize限制未确认消息总字节数,通常设为0(不限制),仅用prefetchCount控制数量。

要点总结

  • basicQos(prefetchCount)控制未ACK前Broker可推送的最大消息数
  • 固定预取数量适合处理耗时均匀的消息场景
  • 自适应动态调整根据实际TPS动态伸缩预取数量
  • 预取数量过小降低吞吐,过大增加内存压力
  • 多消费者应为每个消费者分配独立Channel以便独立控制

📝 发现内容有误?点击此处直接编辑

← 上一篇 消费者异常恢复
下一篇 → 预取数量配置
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库