Spring Cloud Stream异常处理过程解析

应用处理

当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常。若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理、系统层面的处理以及通过RetryTemplate进行处理。

本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理。

局部处理

Stream相关的配置内容如下:

spring:
 cloud:
  stream:
   rocketmq:
    binder:
     name-server: 192.168.190.129:9876
   bindings:
    input:
     destination: stream-test-topic
     group: binder-group

所谓局部处理就是针对指定的channel进行处理,需要定义一个处理异常的方法,并在该方法上添加@ServiceActivator注解,该注解有一个inputChannel属性,用于指定对哪个channel进行处理,格式为{destination}.{group}.errors。具体代码如下:

package com.zj.node.usercenter.rocketmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;
/**
 * 消费者
 *
 * @author 01
 * @date 2019-08-10
 **/
@Slf4j
@Service
public class TestStreamConsumer {
  @StreamListener(Sink.INPUT)
  public void receive1(String messageBody) {
    log.info("消费消息,messageBody = {}", messageBody);
    throw new IllegalArgumentException("参数错误");
  }
  /**
   * 处理局部异常的方法
   *
   * @param errorMessage 异常消息对象
   */
  @ServiceActivator(
    // 通过特定的格式指定处理哪个channel的异常
    inputChannel = "stream-test-topic.binder-group.errors"
  )
  public void handleError(ErrorMessage errorMessage) {
    // 获取异常对象
    Throwable errorMessagePayload = errorMessage.getPayload();
    log.error("发生异常", errorMessagePayload);
    // 获取消息体
    Message<?> originalMessage = errorMessage.getOriginalMessage();
    if (originalMessage != null) {
      log.error("消息体: {}", originalMessage.getPayload());
    } else {
      log.error("消息体为空");
    }
  }
}

全局处理

全局处理则是可以处理所有channel抛出来的异常,所有的channel抛出异常后会生成一个ErrorMessage对象,即错误消息。错误消息会被放到一个专门的channel里,这个channel就是errorChannel。所以通过监听errorChannel就可以实现全局异常的处理。具体代码如下:

@StreamListener(Sink.INPUT)
public void receive1(String messageBody) {
  log.info("消费消息,messageBody = {}", messageBody);
  throw new IllegalArgumentException("参数错误");
}
/**
 * 处理全局异常的方法
 *
 * @param errorMessage 异常消息对象
 */
@StreamListener("errorChannel")
public void handleError(ErrorMessage errorMessage) {
  log.error("发生异常. errorMessage = {}", errorMessage);
}

系统处理

系统处理方式,因消息中间件的不同而异。如果应用层面没有配置错误处理,那么error将会被传播给binder,而binder则会将error回传给消息中间件。消息中间件可以选择:

  • 丢弃消息:错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产
  • requeue(重新排队,从而重新处理)
  • 将失败的消息发送给DLQ(死信队列)

DLQ

目前RabbitMQ对DLQ的支持比较好,这里以RabbitMQ为例,只需要添加DLQ相关的配置:

spring:
 cloud:
  stream:
   bindings:
    input:
     destination: stream-test-topic
     group: binder-group
   rabbit:
    bindings:
     input:
      consumer:
       # 自动将失败的消息发送给DLQ
       auto-bind-dlq: true

消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。

如果想获取原始错误的异常堆栈,可添加如下配置:

spring:
 cloud:
  stream:
   rabbit:
    bindings:
     input:
      consumer:
       republish-to-dlq: true

requeue

Rabbit及Kafka的binder依赖RetryTemplate实现消息重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1 ,那么RetryTemplate则不会再重试。此时可以通过requeue方式来处理异常。

需要添加如下配置:

# 默认是3,设为1则禁用重试
spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1
# 表示是否要requeue被拒绝的消息(即:requeue处理失败的消息)
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true

这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException 异常为止。

RetryTemplate

RetryTemplate主要用于实现消息重试,也是错误处理的一种手段。有两种配置方式,一种是通过配置文件进行配置,如下示例:

spring:
 cloud:
  stream:
   bindings:
    <input channel名称>:
     consumer:
      # 最多尝试处理几次,默认3
      maxAttempts: 3
      # 重试时初始避退间隔,单位毫秒,默认1000
      backOffInitialInterval: 1000
      # 重试时最大避退间隔,单位毫秒,默认10000
      backOffMaxInterval: 10000
      # 避退乘数,默认2.0
      backOffMultiplier: 2.0
      # 当listen抛出retryableExceptions未列出的异常时,是否要重试
      defaultRetryable: true
      # 异常是否允许重试的map映射
      retryableExceptions:
       java.lang.RuntimeException: true
       java.lang.IllegalStateException: false

另一种则是通过代码配置,在多数场景下,使用配置文件定制重试行为都是可以满足需求的,但配置文件里支持的配置项可能无法满足一些复杂需求。此时可使用代码方式配置RetryTemplate,如下示例:

@Configuration
class RetryConfiguration {
  @StreamRetryTemplate
  public RetryTemplate sinkConsumerRetryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(retryPolicy());
    retryTemplate.setBackOffPolicy(backOffPolicy());
    return retryTemplate;
  }
  private ExceptionClassifierRetryPolicy retryPolicy() {
    BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
        Collections.singletonList(IllegalAccessException.class
        ));
    keepRetryingClassifier.setTraverseCauses(true);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
    AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();

    ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
    retryPolicy.setExceptionClassifier(
        classifiable -> keepRetryingClassifier.classify(classifiable) ?
            alwaysRetryPolicy : simpleRetryPolicy);
    return retryPolicy;
  }
  private FixedBackOffPolicy backOffPolicy() {
    final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
    backOffPolicy.setBackOffPeriod(2);
    return backOffPolicy;
  }
}

最后还需要添加一段配置:

spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate

注:Spring Cloud Stream 2.2才支持设置retry-template-name

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • springcloud config配置读取优先级过程详解

    情景描述 最近在修复Eureka的静态页面加载不出的缺陷时,最终发现是远程GIT仓库将静态资源访问方式配置给禁用了(spring.resources.add-mappings=false).虽然最后直接修改远程GIT仓库的此配置项给解决了(spring.resources.add-mappings=true),但是从中牵涉出的配置读取优先级我们必须好好的再回顾下 springcloud config读取仓库配置 通过config client模块来读取远程的仓库配置,只需要在boostrap.p

  • Spring Cloud Stream如何实现服务之间的通讯

    Spring Cloud Stream Srping cloud Bus的底层实现就是Spring Cloud Stream,Spring Cloud Stream的目的是用于构建基于消息驱动(或事件驱动)的微服务架构.Spring Cloud Stream本身对Spring Messaging.Spring Integration.Spring Boot Actuator.Spring Boot Externalized Configuration等模块进行封装(整合)和扩展,下面我们实现两个

  • springcloud微服务基于redis集群的单点登录实现解析

    简介 本文介绍微服务架构中如何实现单点登录功能 创建三个服务: 操作redis集群的服务,用于多个服务之间共享数据 统一认证中心服务,用于整个系统的统一登录认证 服务消费者,用于测试单点登录 大体思路:每个服务都设置一个拦截器检查cookie中是否有token,若有token,则放行,若没有token,重定向到统一认证中心服务进行登录,登录成功后返回到被拦截的服务. 搭建redis集群服务 搭建redis集群参考文档 搭建统一认证中心 主函数添加注解 /** * 单点登录既要注册到服务注册中心,

  • 创建网关项目(Spring Cloud Gateway)过程详解

    创建网关项目 加入网关后微服务的架构图 创建项目 POM文件 <properties> <java.version>1.8</java.version> <spring-cloud.version>Greenwich.SR3</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springfram

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • Spring Boot 开发私有即时通信系统(WebSocket)

    1/ 概述 利用Spring Boot作为基础框架,Spring Security作为安全框架,WebSocket作为通信框架,实现点对点聊天和群聊天. 2/ 所需依赖 Spring Boot 版本 1.5.3,使用MongoDB存储数据(非必须),Maven依赖如下: <properties> <java.version>1.8</java.version> <thymeleaf.version>3.0.0.RELEASE</thymeleaf.ve

  • Springboot实现阿里云通信短信服务有关短信验证码的发送功能

    前言 短信验证码是通过发送验证码到手机的一种有效的验证码系统.主要用于验证用户手机的合法性及敏感操作的身份验证. 现在市面上的短信服务平台有很多.大家在选择的时候未免会有些不好抉择.本人建议选择短信服务商应遵循以下几点: 服务商知名度高,业务流量大.(这样的平台可信度高) 服务稳定,不能经常宕机.(保证自身业务的流畅运行) 文档全面详细.(没文档怎么玩?) 最近的一个项目中,注册和修改密码时需要用到短信验证码校验手机号的功能.本人也是对比几家后,直接选择阿里云通信的短信服务.(本身项目服务器也是

  • Spring Cloud Stream异常处理过程解析

    应用处理 当消费者在处理接收到的消息时,有可能会由于某些原因而抛出异常.若希望对抛出来的异常进行处理的话,就需要采取一些异常处理手段,异常处理的方式可分为三种:应用层面的处理.系统层面的处理以及通过RetryTemplate进行处理. 本小节先来介绍较为常用的应用层面的异常处理方式,该方式又细分为局部处理和全局处理. 局部处理 Stream相关的配置内容如下: spring: cloud: stream: rocketmq: binder: name-server: 192.168.190.12

  • Spring Cloud Stream微服务消息框架原理及实例解析

    随着近些年微服务在国内的盛行,消息驱动被提到的越来越多.主要原因是系统被拆分成多个模块后,一个业务往往需要在多个服务间相互调用,不管是采用HTTP还是RPC都是同步的,不可避免快等慢的情况发生,系统性能上很容易遇到瓶颈.在这样的背景下,将业务中实时性要求不是特别高且非主干的部分放到消息队列中是很好的选择,达到了异步解耦的效果. 目前消息队列有很多优秀的中间件,目前使用较多的主要有 RabbitMQ,Kafka,RocketMQ 等,这些中间件各有优势,有的对 AMQP(应用层标准高级消息队列协议

  • Spring Cloud Stream简单用法

    目录 简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者 生产者 消费者 Stream其他特性 消息发送失败的处理 消费者错误处理 Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样).如此⼀来,我们学习.开发.维护MQ都会变得轻松.⽬前Spring Cloud St

  • Spring Cloud Stream 高级特性使用详解

    目录 重试 消息发送失败的处理 消费错误处理 自定义MessageHandler类型 Endpoint端点 Metrics指标 Serverless Partition统一 Polling Consumer 支持多个Binder同时使用 建立事件机制 重试 Consumer端可以配置重试次数,当消息消费失败的时候会进行重试. 底层使用Spring Retry去重试,重试次数可自定义配置. # 默认重试次数为3,配置大于1时才会生效 spring.cloud.stream.bindings.<ch

  • Spring整合MyBatis图示过程解析

    这篇文章主要介绍了Spring整合MyBatis图示过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.导入所需要的jar依赖 !--MyBatis和Spring的整合包 由MyBatis提供--> <dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis-spring</artifactId> <vers

  • Spring Cloud Feign组件实例解析

    这篇文章主要介绍了Spring Cloud Feign组件实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 采用Spring Cloud微服务框架后,经常会涉及到服务间调用,服务间调用采用了Feign组件. 由于之前有使用dubbo经验.dubbo的负载均衡策略(轮训.最小连接数.随机轮训.加权轮训),dubbo失败策略(快速失败.失败重试等等), 所以Feign负载均衡策略的是什么? 失败后是否会重试,重试策略又是什么?带这个疑问,查了

  • Spring Boot 整合 Druid过程解析

    这篇文章主要介绍了Spring Boot 整合 Druid过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 概述 Druid 是阿里巴巴开源平台上的一个项目,整个项目由数据库连接池.插件框架和 SQL 解析器组成.该项目主要是为了扩展 JDBC 的一些限制,可以让程序员实现一些特殊的需求,比如向密钥服务请求凭证.统计 SQL 信息.SQL 性能收集.SQL 注入检查.SQL 翻译等,程序员可以通过定制来实现自己需要的功能. Druid 是

  • Spring boot整合log4j2过程解析

    这篇文章主要介绍了Spring boot整合log4j2过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 以前整合过log4j2,但是今天再次整合发现都忘记了,而且也没有记下来 1.pom.xml中 (1)把spring-boot-starter-web包下面的spring-boot-starter-logging排除 <dependency> <groupId>org.springframework.boot</gr

随机推荐