全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

消息属性与编码

RabbitMQ 消息可携带多种属性用于控制投递行为,下面梳理常用属性配置与消息编码方法。

Maven 依赖

XML
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

消息属性配置

BasicProperties 构建

Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class MessagePropertiesExample {
    private static final String EXCHANGE_NAME = "props_exchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
            
            // 构建消息属性
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .contentType("application/json")          // 内容类型
                .contentEncoding("UTF-8")                 // 内容编码
                .deliveryMode(2)                          // 持久化模式:1=非持久化, 2=持久化
                .priority(5)                              // 优先级 0-9
                .expiration("60000")                      // 过期时间(毫秒)
                .messageId("msg_001")                     // 消息唯一标识
                .timestamp(new java.util.Date())          // 时间戳
                .build();
            
            String message = "{\"task\":\"send_email\",\"to\":\"user@example.com\"}";
            channel.basicPublish(EXCHANGE_NAME, "props_key", props, 
                message.getBytes(StandardCharsets.UTF_8));
            System.out.println("消息已发送: " + message);
        }
    }
}

常用属性说明

属性类型说明
contentTypeString消息内容类型,如 application/json、text/plain
contentEncodingString内容编码,如 UTF-8
deliveryModeInteger1=非持久化,2=持久化(服务器重启后保留)
priorityInteger优先级 0-9,需队列开启优先级支持
expirationString消息过期时间(毫秒字符串),超时后消息被丢弃
messageIdString消息唯一标识,用于去重
timestampDate消息发送时间戳
headersMap<String, Object>自定义头部参数,用于 headers 交换机匹配

多格式消息编码

JSON 消息

Java
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.HashMap;

public class JsonMessageExample {
    private static final String QUEUE_NAME = "json_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        ObjectMapper mapper = new ObjectMapper();

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            // 发送 JSON 消息
            Map<String, Object> data = new HashMap<>();
            data.put("action", "update_user");
            data.put("userId", 1001);
            data.put("name", "张三");
            
            String jsonBody = mapper.writeValueAsString(data);
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .build();
            
            channel.basicPublish("", QUEUE_NAME, props, 
                jsonBody.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送JSON: " + jsonBody);
            
            // 消费并解析 JSON
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                try {
                    String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    Map<String, Object> receivedData = mapper.readValue(body, Map.class);
                    System.out.println("解析JSON: " + receivedData);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                    System.err.println("JSON解析失败: " + e.getMessage());
                }
            };
            
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        }
    }
}

序列化对象消息

Java
import com.rabbitmq.client.*;
import java.io.*;
import java.nio.charset.StandardCharsets;

public class SerializableMessageExample {
    private static final String QUEUE_NAME = "serial_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            // 序列化对象
            TaskMessage task = new TaskMessage("process_data", 100);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(task);
            oos.close();
            
            channel.basicPublish("", QUEUE_NAME, null, baos.toByteArray());
            System.out.println("序列化消息已发送");
        }
    }

    static class TaskMessage implements Serializable {
        private String type;
        private int priority;
        
        public TaskMessage(String type, int priority) {
            this.type = type;
            this.priority = priority;
        }
        
        @Override
        public String toString() {
            return "TaskMessage{type='" + type + "', priority=" + priority + "}";
        }
    }
}

注意事项

deliveryMode 设为 2 时消息会持久化到磁盘,但需配合持久化队列和交换机使用才有效。

消息优先级功能需在声明队列时通过参数 x-max-priority 开启,否则优先级设置无效。

expiration 属性值为字符串格式的毫秒数,消息超时后会被自动删除。

消息体最大限制由 RabbitMQ 配置决定(默认无限制),超大消息可能影响性能。

要点总结

  • AMQP.BasicProperties.Builder 用于构建消息属性,支持 contentType、deliveryMode、priority 等配置。
  • deliveryMode 为 2 时消息持久化,为 1 时不持久化。
  • JSON 格式消息需设置 contentType 为 application/json,消费端按相同编码解析。
  • 序列化对象消息需实现 Serializable 接口,通过 ObjectOutputStream 转换。
  • 消息优先级和过期时间需在队列和消息层面配合使用才生效。

📝 发现内容有误?点击此处直接编辑

← 上一篇 队列 Queue
下一篇 → 消费者基础接收
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库