消息内容类型
RabbitMQ通过消息属性中的content_type字段标识消息体格式,消费者据此选择正确的解析方式。
定义
content_type是RabbitMQ消息属性(BasicProperties)的一部分,采用MIME类型格式标识消息体的数据格式。常见的值包括application/json、text/plain、application/octet-stream等。
content_type 常用值
| content_type | 说明 | 典型场景 |
|---|---|---|
| application/json | JSON格式 | 跨语言服务间通信 |
| text/plain | 纯文本 | 简单字符串消息 |
| text/xml | XML格式 | 遗留系统对接 |
| application/octet-stream | 二进制流 | 文件传输、Protobuf |
| application/x-java-serialized-object | Java序列化 | Java系统内部通信 |
发送端设置
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.BasicProperties;
public class ContentTypeProducer {
private static final String QUEUE_NAME = "content_type_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 示例1: JSON消息
String jsonMessage = "{\"orderId\":\"ORD-001\",\"amount\":100}";
BasicProperties jsonProps = new BasicProperties.Builder()
.contentType("application/json")
.contentEncoding("UTF-8")
.build();
channel.basicPublish("", QUEUE_NAME, jsonProps, jsonMessage.getBytes("UTF-8"));
// 示例2: 纯文本消息
String textMessage = "Hello RabbitMQ";
BasicProperties textProps = new BasicProperties.Builder()
.contentType("text/plain")
.contentEncoding("UTF-8")
.build();
channel.basicPublish("", QUEUE_NAME, textProps, textMessage.getBytes("UTF-8"));
// 示例3: 二进制消息
byte[] binaryMessage = new byte[]{0x01, 0x02, 0x03, 0x04};
BasicProperties binaryProps = new BasicProperties.Builder()
.contentType("application/octet-stream")
.build();
channel.basicPublish("", QUEUE_NAME, binaryProps, binaryMessage);
System.out.println("已发送3种不同content_type的消息");
}
}
}
接收端解析
Java
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class ContentTypeConsumer {
private static final String QUEUE_NAME = "content_type_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
BasicProperties props = delivery.getProperties();
byte[] body = delivery.getBody();
String contentType = props.getContentType();
System.out.println("收到消息, content_type: " + contentType);
switch (contentType) {
case "application/json":
handleJsonMessage(body);
break;
case "text/plain":
handleTextMessage(body);
break;
case "application/octet-stream":
handleBinaryMessage(body);
break;
default:
System.err.println("未知消息类型: " + contentType);
// 根据业务决定是否拒绝
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
return;
}
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
private static void handleJsonMessage(byte[] body) {
String json = new String(body, StandardCharsets.UTF_8);
System.out.println("JSON消息: " + json);
// 使用Jackson/Gson解析
}
private static void handleTextMessage(byte[] body) {
String text = new String(body, StandardCharsets.UTF_8);
System.out.println("文本消息: " + text);
}
private static void handleBinaryMessage(byte[] body) {
System.out.println("二进制消息, 长度: " + body.length + " bytes");
// 按业务协议解析
}
}
content_type 与 content_encoding
Java
// 同时设置 content_type 和 content_encoding
BasicProperties props = new BasicProperties.Builder()
.contentType("application/json") // 数据格式
.contentEncoding("UTF-8") // 字符编码
.build();
content_type:标识数据类型(MIME类型)content_encoding:标识字符编码(仅对文本类数据有效)
注意事项
content_type是可选属性,不设置时默认为null。但强烈建议始终设置,否则消费者无法可靠识别消息格式。
RabbitMQ本身不验证
content_type值的有效性,它仅透传该属性。格式校验由消费者完成。
混合格式消息队列中,必须根据
content_type路由到不同的处理器,避免解析错误。
使用Spring AMQP时,可通过
MessageConverter自动设置和识别content_type。
Spring AMQP 自动识别示例
Java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringAutoTypeConsumer {
@RabbitListener(queues = "content_type_queue")
public void handleMessage(Message message) {
MessageProperties props = message.getMessageProperties();
String contentType = props.getContentType();
System.out.println("Spring自动识别content_type: " + contentType);
if ("application/json".equals(contentType)) {
// Spring会根据contentType自动反序列化
byte[] body = message.getBody();
System.out.println("JSON消息体: " + new String(body));
}
}
}
要点总结
content_type使用MIME类型格式标识消息体数据格式- 常见值:
application/json、text/plain、application/octet-stream - 发送端必须设置
content_type,接收端根据该值选择解析方式 - 可配合
content_encoding指定字符编码 - RabbitMQ不验证
content_type有效性,校验由消费者完成 - 混合格式队列中,必须根据
content_type路由到对应处理器 - Spring AMQP支持基于
content_type的自动消息转换
📝 发现内容有误?点击此处直接编辑