当前位置:首页 > 编程教程 > java技术文章 > Spring Cloud Stream异常处理过程解析

Spring Cloud Stream异常处理过程讲解

  • 发布时间:
  • 作者:码农之家
  • 点击:127

这篇文章主要知识点是关于spring、cloud、stream、异常处理、的内容,如果大家想对相关知识点有系统深入的学习,可以参阅以下电子书

精通Spring MVC4
精通Spring MVC4原书中文版
  • 类型:Spring大小:11.3 MB格式:PDF作者:Geoffroy,Warin
立即下载

Spring Cloud Stream异常处理过程解析

应用处理

当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常。若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理、系统层面的处理以及通过RetryTemplate进行处理。

本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理。

局部处理

Stream相关的配置内容如下:

spring:
 cloud:
  stream:
   rocketmq:
    binder:
     name-server: 192.168.190.129:9876
   bindings:
    input:
     destination: stream-test-topic
     group: binder-group

所谓局部处理就是针对指定的channel进行处理,需要定义一个处理异常的方法,并在该方法上添加@ServiceActivator注解,该注解有一个inputChannel属性,用于指定对哪个channel进行处理,格式为{destination}.{group}.errors。具体代码如下:

package com.zj.node.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;
/**
 * 消费者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {
  @StreamListener(Sink.INPUT)
  public void receive1(String messageBody) {
    log.info("消费消息,messageBody = {}", messageBody);
    throw new IllegalArgumentException("参数错误");
  }
  /**
   * 处理局部异常的方法
   *
   * @param errorMessage 异常消息对象
   */
  @ServiceActivator(
    // 通过特定的格式指定处理哪个channel的异常
    inputChannel = "stream-test-topic.binder-group.errors"
  )
  public void handleError(ErrorMessage errorMessage) {
    // 获取异常对象
    Throwable errorMessagePayload = errorMessage.getPayload();
    log.error("发生异常", errorMessagePayload);
    // 获取消息体
    Message<?> originalMessage = errorMessage.getOriginalMessage();
    if (originalMessage != null) {
      log.error("消息体: {}", originalMessage.getPayload());
    } else {
      log.error("消息体为空");
    }
  }
}

全局处理

全局处理则是可以处理所有channel抛出来的异常,所有的channel抛出异常后会生成一个ErrorMessage对象,即错误消息。错误消息会被放到一个专门的channel里,这个channel就是errorChannel。所以通过监听errorChannel就可以实现全局异常的处理。具体代码如下:

@StreamListener(Sink.INPUT)
public void receive1(String messageBody) {
  log.info("消费消息,messageBody = {}", messageBody);
  throw new IllegalArgumentException("参数错误");
}
/**
 * 处理全局异常的方法
 *
 * @param errorMessage 异常消息对象
 */
@StreamListener("errorChannel")
public void handleError(ErrorMessage errorMessage) {
  log.error("发生异常. errorMessage = {}", errorMessage);
}

系统处理

系统处理方式,因消息中间件的不同而异。如果应用层面没有配置错误处理,那么error将会被传播给binder,而binder则会将error回传给消息中间件。消息中间件可以选择:

  • 丢弃消息:错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产
  • requeue(重新排队,从而重新处理)
  • 将失败的消息发送给DLQ(死信队列)

DLQ

目前RabbitMQ对DLQ的支持比较好,这里以RabbitMQ为例,只需要添加DLQ相关的配置:

spring:
 cloud:
  stream:
   bindings:
    input:
     destination: stream-test-topic
     group: binder-group
   rabbit:
    bindings:
     input:
      consumer:
       # 自动将失败的消息发送给DLQ
       auto-bind-dlq: true

消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。

如果想获取原始错误的异常堆栈,可添加如下配置:

spring:
 cloud:
  stream:
   rabbit:
    bindings:
     input:
      consumer:
       republish-to-dlq: true

requeue

Rabbit及Kafka的binder依赖RetryTemplate实现消息重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate则不会再重试。此时可以通过requeue方式来处理异常。

需要添加如下配置:

# 默认是3,设为1则禁用重试
spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1
# 表示是否要requeue被拒绝的消息(即:requeue处理失败的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException 异常为止。

RetryTemplate

RetryTemplate主要用于实现消息重试,也是错误处理的一种手段。有两种配置方式,一种是通过配置文件进行配置,如下示例:

spring:
 cloud:
  stream:
   bindings:
    <input channel名称>:
     consumer:
      # 最多尝试处理几次,默认3
      maxAttempts: 3
      # 重试时初始避退间隔,单位毫秒,默认1000
      backOffInitialInterval: 1000
      # 重试时最大避退间隔,单位毫秒,默认10000
      backOffMaxInterval: 10000
      # 避退乘数,默认2.0
      backOffMultiplier: 2.0
      # 当listen抛出retryableExceptions未列出的异常时,是否要重试
      defaultRetryable: true
      # 异常是否允许重试的map映射
      retryableExceptions:
       java.lang.RuntimeException: true
       java.lang.IllegalStateException: false

另一种则是通过代码配置,在多数场景下,使用配置文件定制重试行为都是可以满足需求的,但配置文件里支持的配置项可能无法满足一些复杂需求。此时可使用代码方式配置RetryTemplate,如下示例:

@Configuration
class RetryConfiguration {
  @StreamRetryTemplate
  public RetryTemplate sinkConsumerRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(retryPolicy());
    retryTemplate.setBackOffPolicy(backOffPolicy());
    return retryTemplate;
  }
  private ExceptionClassifierRetryPolicy retryPolicy() {
    BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
        Collections.singletonList(IllegalAccessException.class
        ));
    keepRetryingClassifier.setTraverseCauses(true);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
    AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();

    ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
    retryPolicy.setExceptionClassifier(
        classifiable -> keepRetryingClassifier.classify(classifiable) ?
            alwaysRetryPolicy : simpleRetryPolicy);
    return retryPolicy;
  }
  private FixedBackOffPolicy backOffPolicy() {
    final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(2);
    return backOffPolicy;
  }
}

最后还需要添加一段配置:

spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate

注:Spring Cloud Stream 2.2才支持设置retry-template-name

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持码农之家。

以上就是本次给大家分享的关于java的全部知识点内容总结,大家还可以在下方相关文章里找到相关文章进一步学习,感谢大家的阅读和支持。

您可能感兴趣的文章:

  • SpringBoot使用统一异常处理实例讲解
  • Spring数据库异常抽理知识点分享
  • springcloud 相关电子书
    学习笔记
    网友NO.227051

    Springboot之自定义全局异常处理的实现

    前言: 在实际的应用开发中,很多时候往往因为一些不可控的因素导致程序出现一些错误,这个时候就要及时把异常信息反馈给客户端,便于客户端能够及时地进行处理,而针对代码导致的异常,我们一般有两种处理方式,一种是throws直接抛出,一种是使用try..catch捕获,一般的话,如果逻辑的异常,需要知道异常信息,我们往往选择将异常抛出,如果只是要保证程序在出错的情况下 依然可以继续运行,则使用try..catch来捕获。 但是try..catch会导致代码量的增加,让后期我们的代码变得臃肿且难以维护。当然,springboot作为一个如此优秀的框架,肯定不会坐视不管的,通过springboot自带的注解,我们可以方便的自定义我们的全局异常处理器,并且以json格式返回给我们的客户端。 代码实战: 捕获全局异常: 首先呢,我们新建我们负责全局异常捕捉处理的类……

    网友NO.301113

    spring boot请求异常处理并返回对应的html页面

    通过之前的学习,我知道中间件可以预处理http请求并返回相应页面(比如出现404异常,可以返回一个自己编写的异常界面,而非默认使用的白板404页面,很难看)。其实spring boot也提供了这样的功能。 404异常处理: @Controllerpublic class ErrorHandler404 implements ErrorController { private static final String ERROR_PATH = "/error"; @RequestMapping(value=ERROR_PATH) public String handleError(){ return "error/error-404"; } @Override public String getErrorPath() { return ERROR_PATH; }} 500异常处理: @Componentpublic class ErrorHandler500 implements HandlerExceptionResolver{ @Override public ModelAndView resolveException(HttpServletRequest req, HttpServletResponse resp, Object handler, Exception ex) { // 异常处理逻辑 goes here return new ModelAndView("error/error-500"); }} 以上代码中的文件路径是基于spring boot框架的文件目录结构的。 这样,两种最常见的异常个性化处理就完成……

    网友NO.406794

    springboot结合全局异常处理实现登录注册验证

    在学校做一个校企合作项目,注册登录这一块需要对注册登录进行输入合法的服务器端验证,因为是前后端分离开发,所以要求返回JSON数据。 方法有很多,这觉得用全局异常处理比较容易上手 全局异常处理 首先来创建一个sprIngboot的web项目或模块,目录结构如下 实体类User.java @Datapublic class User { private String userName; private String passwold;} 实体类UserResult.java 把数据封装到这里返回到客户端 @Data@NoArgsConstructor@AllArgsConstructorpublic class UserResult { private int code; private String msg;} 接下来自定义异常,都继承自Exception UserNullException.java 当用户名为空抛出这个异常 public class UserNullException extends Exception{ public UserNullException() { super("用户名不能为空"); }} PasswoldNullException.java 当密码为空抛出这个异常 public class PasswoldNullException extends Exception { public PasswoldNullException() { super("密码……

    <
    1
    >

    Copyright 2018-2020 www.xz577.com 码农之家

    版权投诉 / 书籍推广 / 赞助:520161757@qq.com