消费速率控制
预取数量(prefetch count)决定了Broker在未收到ACK前可向消费者推送的最大消息数,合理控制可防止消费者过载并提升系统稳定性。
定义
basicQos方法用于设置Channel或Consumer级别的预取数量。当未确认消息数达到阈值时,Broker暂停向该消费者投递,直到收到ACK释放配额。
Maven依赖
XML
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
配置与示例
固定预取数量
Java
import com.rabbitmq.client.*;
public class FixedPrefetchConsumer {
private static final String QUEUE_NAME = "fixed_prefetch_queue";
// 固定预取数量
private static final int PREFETCH_COUNT = 10;
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 设置固定预取数量
channel.basicQos(PREFETCH_COUNT);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("处理消息: " + message);
processMessage(message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
private static void processMessage(String message) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
自适应动态预取数量
Java
import com.rabbitmq.client.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class AdaptivePrefetchConsumer {
private static final String QUEUE_NAME = "adaptive_prefetch_queue";
// 自适应参数
private static final int MIN_PREFETCH = 5;
private static final int MAX_PREFETCH = 100;
private static final int INITIAL_PREFETCH = 20;
private static final long ADJUST_INTERVAL_MS = 5000; // 每5秒调整一次
private volatile int currentPrefetch = INITIAL_PREFETCH;
private final AtomicLong processStartTime = new AtomicLong(System.currentTimeMillis());
private final AtomicInteger processedCount = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
AdaptivePrefetchConsumer consumer = new AdaptivePrefetchConsumer();
consumer.start();
}
public void start() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 初始预取数量
channel.basicQos(currentPrefetch);
// 启动自适应调整线程
Thread adjustThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(ADJUST_INTERVAL_MS);
adjustPrefetch(channel);
} catch (Exception e) {
break;
}
}
});
adjustThread.setDaemon(true);
adjustThread.start();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
long tag = delivery.getEnvelope().getDeliveryTag();
try {
processMessage(message);
processedCount.incrementAndGet();
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true);
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
private synchronized void adjustPrefetch(Channel channel) throws Exception {
long elapsed = System.currentTimeMillis() - processStartTime.get();
int count = processedCount.getAndSet(0);
// 计算每秒处理消息数(TPS)
double tps = (count * 1000.0) / elapsed;
int oldPrefetch = currentPrefetch;
if (tps < 10) {
// 处理能力不足,降低预取数量
currentPrefetch = Math.max(MIN_PREFETCH, currentPrefetch / 2);
} else if (tps > 50) {
// 处理能力充足,提高预取数量
currentPrefetch = Math.min(MAX_PREFETCH, currentPrefetch + 10);
}
if (currentPrefetch != oldPrefetch) {
channel.basicQos(currentPrefetch);
System.out.println("预取数量调整: " + oldPrefetch + " -> " + currentPrefetch + " (TPS=" + String.format("%.1f", tps) + ")");
}
processStartTime.set(System.currentTimeMillis());
}
private void processMessage(String message) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
basicQos可在Channel级别设置,若需为不同消费者设置不同预取值,应在创建每个消费者前单独调用。basicQos(prefetchSize, prefetchCount, global)中global=false表示仅应用于当前消费者。
注意事项
预取数量过小:消费者频繁等待Broker推送消息,吞吐量下降。适合处理耗时较长的消息场景。
预取数量过大:大量消息堆积在消费者内存中,可能导致OOM或处理延迟放大。
动态调整频率:自适应调整不宜过频,建议3-10秒为一个统计周期,避免抖动。
多消费者场景:同一Channel上多个消费者共享预取配额,建议为每个消费者分配独立Channel。
prefetchSize参数:
basicQos第一个参数prefetchSize限制未确认消息总字节数,通常设为0(不限制),仅用prefetchCount控制数量。
要点总结
basicQos(prefetchCount)控制未ACK前Broker可推送的最大消息数- 固定预取数量适合处理耗时均匀的消息场景
- 自适应动态调整根据实际TPS动态伸缩预取数量
- 预取数量过小降低吞吐,过大增加内存压力
- 多消费者应为每个消费者分配独立Channel以便独立控制
📝 发现内容有误?点击此处直接编辑