全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

消息持久化配置

消息持久化是保障 RabbitMQ 在服务重启或崩溃时不丢失数据的核心机制。

定义

持久化包含两个层面:队列持久化(声明 durable 队列)与消息持久化(设置消息 deliveryMode 为 2)。两者缺一不可,仅持久化队列不持久化消息,消息仍会丢失。

Maven 依赖

XML
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

配置示例

声明持久化队列

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.BuiltinExchangeExchangeType;

public class DurableQueueProducer {
    private static final String QUEUE_NAME = "persistent_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // durable=true 声明持久化队列
            boolean durable = true;
            boolean exclusive = false;
            boolean autoDelete = false;
            channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);

            System.out.println("持久化队列已声明: " + QUEUE_NAME);
        }
    }
}

发送持久化消息

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class DurableMessageProducer {
    private static final String QUEUE_NAME = "persistent_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 先声明持久化队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            String message = "This is a persistent message";

            // MessageProperties.PERSISTENT_TEXT_PLAIN 设置 deliveryMode=2
            channel.basicPublish("", QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));

            System.out.println("持久化消息已发送: " + message);
        }
    }
}

自定义消息属性持久化

Java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class CustomPersistentMessage {
    private static final String QUEUE_NAME = "persistent_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            String message = "Custom persistent message";

            // 手动构建持久化属性
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)  // 2=持久化, 1=非持久化
                    .contentType("text/plain")
                    .contentEncoding("UTF-8")
                    .build();

            channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));

            System.out.println("自定义持久化消息已发送");
        }
    }
}

注意事项

仅声明 durable 队列不足以实现持久化,必须同时设置消息的 deliveryMode 为 2。

持久化会将数据写入磁盘,对性能有一定影响。高吞吐场景需权衡持久化与性能的关系。

RabbitMQ 不会等待持久化消息真正落盘后再返回 publish 确认,如需强保证,需配合 publisher confirms 机制。

非持久化消息在队列重启或服务重启后会丢失,但性能更高。

要点总结

  • 队列持久化:channel.queueDeclare(queue, true, false, false, null) 中第二个参数设为 true
  • 消息持久化:使用 MessageProperties.PERSISTENT_TEXT_PLAIN 或手动设置 deliveryMode=2
  • 两者必须同时配置,缺一不可
  • 持久化会降低吞吐,需根据业务场景权衡使用

📝 发现内容有误?点击此处直接编辑

← 上一篇 消息拒绝与重新入队
下一篇 → 自动确认模式
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库