Spring Cloud 请求重试机制核心代码分析

场景

发布微服务的操作一般都是打完新代码的包,kill掉在跑的应用,替换新的包,启动。

spring cloud 中使用eureka为注册中心,它是允许服务列表数据的延迟性的,就是说即使应用已经不在服务列表了,客户端在一段时间内依然会请求这个地址。那么就会出现请求正在发布的地址,而导致失败。

我们会优化服务列表的刷新时间,以提高服务列表信息的时效性。但是无论怎样,都无法避免有那么一段时间是数据不一致的。

所以我们想到一个办法就是重试机制,当a机子在重启时,同个集群的b是可以正常提供服务的,如果有重试机制就可以在上面这个场景里进行重试到b而不影响正确响应。

操作

需要进行如下的操作:

ribbon:
 ReadTimeout: 10000
 ConnectTimeout: 10000
 MaxAutoRetries: 0
 MaxAutoRetriesNextServer: 1
 OkToRetryOnAllOperations: false

引入spring-retry包

<dependency>
  <groupId>org.springframework.retry</groupId>
  <artifactId>spring-retry</artifactId>
 </dependency>

以zuul为例子还需要配置开启重试:

zuul.retryable=true

遇到了问题

然而万事总没那么一帆风顺,通过测试重试机制生效了,但是并没有我想象的去请求另一台健康的机子,于是被迫去吧开源码看一看,最终发现是源码的bug,不过已经修复,升级版本即可。

代码分析

使用的版本是

spring-cloud-netflix-core:1.3.6.RELEASE

spring-retry:1.2.1.RELEASE

spring cloud 依赖版本:

<dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

因为启用了重试,所以请求应用时会执行RetryableRibbonLoadBalancingHttpClient.execute方法:

public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
    final RequestConfig.Builder builder = RequestConfig.custom();
    IClientConfig config = configOverride != null ? configOverride : this.config;
    builder.setConnectTimeout(config.get(
        CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
    builder.setSocketTimeout(config.get(
        CommonClientConfigKey.ReadTimeout, this.readTimeout));
    builder.setRedirectsEnabled(config.get(
        CommonClientConfigKey.FollowRedirects, this.followRedirects));

    final RequestConfig requestConfig = builder.build();
    final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
    RetryCallback retryCallback = new RetryCallback() {
      @Override
      public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception {
        //on retries the policy will choose the server and set it in the context
        //extract the server and update the request being made
        RibbonApacheHttpRequest newRequest = request;
        if(context instanceof LoadBalancedRetryContext) {
          ServiceInstance service = ((LoadBalancedRetryContext)context).getServiceInstance();
          if(service != null) {
            //Reconstruct the request URI using the host and port set in the retry context
            newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
                newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
                newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
                newRequest.getURI().getFragment()));
          }
        }
        newRequest = getSecureRequest(request, configOverride);
        HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
        final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
        if(retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
          if(CloseableHttpResponse.class.isInstance(httpResponse)) {
            ((CloseableHttpResponse)httpResponse).close();
          }
          throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
              httpResponse.getStatusLine().getStatusCode());
        }
        return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
      }
    };
    return this.executeWithRetry(request, retryPolicy, retryCallback);
  }

我们发现先new 一个RetryCallback,然后执行this.executeWithRetry(request, retryPolicy, retryCallback);

而这个RetryCallback.doWithRetry的代码我们清楚看到是实际请求的代码,也就是说this.executeWithRetry方法最终还是会调用RetryCallback.doWithRetry

protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
      RecoveryCallback<T> recoveryCallback, RetryState state)
      throws E, ExhaustedRetryException {

    RetryPolicy retryPolicy = this.retryPolicy;
    BackOffPolicy backOffPolicy = this.backOffPolicy;

    // Allow the retry policy to initialise itself...
    RetryContext context = open(retryPolicy, state);
    if (this.logger.isTraceEnabled()) {
      this.logger.trace("RetryContext retrieved: " + context);
    }

    // Make sure the context is available globally for clients who need
    // it...
    RetrySynchronizationManager.register(context);

    Throwable lastException = null;

    boolean exhausted = false;
    try {

      // Give clients a chance to enhance the context...
      boolean running = doOpenInterceptors(retryCallback, context);

      if (!running) {
        throw new TerminatedRetryException(
            "Retry terminated abnormally by interceptor before first attempt");
      }

      // Get or Start the backoff context...
      BackOffContext backOffContext = null;
      Object resource = context.getAttribute("backOffContext");

      if (resource instanceof BackOffContext) {
        backOffContext = (BackOffContext) resource;
      }

      if (backOffContext == null) {
        backOffContext = backOffPolicy.start(context);
        if (backOffContext != null) {
          context.setAttribute("backOffContext", backOffContext);
        }
      }

      /*
       * We allow the whole loop to be skipped if the policy or context already
       * forbid the first try. This is used in the case of external retry to allow a
       * recovery in handleRetryExhausted without the callback processing (which
       * would throw an exception).
       */
      while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

        try {
          if (this.logger.isDebugEnabled()) {
            this.logger.debug("Retry: count=" + context.getRetryCount());
          }
          // Reset the last exception, so if we are successful
          // the close interceptors will not think we failed...
          lastException = null;
          return retryCallback.doWithRetry(context);
        }
        catch (Throwable e) {

          lastException = e;

          try {
            registerThrowable(retryPolicy, state, context, e);
          }
          catch (Exception ex) {
            throw new TerminatedRetryException("Could not register throwable",
                ex);
          }
          finally {
            doOnErrorInterceptors(retryCallback, context, e);
          }

          if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
            try {
              backOffPolicy.backOff(backOffContext);
            }
            catch (BackOffInterruptedException ex) {
              lastException = e;
              // back off was prevented by another thread - fail the retry
              if (this.logger.isDebugEnabled()) {
                this.logger
                    .debug("Abort retry because interrupted: count="
                        + context.getRetryCount());
              }
              throw ex;
            }
          }

          if (this.logger.isDebugEnabled()) {
            this.logger.debug(
                "Checking for rethrow: count=" + context.getRetryCount());
          }

          if (shouldRethrow(retryPolicy, context, state)) {
            if (this.logger.isDebugEnabled()) {
              this.logger.debug("Rethrow in retry for policy: count="
                  + context.getRetryCount());
            }
            throw RetryTemplate.<E>wrapIfNecessary(e);
          }

        }

        /*
         * A stateful attempt that can retry may rethrow the exception before now,
         * but if we get this far in a stateful retry there's a reason for it,
         * like a circuit breaker or a rollback classifier.
         */
        if (state != null && context.hasAttribute(GLOBAL_STATE)) {
          break;
        }
      }

      if (state == null && this.logger.isDebugEnabled()) {
        this.logger.debug(
            "Retry failed last attempt: count=" + context.getRetryCount());
      }

      exhausted = true;
      return handleRetryExhausted(recoveryCallback, context, state);

    }
    catch (Throwable e) {
      throw RetryTemplate.<E>wrapIfNecessary(e);
    }
    finally {
      close(retryPolicy, context, state, lastException == null || exhausted);
      doCloseInterceptors(retryCallback, context, lastException);
      RetrySynchronizationManager.clear();
    }
  }

在一个while循环里实现重试机制,当执行retryCallback.doWithRetry(context)出现异常的时候,就会catch异常,然后用 retryPolicy判断是否进行重试,特别注意registerThrowable(retryPolicy, state, context, e);方法,不但判断了是否重试,在重试情况下会新选出一个机子放入context,然后再去执行retryCallback.doWithRetry(context)时带入,如此就实现了换机子重试了。

但是我的配置怎么会没有换机子呢?调试代码发现registerThrowable(retryPolicy, state, context, e);选出来的机子没问题,就是新的健康的机子,但是在执行retryCallback.doWithRetry(context)代码的时候依然请求的是那台挂掉的机子。

所以我们再仔细看一下retryCallback.doWithRetry(context)的代码:

我们发现了这行代码:

newRequest = getSecureRequest(request, configOverride);
protected RibbonApacheHttpRequest getSecureRequest(RibbonApacheHttpRequest request, IClientConfig configOverride) {
    if (isSecure(configOverride)) {
      final URI secureUri = UriComponentsBuilder.fromUri(request.getUri())
          .scheme("https").build(true).toUri();
      return request.withNewUri(secureUri);
    }
    return request;
  }

newRequest在前面已经使用context构建完毕,request是上一次请求的数据,只要执行这个代码就会发现newRequest永远都会被request覆盖。看到这里我们才发现原来是一个源码bug。

issue地址:https://github.com/spring-cloud/spring-cloud-netflix/issues/2667

总结

这是一次很普通的查问题过程,在这个过程中当我发现配置没有达到我的预期时,我先查看了配置的含义,尝试多次无果,于是进行断点调试发现异常中断点后,因为场景需要一台机子健康一台机子下线,我模拟了数百次,最终才定位到了这行代码。开源项目即使是优秀的项目必然也会有bug存在,不迷信,不盲目。另一方面,阅读源码能力也是一个解决问题的重要能力,像我在找源码入口,定位代码时耗费了很多的时间。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Cloud Zuul的重试配置详解

    Spring Cloud Zuul模块本身就包含了对于hystrix和ribbon的依赖,当我们使用zuul通过path和serviceId的组合来配置路由的时候,可以通过hystrix和ribbon的配置调整路由请求的各种时间超时机制. 1 ribbon配置举例 配置连接超时时间1秒,请求处理时间2秒,统一服务server尝试重连1次,切换server重连1次 ribbon: ConnectTimeout: 1000 ReadTimeout: 2000 MaxAutoRetries: 1 Ma

  • Spring Cloud重试机制与各组件的重试总结

    SpringCloud重试机制配置 首先声明一点,这里的重试并不是报错以后的重试,而是负载均衡客户端发现远程请求实例不可到达后,去重试其他实例. @Bean @LoadBalanced RestTemplate restTemplate() { HttpComponentsClientHttpRequestFactory httpRequestFactory = new HttpComponentsClientHttpRequestFactory(); httpRequestFactory.se

  • Spring Cloud Config Client超时及重试示例详解

    简介 有时客户端需要在 config server 无响应时进行重试,以给 config server 时间进行恢复.利用 spring 提供的重试组件,我们可以方便的配置重试机制,包括重试间隔,重试次数等.下面话不多说了,来一起看看详细的介绍吧. 项目源码 点击下载 为 web 项目添加依赖 开启客户端重试功能需要两个新依赖,spring-retry 和 spring-boot-starter-aop,把如下代码添加到 web 项目的 pom.xml 文件中: <dependency> &l

  • 详解Spring Cloud Zuul重试机制探秘

    简介 本文章对应spring cloud的版本为(Dalston.SR4),具体内容如下: 开启Zuul功能 通过源码了解Zuul的一次转发 怎么开启zuul的重试机制 Edgware.RC1版本的优化 开启Zuul的功能 首先如何使用spring cloud zuul完成路由转发的功能,这个问题很简单,只需要进行如下准备工作即可: 注册中心(Eureka Server) zuul(同时也是Eureka Client) 应用服务(同时也是Eureka Client) 我们希望zuul和后端的应用

  • SpringCloud重试机制配置详解

    首先声明一点,这里的重试并不是报错以后的重试,而是负载均衡客户端发现远程请求实例不可到达后,去重试其他实例. @Bean @LoadBalanced RestTemplate restTemplate() { HttpComponentsClientHttpRequestFactory httpRequestFactory = new HttpComponentsClientHttpRequestFactory(); httpRequestFactory.setReadTimeout(5000)

  • Spring Cloud 请求重试机制核心代码分析

    场景 发布微服务的操作一般都是打完新代码的包,kill掉在跑的应用,替换新的包,启动. spring cloud 中使用eureka为注册中心,它是允许服务列表数据的延迟性的,就是说即使应用已经不在服务列表了,客户端在一段时间内依然会请求这个地址.那么就会出现请求正在发布的地址,而导致失败. 我们会优化服务列表的刷新时间,以提高服务列表信息的时效性.但是无论怎样,都无法避免有那么一段时间是数据不一致的. 所以我们想到一个办法就是重试机制,当a机子在重启时,同个集群的b是可以正常提供服务的,如果有

  • Spring Cloud Gateway重试机制的实现

    前言 重试,我相信大家并不陌生.在我们调用Http接口的时候,总会因为某种原因调用失败,这个时候我们可以通过重试的方式,来重新请求接口. 生活中这样的事例很多,比如打电话,对方正在通话中啊,信号不好啊等等原因,你总会打不通,当你第一次没打通之后,你会打第二次,第三次...第四次就通了. 重试也要注意应用场景,读数据的接口比较适合重试的场景,写数据的接口就需要注意接口的幂等性了.还有就是重试次数如果太多的话会导致请求量加倍,给后端造成更大的压力,设置合理的重试机制才是最关键的. 今天我们来简单的了

  • Spring Cloud Gateway重试机制原理解析

    重试,我相信大家并不陌生.在我们调用Http接口的时候,总会因为某种原因调用失败,这个时候我们可以通过重试的方式,来重新请求接口. 生活中这样的事例很多,比如打电话,对方正在通话中啊,信号不好啊等等原因,你总会打不通,当你第一次没打通之后,你会打第二次,第三次-第四次就通了. 重试也要注意应用场景,读数据的接口比较适合重试的场景,写数据的接口就需要注意接口的幂等性了.还有就是重试次数如果太多的话会导致请求量加倍,给后端造成更大的压力,设置合理的重试机制才是最关键的. 今天我们来简单的了解下Spr

  • 掌握Android Handler消息机制核心代码

    目录 一.handler基本认识 1.基本组成 2.基本使用方法 3.工作流程 二.发送消息 三.消息进入消息队列 1.入队前的准备工作 2.将消息加入队列 四.从消息队列里取出消息 1.准备工作 2.loop中的操作 2.1 MessageQueue的next方法 五.消息的处理 六.其他关键点 1. Loop的创建 2.Handler的创建 3.Message的创建.回收和复用机制 4. IdleHandler 5.Handler在Framework层的应用 阅读前需要对handler有一些

  • spring cloud gateway网关路由分配代码实例解析

    这篇文章主要介绍了spring cloud gateway网关路由分配代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1, 基于父工程,新建一个模块 2,pom文件添加依赖 <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-

  • SpringBoot整合spring-retry实现接口请求重试机制及注意事项

    目录 一.重试机制 二.重试机制要素 三.重试机制注意事项 四.SpringBoot整合spring-retry 1)添加依赖 2)添加@EnableRetry注解 3)添加@Retryable注解 4)Controller测试代码 5)发送请求 6)补充:@Recover 一.重试机制 由于网络不稳定或网络抖动经常会造成接口请求失败的情况,当我们再去尝试就成功了,这就是重试机制. 其主要目的就是要尽可能地提高请求成功的概率,但一般情况下,我们请求第一次失败,代码运行就抛出异常结束了,如果想再次

  • Spring Cloud Gateway远程命令执行漏洞分析(CVE-2022-22947)

    目录 漏洞描述 环境搭建 漏洞复现 声明:本文仅供学习参考,其中涉及的一切资源均来源于网络,请勿用于任何非法行为,否则您将自行承担相应后果,本人不承担任何法律及连带责任. 漏洞描述 使用Spring Cloud Gateway的应用程序在Actuator端点启用.公开和不安全的情况下容易受到代码注入的攻击.攻击者可以恶意创建允许在远程主机上执行任意远程执行的请求. 当攻击者可以访问actuator API时,就可以利用该漏洞执行任意命令. 影响范围 Spring Cloud Gateway <

  • 从php核心代码分析require和include的区别

    深入理解PHP之require/include顺序 http://www.jb51.net/article/25867.htm普及在php手册中: require() is identical to include() except upon failure it will also produce a fatal E_ERROR level error. In other words, it will halt the script whereas include() only emits a

  • C# Socket连接请求超时机制实现代码分享

    .Net的System.Net.Sockets.TcpClient和System.Net.Sockets.Socket都没有直接为Connect/BeginConnect提供超时控制机制.因此,当服务器未处于监听状态,或者发生网络故障时,客户端连接请求会被迫等待很长一段时间,直到抛出异常.默认的等待时间长达20~30s..Net Socket库的SocketOptionName.SendTimeout提供了控制发送数据的超时时间,但并非本文讨论的连接请求的超时时间.实现 下面是实现的关键代码:

随机推荐