全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

RabbitMQ 架构概述

RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol)协议的企业级消息中间件,其核心架构由多个组件协同完成消息路由与投递。

整体架构

XML
生产者
  │
  ├─ 发送消息到 ──┐
                  ▼
              ┌────────┐     路由规则     ┌──────────┐
              │Exchange│ ─────────────> │  Queue   │
              │ (交换机)│                │  (队列)   │
              └────────┘                └──────────┘
                                            │
                                            ▼
                                      ┌──────────┐
                                      │ Consumer  │
                                      │ (消费者)   │
                                      └──────────┘

核心组件

组件说明
BrokerRabbitMQ 服务实例,接收、存储、转发消息
Exchange接收生产者消息,根据路由规则转发到对应队列
Queue消息的存储容器,消费者从中拉取消息
BindingExchange 与 Queue 之间的绑定关系,定义路由规则
RoutingKey生产者发送消息时指定的路由标识
Connection客户端与 Broker 之间的 TCP 连接
Channel连接内的轻量级虚拟通道,复用 TCP 连接

AMQP 协议

RabbitMQ 实现 AMQP 0-9-1 协议,核心概念:

  • 消息路由:生产者不直接发送消息到队列,而是发送到 Exchange
  • Binding 规则:Exchange 根据 Binding 和 RoutingKey 决定消息投递到哪个队列
  • Channel 复用:一个 TCP 连接可创建多个 Channel,减少连接开销

Exchange 类型

类型路由规则
DirectRoutingKey 完全匹配
Fanout广播到所有绑定队列
TopicRoutingKey 模式匹配(* 和 # 通配符)
Headers根据消息头属性匹配

Java 示例:Direct Exchange

Maven 依赖:

Java
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

生产者:

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DirectExchangeProducer {
    private static final String EXCHANGE = "direct_exchange";
    private static final String ROUTING_KEY = "order.create";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明 Direct Exchange
            channel.exchangeDeclare(EXCHANGE, "direct");
            // 声明队列
            channel.queueDeclare("order.queue", true, false, false, null);
            // 绑定 Exchange 与 Queue
            channel.queueBind("order.queue", EXCHANGE, ROUTING_KEY);

            String message = "New order created";
            channel.basicPublish(EXCHANGE, ROUTING_KEY, null, message.getBytes("UTF-8"));
            System.out.println("消息已发送到 Exchange: " + EXCHANGE +
                    ", RoutingKey: " + ROUTING_KEY);
        }
    }
}

消费者:

text
import com.rabbitmq.client.*;

public class DirectExchangeConsumer {
    private static final String EXCHANGE = "direct_exchange";
    private static final String QUEUE = "order.queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE, "direct");
            channel.queueDeclare(QUEUE, true, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, "order.create");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("收到消息: " + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            channel.basicConsume(QUEUE, false, deliverCallback, consumerTag -> {});
            System.out.println("消费者已启动...");
        }
    }
}

注意事项

生产者不直接指定队列,而是发送消息到 Exchange,由 Exchange 根据 Binding 规则路由到对应队列。

Channel 是线程安全的,但应在同一线程内复用,避免跨线程操作。

连接和 Channel 使用完毕后必须关闭,否则会造成资源泄漏。

要点总结

  • RabbitMQ 核心架构:生产者 -> Exchange -> Binding -> Queue -> 消费者
  • Exchange 负责消息路由,支持 Direct、Fanout、Topic、Headers 四种类型
  • Binding 定义 Exchange 到 Queue 的路由规则,RoutingKey 是匹配标识
  • Connection 是 TCP 连接,Channel 是轻量级虚拟通道,复用连接资源
  • AMQP 协议确保消息路由的标准化与跨平台兼容性

📝 发现内容有误?点击此处直接编辑

← 上一篇 Erlang 依赖说明
下一篇 → 服务启动与状态检查
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库