自定义消息消费失败处理
要解决的问题
根据官方文档 https://spring.io/projects/spring-pulsar 的消息消 费失败后,配置消息重试次数,达到最大重试次数后,将消息发送到死信队列,实现消息消费失败的处理。
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
每一个 Topic 都需要定义一个 DeadLetterPolicy Bean,定义一个死信队列,然后在死信队列处理消息,这都是重复性流程。无法实现整个服务的所有消费者配置重试次数,
如果不需要查看死信队列数据的情况下,也不需要区分死信消息和正常消息 ,那么可以借助 AOP 来简化流程。
实现方案
准备
-
添加 AOP 依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
</dependencies> -
开启 AOP 支持
在启动类添加注解:@EnableAspectJAutoProxy(exposeProxy = true)
。