并发消费者模型
并发消费者模型通过多线程方式并行处理消息,显著提升消费吞吐量,适用于高吞吐业务场景。
核心思路
RabbitMQ Java Client 默认单线程消费,每个 Channel 绑定一个消费者。实现并发消费的方式:
- 多 Channel + 多消费者:每个线程创建独立 Channel 和消费者
- 线程池 + 单 Channel:单 Channel 接收消息,线程池异步处理
- 多 Channel + 线程池:多个 Channel 各自绑定消费者,配合线程池
推荐方案一:多 Channel + 多消费者,每个消费者独立 Channel,天然支持并发,且隔离性好。
Java 示例:多 Channel 并发消费
Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
public class ConcurrentConsumerExample {
private static final String QUEUE = "concurrent.queue";
private static final int CONSUMER_THREAD_COUNT = 5;
private static final int PREFETCH_COUNT = 10;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
Connection connection = factory.newConnection();
// 创建固定线程池管理消费者线程
ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_THREAD_COUNT);
AtomicLong messageCounter = new AtomicLong(0);
// 启动多个消费者线程
for (int i = 0; i < CONSUMER_THREAD_COUNT; i++) {
final int threadId = i;
executor.submit(() -> {
try {
// 每个线程创建独立 Connection 和 Channel
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
// 设置预取数量
channel.basicQos(PREFETCH_COUNT);
// 注册消费者
channel.basicConsume(QUEUE, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
long count = messageCounter.incrementAndGet();
String message = new String(body, StandardCharsets.UTF_8);
System.out.printf("[线程-%d] 处理消息 #%d: %s%n",
threadId, count, message);
// 模拟业务处理
processMessage(message);
// 手动确认
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
System.out.println("[线程-" + threadId + "] 消费者已启动");
// 保持线程运行
Thread.currentThread().join();
} catch (IOException | TimeoutException | InterruptedException e) {
System.err.println("[线程-" + threadId + "] 消费者异常: " + e.getMessage());
}
});
}
System.out.println("并发消费者已启动,线程数: " + CONSUMER_THREAD_COUNT);
}
private static void processMessage(String message) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
并发度配置建议
| 场景 | 线程数 | prefetch count |
|---|---|---|
| 低吞吐(<100 msg/s) | 1-2 | 1-5 |
| 中吞吐(100-1000 msg/s) | 3-5 | 10-20 |
| 高吞吐(>1000 msg/s) | 5-10 | 20-50 |
注意事项
- 每个消费者应使用独立 Channel,Channel 不是线程安全的
- 线程数不应超过队列分区数,否则竞争同一队列反而降低性能
- 并发消费者需要配合
basicQos使用,避免单线程积压消息 - Connection 可以复用,但 Channel 必须线程隔离
- 优雅关闭时需要等待所有线程处理完当前消息后再关闭 Connection
线程数不是越多越好,建议通过压测找到最优值。过多线程会导致上下文切换开销大于收益。
要点总结
- 推荐多 Channel + 多消费者方案,每个线程独立 Channel
- Channel 不是线程安全的,必须线程隔离
- 配合
basicQos预取数量使用,实现负载均衡 - 线程数建议 3-10 个,根据吞吐量压测调优
- Connection 可复用,Channel 必须独立创建
📝 发现内容有误?点击此处直接编辑