全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

消息队列应用场景

消息队列主要应用于三大核心场景:系统解耦、流量削峰、异步处理。

场景一:系统解耦

问题:订单系统直接调用库存、物流、通知系统,任一子系统故障影响整体流程。

方案:订单系统发送消息到队列,各子系统独立消费,彼此无直接依赖。

Java
订单系统 --> [消息队列] --> 库存系统
                      --> 物流系统
                      --> 通知系统

代码示例:

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DecoupledProducer {
    private static final String EXCHANGE = "order.exchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE, "fanout");

            String message = "{\"orderId\": 1001, \"action\": \"create\"}";
            channel.basicPublish(EXCHANGE, "", null, message.getBytes("UTF-8"));
            System.out.println("订单消息已发送,各子系统自行消费");
        }
    }
}

解耦的核心价值:新增或下线子系统无需修改生产者代码,只需调整队列绑定关系。

场景二:流量削峰

问题:秒杀活动时瞬间请求量超出数据库处理能力,导致系统崩溃。

方案:请求先入消息队列,消费者按数据库承受能力匀速消费。

Java
用户请求 --> [消息队列] --> 消费者(匀速处理) --> 数据库
            (堆积缓冲)      (按自身速率消费)

代码示例:

text
import com.rabbitmq.client.*;
import java.util.concurrent.CountDownLatch;

public class PeakShavingConsumer {
    private static final String QUEUE = "order_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        // 限制并发消费者数量,实现匀速消费
        int consumerCount = 5;
        CountDownLatch latch = new CountDownLatch(consumerCount);

        for (int i = 0; i < consumerCount; i++) {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.basicQos(1); // 每次只处理一条,未确认不推送

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                try {
                    String message = new String(delivery.getBody(), "UTF-8");
                    // 模拟数据库写入
                    Thread.sleep(200);
                    System.out.println("处理订单: " + message);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                }
            };
            channel.basicConsume(QUEUE, false, deliverCallback, consumerTag -> {});
            latch.countDown();
        }
        latch.await();
    }
}

basicQos(prefetchCount = 1) 确保消费者不被消息淹没,按自身能力处理。

场景三:异步处理

问题:用户注册后需发送邮件、短信、初始化数据等耗时操作,阻塞主流程。

方案:注册完成后发送消息,耗时操作异步执行,快速返回用户。

text
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class AsyncRegisterProducer {
    private static final String QUEUE = "user.register";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE, true, false, false, null);

            String message = "{\"userId\": 1024, \"email\": \"user@example.com\"}";
            channel.basicPublish("", QUEUE, null, message.getBytes("UTF-8"));
            // 主流程立即返回,后续操作由消费者异步处理
            System.out.println("用户注册完成,欢迎使用!");
        }
    }
}

异步处理将串行操作转为并行,主链路耗时从 T1+T2+T3 缩减为 T1。

要点总结

  • 解耦:生产者与消费者无直接依赖,系统扩展性大幅提升
  • 削峰:消息队列作为缓冲层,消费者按自身能力匀速处理
  • 异步处理:耗时操作移出主链路,提升用户响应速度
  • 三大场景共同点:将同步调用转为异步传递,降低系统耦合度与响应延迟

📝 发现内容有误?点击此处直接编辑

← 上一篇 消息队列主流产品
下一篇 → 消息队列概念
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库