消息队列概念
消息队列是一种在分布式系统中用于组件间异步通信的中间件技术。
定义
消息队列(Message Queue,简称 MQ)是消息的传输通道,用于在生产者(发送消息的组件)和消费者(接收消息的组件)之间传递数据。
核心特征:
- 异步通信:生产者无需等待消费者处理即可返回
- 解耦合:生产者和消费者不直接交互,彼此独立
- 缓冲削峰:消息暂存在队列中,消费者按自身处理能力消费
核心模型
XML
生产者 --> [消息队列] --> 消费者
│ │
└─ 发送消息后直接返回 └─ 按需从队列拉取消息处理
数据流转过程:
- 生产者将业务数据封装为消息,发送到消息队列
- 消息队列将消息持久化存储
- 消费者从消息队列中拉取消息进行处理
- 处理完成后返回确认信号,消息被移除
Java 示例
使用 RabbitMQ Java Client 建立基础连接并发送消息:
Maven 依赖:
Java
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
生产者代码:
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("消息已发送: " + message);
}
}
}
消费者代码:
text
import com.rabbitmq.client.*;
public class Consumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("收到消息: " + message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
// 保持消费者运行
Thread.currentThread().join();
}
}
}
注意事项
消息队列本身不执行业务逻辑,仅负责消息传递;若消息丢失,需依赖持久化机制或重试策略。
队列是先进先出(FIFO)的数据结构,但某些 MQ 产品支持优先级或延迟消费。
消费者必须处理幂等性问题,因为消息可能被重复投递。
要点总结
- 消息队列是生产者与消费者之间的异步通信通道
- 核心能力:解耦、异步处理、缓冲削峰
- 数据流转:生产者发送 -> 队列存储 -> 消费者拉取处理 -> 确认移除
- 消费者需自行实现幂等性以保证消息重复投递时的数据一致性
📝 发现内容有误?点击此处直接编辑