Spring Cloud Stream异常处理过程讲解
- 更新时间:2020-03-25 16:23:42
- 编辑:徐米雪
应用处理
当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常。若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理、系统层面的处理以及通过RetryTemplate进行处理。
本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理。
局部处理
Stream相关的配置内容如下:
spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.129:9876 bindings: input: destination: stream-test-topic group: binder-group
所谓局部处理就是针对指定的channel进行处理,需要定义一个处理异常的方法,并在该方法上添加@ServiceActivator注解,该注解有一个inputChannel属性,用于指定对哪个channel进行处理,格式为{destination}.{group}.errors。具体代码如下:
package com.zj.node.usercenter.rocketmq; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.messaging.support.ErrorMessage; import org.springframework.stereotype.Service; /** * 消费者 * * @author 01 * @date 2019-08-10 **/ @Slf4j @Service public class TestStreamConsumer { @StreamListener(Sink.INPUT) public void receive1(String messageBody) { log.info("消费消息,messageBody = {}", messageBody); throw new IllegalArgumentException("参数错误"); } /** * 处理局部异常的方法 * * @param errorMessage 异常消息对象 */ @ServiceActivator( // 通过特定的格式指定处理哪个channel的异常 inputChannel = "stream-test-topic.binder-group.errors" ) public void handleError(ErrorMessage errorMessage) { // 获取异常对象 Throwable errorMessagePayload = errorMessage.getPayload(); log.error("发生异常", errorMessagePayload); // 获取消息体 Message<?> originalMessage = errorMessage.getOriginalMessage(); if (originalMessage != null) { log.error("消息体: {}", originalMessage.getPayload()); } else { log.error("消息体为空"); } } }
全局处理
全局处理则是可以处理所有channel抛出来的异常,所有的channel抛出异常后会生成一个ErrorMessage对象,即错误消息。错误消息会被放到一个专门的channel里,这个channel就是errorChannel。所以通过监听errorChannel就可以实现全局异常的处理。具体代码如下:
@StreamListener(Sink.INPUT) public void receive1(String messageBody) { log.info("消费消息,messageBody = {}", messageBody); throw new IllegalArgumentException("参数错误"); } /** * 处理全局异常的方法 * * @param errorMessage 异常消息对象 */ @StreamListener("errorChannel") public void handleError(ErrorMessage errorMessage) { log.error("发生异常. errorMessage = {}", errorMessage); }
系统处理
系统处理方式,因消息中间件的不同而异。如果应用层面没有配置错误处理,那么error将会被传播给binder,而binder则会将error回传给消息中间件。消息中间件可以选择:
- 丢弃消息:错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产
- requeue(重新排队,从而重新处理)
- 将失败的消息发送给DLQ(死信队列)
DLQ
目前RabbitMQ对DLQ的支持比较好,这里以RabbitMQ为例,只需要添加DLQ相关的配置:
spring: cloud: stream: bindings: input: destination: stream-test-topic group: binder-group rabbit: bindings: input: consumer: # 自动将失败的消息发送给DLQ auto-bind-dlq: true
消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。
如果想获取原始错误的异常堆栈,可添加如下配置:
spring: cloud: stream: rabbit: bindings: input: consumer: republish-to-dlq: true
requeue
Rabbit及Kafka的binder依赖RetryTemplate实现消息重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate则不会再重试。此时可以通过requeue方式来处理异常。
需要添加如下配置:
# 默认是3,设为1则禁用重试 spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1 # 表示是否要requeue被拒绝的消息(即:requeue处理失败的消息) spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException 异常为止。
RetryTemplate
RetryTemplate主要用于实现消息重试,也是错误处理的一种手段。有两种配置方式,一种是通过配置文件进行配置,如下示例:
spring: cloud: stream: bindings: <input channel名称>: consumer: # 最多尝试处理几次,默认3 maxAttempts: 3 # 重试时初始避退间隔,单位毫秒,默认1000 backOffInitialInterval: 1000 # 重试时最大避退间隔,单位毫秒,默认10000 backOffMaxInterval: 10000 # 避退乘数,默认2.0 backOffMultiplier: 2.0 # 当listen抛出retryableExceptions未列出的异常时,是否要重试 defaultRetryable: true # 异常是否允许重试的map映射 retryableExceptions: java.lang.RuntimeException: true java.lang.IllegalStateException: false
另一种则是通过代码配置,在多数场景下,使用配置文件定制重试行为都是可以满足需求的,但配置文件里支持的配置项可能无法满足一些复杂需求。此时可使用代码方式配置RetryTemplate,如下示例:
@Configuration class RetryConfiguration { @StreamRetryTemplate public RetryTemplate sinkConsumerRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setRetryPolicy(retryPolicy()); retryTemplate.setBackOffPolicy(backOffPolicy()); return retryTemplate; } private ExceptionClassifierRetryPolicy retryPolicy() { BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier( Collections.singletonList(IllegalAccessException.class )); keepRetryingClassifier.setTraverseCauses(true); SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3); AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy(); ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy(); retryPolicy.setExceptionClassifier( classifiable -> keepRetryingClassifier.classify(classifiable) ? alwaysRetryPolicy : simpleRetryPolicy); return retryPolicy; } private FixedBackOffPolicy backOffPolicy() { final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(2); return backOffPolicy; } }
最后还需要添加一段配置:
spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate
注:Spring Cloud Stream 2.2才支持设置retry-template-name
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持码农之家。
相关教程
-
SpringBoot使用统一异常处理实例讲解
这篇文章主要为大家详细介绍了SpringBoot使用统一异常处理,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
发布时间:2019-08-11
-
Spring数据库异常抽理知识点分享
在本篇文章里小编给大家分享了关于源码解析Spring 数据库异常抽理知识点内容,对此有需要的朋友们学习参考下。
发布时间:2019-08-07
-
疯狂Spring Cloud微服务架构实战
《疯狂Spring Cloud微服务架构实战》以Spring Cloud为基础,深入讲解微服务开发的相关框架,包括服务管理框架Eureka、负载均衡框架Ribbon、服务客户端Feign、容错框架Hystrix、消息框架Stream等。
大小:176.9 MBSpring Cloud电子书
-
Spring MVC学习指南(第2版)
Spring MVC学习指南第2版重在讲述如何通过Spring MVC来开发基于Java的Web应用。分别从Spring框架、模型2和MVC模式、Spring MVC介绍、控制器、数据绑定和表单标签库、传唤器和格式化、验证器、表达式语言、JSTL、国际化、上传文件、下载文件多个角度介绍了Spring MVC
大小:83.3 MBSpring MVC电子书
-
深入浅出Spring Boot 2.x
spring boot 2实战精髓 企业级应用开发实战 微服务实战指南 结合主流持久层框架MyBatis 讲述企业级Spring Boot开发要点 赠送作者讲解的部分配套视频课程
大小:257.67 MBSpring Boot电子书
-
Spring Boot 企业级应用开发实战
《Spring Boot 企业级应用开发实战》 围绕如何整合以 Spring Boot 为核心的技术栈,来实现一个完整的企业级博客系统 NewStarBlog 而展开。该博客系统支持市面上博客系统的常见功能。读者朋友可以
大小:419 MBSpring Boot电子书
-
一步一步学Spring Boot 2
本书主要内容包括Spring Boot环境搭建、Spring Boot常用标签、Spring Boot集成Redis、数据库MySQL、Spring Data、日志Log4J、Thymeleaf模板引擎、ActiveMQ消息、MyBatis等流行技术,以及利用Spring Boot实现邮件发送、Quartz定时器、过滤器Filter和监听器Listener等。
大小:72.04 MBSpring Boot电子书
-
Spring5高级编程
获取Spring 5经验的必备图书!全面的Spring 参考和实用指南,Spring 5的新特性和方法,精通数据访问和事务处理,创建微服务和其他Web服务和与Java 9的互操作性。
大小:356.7 MBSpring5电子书
-
Spring MVC+MyBatis开发从入门到项目实战
工作经验多:千万浏览量时尚博主倾囊相授,陪你走入Web开发设计的各个方面。 初学者:从简易的示例学起,慢慢深层次技术性关键。 內容全:包含SpringMVC与MyBatis的33个方法,56个案例。 重实
大小:230.8 MBSpring MVC开发电子书
-
Spring Cloud微服务实战
《Spring Cloud微服务实战》 从时下流行的微服务架构概念出发,详细介绍了Spring Cloud针对微服务架构中几大核心要素的解决方案和基础组件。对于各个组件的介绍,《Spring Cloud微服务实战》主要
大小:175.5 MB微服务电子书
-
Spring Cloud微服务:全栈技术与案例解析
本书的读者对象主要是Java开发人员:特别是工作1到3年这种工作经验的开发人员,这个阶段的开发人员技术能力一般,正需要一些实用的技术和经验来提升自己,Spring Cloud正是
大小:189.7 MBSpring Cloud电子书
-
Spring MVC学习指南
Spring MVC是Spring框架中用于Web应用快速开发的一个模块,其中的MVC是Model-View-Controller的缩写。作为当今业界zui主流的Web开发框架,Spring MVC已经成为当前zui热门的开发技能,同时也广泛用于桌面开
大小:83.3 MBSpring MVC电子书