全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

消息过滤与拦截

拦截器(Interceptor)在消息发布和消费前后执行自定义逻辑,无需修改业务代码。

拦截器接口

RabbitMQ 提供 PublisherCallbackChannel.Listener 和自定义拦截器接口。

Java
// 发布拦截器
public interface PublishInterceptor {
    // 消息发布前
    void beforePublish(Channel channel, String exchange, String routingKey, 
                       BasicProperties props, byte[] body);
    
    // 消息发布后
    void afterPublish(Channel channel, String exchange, String routingKey,
                      BasicProperties props, byte[] body);
}

// 消费拦截器
public interface ConsumeInterceptor {
    // 消息消费前
    void beforeConsume(Envelope envelope, BasicProperties props, byte[] body);
    
    // 消息消费后
    void afterConsume(Envelope envelope, BasicProperties props, byte[] body);
}

RabbitMQ Java Client 未内置拦截器框架,需自行实现或使用 Spring AMQP 等上层封装。

实现发布拦截器

通过包装 basicPublish 实现发布拦截。

Java
public class InterceptedChannel {
    private final Channel delegate;
    private final List<PublishInterceptor> interceptors = new ArrayList<>();
    
    public InterceptedChannel(Channel delegate) {
        this.delegate = delegate;
    }
    
    public void addInterceptor(PublishInterceptor interceptor) {
        interceptors.add(interceptor);
    }
    
    public void basicPublish(String exchange, String routingKey, 
                             BasicProperties props, byte[] body) throws Exception {
        // 发布前拦截
        for (PublishInterceptor interceptor : interceptors) {
            interceptor.beforePublish(delegate, exchange, routingKey, props, body);
        }
        
        // 执行实际发布
        delegate.basicPublish(exchange, routingKey, props, body);
        
        // 发布后拦截
        for (PublishInterceptor interceptor : interceptors) {
            interceptor.afterPublish(delegate, exchange, routingKey, props, body);
        }
    }
}

消息过滤示例

在拦截器中实现消息过滤,丢弃不符合规则的消息。

Java
// 过滤拦截器
PublishInterceptor filterInterceptor = new PublishInterceptor() {
    @Override
    public void beforePublish(Channel channel, String exchange, String routingKey,
                              BasicProperties props, byte[] body) {
        // 过滤空消息
        if (body == null || body.length == 0) {
            System.out.println("丢弃空消息");
            throw new RuntimeException("空消息被拦截");
        }
        
        // 过滤超过大小限制的消息(1MB)
        if (body.length > 1024 * 1024) {
            System.out.println("丢弃超大消息");
            throw new RuntimeException("消息超过大小限制");
        }
    }
    
    @Override
    public void afterPublish(Channel channel, String exchange, String routingKey,
                             BasicProperties props, byte[] body) {
        // 记录发布日志
        System.out.println("消息已发布: " + routingKey + ", 大小: " + body.length + "B");
    }
};

// 使用
InterceptedChannel interceptedChannel = new InterceptedChannel(channel);
interceptedChannel.addInterceptor(filterInterceptor);
interceptedChannel.basicPublish("my_exchange", "test_key", null, "Hello".getBytes());

消息格式转换

在拦截器中实现消息格式转换。

Java
PublishInterceptor transformInterceptor = new PublishInterceptor() {
    @Override
    public void beforePublish(Channel channel, String exchange, String routingKey,
                              BasicProperties props, byte[] body) {
        try {
            // JSON 转 Protobuf(示例)
            String json = new String(body, StandardCharsets.UTF_8);
            // 假设转换为 Protobuf 格式
            byte[] protobuf = json.getBytes(StandardCharsets.UTF_8); // 简化示例
            
            // 修改消息头标记格式
            Map<String, Object> headers = new HashMap<>();
            if (props.getHeaders() != null) {
                headers.putAll(props.getHeaders());
            }
            headers.put("content-format", "protobuf");
            
            props = new AMQP.BasicProperties.Builder()
                .from(props)
                .headers(headers)
                .build();
        } catch (Exception e) {
            System.err.println("格式转换失败: " + e.getMessage());
        }
    }
    
    @Override
    public void afterPublish(Channel channel, String exchange, String routingKey,
                             BasicProperties props, byte[] body) {
        // 无需处理
    }
};

路由预处理

根据消息内容动态修改路由键。

Java
PublishInterceptor routingInterceptor = new PublishInterceptor() {
    @Override
    public void beforePublish(Channel channel, String exchange, String routingKey,
                              BasicProperties props, byte[] body) {
        try {
            // 根据消息内容中的 order_id 路由到不同队列
            String content = new String(body, StandardCharsets.UTF_8);
            if (content.contains("\"priority\":\"high\"")) {
                routingKey = "high_priority";
            } else if (content.contains("\"priority\":\"low\"")) {
                routingKey = "low_priority";
            }
        } catch (Exception e) {
            // 使用默认路由键
        }
    }
    
    @Override
    public void afterPublish(Channel channel, String exchange, String routingKey,
                             BasicProperties props, byte[] body) {
        // 无需处理
    }
};

拦截器中修改路由键需确保目标交换机有对应绑定,否则消息丢失。

完整拦截器示例

Java
public class InterceptorDemo {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.exchangeDeclare("intercepted_exchange", "topic", true);
            channel.queueDeclare("intercepted_queue", true, false, false, null);
            channel.queueBind("intercepted_queue", "intercepted_exchange", "#");
            
            // 包装 Channel
            InterceptedChannel ic = new InterceptedChannel(channel);
            ic.addInterceptor(new PublishInterceptor() {
                @Override
                public void beforePublish(Channel ch, String ex, String rk,
                                         BasicProperties props, byte[] body) {
                    if (body == null || body.length == 0) {
                        throw new RuntimeException("空消息被拦截");
                    }
                    System.out.println("发布前: " + rk);
                }
                
                @Override
                public void afterPublish(Channel ch, String ex, String rk,
                                        BasicProperties props, byte[] body) {
                    System.out.println("发布后: " + body.length + "B");
                }
            });
            
            // 发送消息
            ic.basicPublish("intercepted_exchange", "test.key", null, 
                "拦截器测试消息".getBytes());
            
            System.out.println("消息发送完成");
        }
    }
}

注意事项

  1. RabbitMQ Java Client 未内置拦截器框架,需自行包装 Channel 实现
  2. 拦截器中的异常会中断发布流程,需妥善处理
  3. 拦截器不应执行阻塞操作,否则会影响消息吞吐
  4. Spring AMQP 提供完善的拦截器支持,企业项目建议使用上层框架

要点总结

  • 拦截器在消息发布和消费前后执行自定义逻辑
  • 通过包装 ChannelbasicPublish 实现发布拦截
  • 可在拦截器中实现消息过滤、格式转换和路由预处理
  • 拦截器中的异常会中断发布流程,需妥善处理
  • 原生 Java Client 需自行实现拦截器,Spring AMQP 提供完善支持

📝 发现内容有误?点击此处直接编辑

← 上一篇 插件安装与管理
下一篇 → 消息追踪插件
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库