Protobuf 序列化
Protocol Buffers(Protobuf)是Google推出的二进制序列化方案,在RabbitMQ中可显著降低消息体积并提升编解码性能。
定义
Protobuf通过.proto文件定义消息结构,编译器生成对应语言的序列化代码。相比JSON/XML,Protobuf具备体积更小、编解码更快、强类型约束的特点,适合高性能消息传输。
Maven 依赖
XML
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.24.3</version>
</dependency>
定义 .proto 文件
创建 order.proto:
protobuf
syntax = "proto3";
package rabbitmq.example;
message OrderMessage {
string order_id = 1;
string product_name = 2;
int32 quantity = 3;
double price = 4;
string buyer_email = 5;
}
编译生成 Java 类
Bash
protoc --java_out=src/main/java src/main/proto/order.proto
生成 OrderMessage 类后,即可在代码中使用。
发送端示例
Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.example.OrderProto.OrderMessage;
public class ProtobufProducer {
private static final String QUEUE_NAME = "protobuf_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 构建Protobuf消息
OrderMessage order = OrderMessage.newBuilder()
.setOrderId("ORD-001")
.setProductName("iPhone 15")
.setQuantity(1)
.setPrice(7999.0)
.setBuyerEmail("user@example.com")
.build();
// 序列化为字节数组
byte[] body = order.toByteArray();
// 设置content_type
BasicProperties props = new BasicProperties.Builder()
.contentType("application/x-protobuf")
.build();
channel.basicPublish("", QUEUE_NAME, props, body);
System.out.println("已发送Protobuf消息, 大小: " + body.length + " bytes");
}
}
}
接收端示例
Java
import com.rabbitmq.client.*;
import com.rabbitmq.example.OrderProto.OrderMessage;
import java.io.IOException;
public class ProtobufConsumer {
private static final String QUEUE_NAME = "protobuf_queue";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
byte[] body = delivery.getBody();
String contentType = delivery.getProperties().getContentType();
try {
// 验证消息类型
if (!"application/x-protobuf".equals(contentType)) {
System.err.println("消息类型不匹配: " + contentType);
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
return;
}
// 反序列化Protobuf消息
OrderMessage order = OrderMessage.parseFrom(body);
System.out.println("收到订单: " + order.getOrderId() +
", 商品: " + order.getProductName() +
", 数量: " + order.getQuantity());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println("Protobuf解析失败: " + e.getMessage());
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
}
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
}
}
}
JSON vs Protobuf 对比
Java
public class SerializationComparison {
public static void main(String[] args) throws Exception {
OrderJson orderJson = new OrderJson("ORD-001", "iPhone 15", 1, 7999.0);
OrderMessage orderProto = OrderMessage.newBuilder()
.setOrderId("ORD-001")
.setProductName("iPhone 15")
.setQuantity(1)
.setPrice(7999.0)
.build();
// JSON序列化
ObjectMapper mapper = new ObjectMapper();
byte[] jsonBytes = mapper.writeValueAsBytes(orderJson);
System.out.println("JSON大小: " + jsonBytes.length + " bytes");
// Protobuf序列化
byte[] protoBytes = orderProto.toByteArray();
System.out.println("Protobuf大小: " + protoBytes.length + " bytes");
}
}
// 输出示例:
// JSON大小: ~85 bytes
// Protobuf大小: ~45 bytes
注意事项
Protobuf消息不具备自描述性,接收方必须持有对应的
.proto定义(或生成的代码),否则无法解析。
字段编号(field number)一旦使用不应修改,删除字段可保留编号但标记为
reserved,避免后续复用。
application/x-protobuf是Protobuf的MIME类型,需在消息属性中设置。
Protobuf不支持Map的复杂嵌套作为key,建议仅使用基本类型或消息类型作为Map的value。
Protobuf 版本兼容
protobuf
syntax = "proto3";
message OrderMessage {
string order_id = 1;
string product_name = 2;
int32 quantity = 3;
double price = 4;
// 新增字段:向后兼容
string buyer_email = 5;
// 删除的字段标记为reserved,防止编号复用
reserved 6;
reserved "old_discount_field";
}
Protobuf 3中新增字段向后兼容(旧解析器忽略未知字段),但删除字段需用
reserved声明。
要点总结
- Protobuf是二进制序列化方案,体积更小、编解码更快
- 通过
.proto文件定义消息结构,编译器生成对应语言代码 - 发送端调用
toByteArray()序列化,接收端调用parseFrom()反序列化 - 必须设置
contentType = "application/x-protobuf" - 接收方必须持有
.proto定义,Protobuf消息不具备自描述性 - 字段编号不可修改,删除字段需用
reserved标记 - 适合对性能和带宽敏感的高吞吐量场景
📝 发现内容有误?点击此处直接编辑