详解Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失

在Spring Cloud中我们用Hystrix来实现断路器,Zuul中默认是用信号量(Hystrix默认是线程)来进行隔离的,我们可以通过配置使用线程方式隔离。

在使用线程隔离的时候,有个问题是必须要解决的,那就是在某些业务场景下通过ThreadLocal来在线程里传递数据,用信号量是没问题的,从请求进来,但后续的流程都是通一个线程。

当隔离模式为线程时,Hystrix会将请求放入Hystrix的线程池中去执行,这个时候某个请求就有A线程变成B线程了,ThreadLocal必然消失了。

下面我们通过一个简单的列子来模拟下这个流程:

public class CustomThreadLocal {
  static ThreadLocal<String> threadLocal = new ThreadLocal<>();
  public static void main(String[] args) {
    new Thread(new Runnable() {
      @Override
      public void run() {
        CustomThreadLocal.threadLocal.set("猿天地");
        new Service().call();
      }
    }).start();
  }
}
class Service {
  public void call() {
    System.out.println("Service:" + Thread.currentThread().getName());
    System.out.println("Service:" + CustomThreadLocal.threadLocal.get());
    new Dao().call();
  }
}
class Dao {
  public void call() {
    System.out.println("==========================");
    System.out.println("Dao:" + Thread.currentThread().getName());
    System.out.println("Dao:" + CustomThreadLocal.threadLocal.get());
  }
}

我们在主类中定义了一个ThreadLocal用来传递数据,然后起了一个线程,在线程中调用Service中的call方法,并且往Threadlocal中设置了一个值,在Service中获取ThreadLocal中的值,然后再调用Dao中的call方法,也是获取ThreadLocal中的值,我们运行下看效果:

Service:Thread-0
Service:猿天地
==========================
Dao:Thread-0
Dao:猿天地

可以看到整个流程都是在同一个线程中执行的,也正确的获取到了ThreadLocal中的值,这种情况是没有问题的。

接下来我们改造下程序,进行线程切换,将调用Dao中的call重启一个线程执行:

public class CustomThreadLocal {
  static ThreadLocal<String> threadLocal = new ThreadLocal<>();
  public static void main(String[] args) {
    new Thread(new Runnable() {
      @Override
      public void run() {
        CustomThreadLocal.threadLocal.set("猿天地");
        new Service().call();
      }
    }).start();
  }
}
class Service {
  public void call() {
    System.out.println("Service:" + Thread.currentThread().getName());
    System.out.println("Service:" + CustomThreadLocal.threadLocal.get());
    //new Dao().call();
    new Thread(new Runnable() {
      @Override
      public void run() {
        new Dao().call();
      }
    }).start();
  }
}
class Dao {
  public void call() {
    System.out.println("==========================");
    System.out.println("Dao:" + Thread.currentThread().getName());
    System.out.println("Dao:" + CustomThreadLocal.threadLocal.get());
  }
}

再次运行,看效果:

Service:Thread-0
Service:猿天地
==========================
Dao:Thread-1
Dao:null

可以看到这次的请求是由2个线程共同完成的,在Service中还是可以拿到ThreadLocal的值,到了Dao中就拿不到了,因为线程已经切换了,这就是开始讲的ThreadLocal的数据会丢失的问题。

那么怎么解决这个问题呢,其实也很简单,只需要改一行代码即可:

static ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();

将ThreadLocal改成InheritableThreadLocal,我们看下改造之后的效果:

Service:Thread-0
Service:猿天地
==========================
Dao:Thread-1
Dao:猿天地

值可以正常拿到,InheritableThreadLocal就是为了解决这种线程切换导致ThreadLocal拿不到值的问题而产生的。

要理解InheritableThreadLocal的原理,得先理解ThreadLocal的原理,我们稍微简单的来介绍下ThreadLocal的原理:

每个线程都有一个 ThreadLocalMap 类型的 threadLocals 属性,ThreadLocalMap 类相当于一个Map,key 是 ThreadLocal 本身,value 就是我们设置的值。

public class Thread implements Runnable {
  ThreadLocal.ThreadLocalMap threadLocals = null;
}

当我们通过 threadLocal.set(“猿天地”); 的时候,就是在这个线程中的 threadLocals 属性中放入一个键值对,key 是 当前线程,value 就是你设置的值猿天地。

public void set(T value) {
  Thread t = Thread.currentThread();
  ThreadLocalMap map = getMap(t);
  if (map != null)
    map.set(this, value);
  else
    createMap(t, value);
}

当我们通过 threadlocal.get() 方法的时候,就是根据当前线程作为key来获取这个线程设置的值。

public T get() {
  Thread t = Thread.currentThread();
  ThreadLocalMap map = getMap(t);
  if (map != null) {
    ThreadLocalMap.Entry e = map.getEntry(this);
    if (e != null) {
       @SuppressWarnings("unchecked")
       T result = (T)e.value;
       return result;
    }
  }
  return setInitialValue();
}

通过上面的介绍我们可以了解到threadlocal能够传递数据是用Thread.currentThread()当前线程来获取,也就是只要在相同的线程中就可以获取到前方设置进去的值。

如果在threadlocal设置完值之后,下步的操作重新创建了一个线程,这个时候Thread.currentThread()就已经变了,那么肯定是拿不到之前设置的值。具体的问题复现可以参考上面我的代码。

那为什么InheritableThreadLocal就可以呢?

InheritableThreadLocal这个类继承了ThreadLocal,重写了3个方法,在当前线程上创建一个新的线程实例Thread时,会把这些线程变量从当前线程传递给新的线程实例。

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
  /**
   * Computes the child's initial value for this inheritable thread-local
   * variable as a function of the parent's value at the time the child
   * thread is created. This method is called from within the parent
   * thread before the child is started.
   * <p>
   * This method merely returns its input argument, and should be overridden
   * if a different behavior is desired.
   *
   * @param parentValue the parent thread's value
   * @return the child thread's initial value
   */
  protected T childValue(T parentValue) {
    return parentValue;
  }
  /**
   * Get the map associated with a ThreadLocal.
   *
   * @param t the current thread
   */
  ThreadLocalMap getMap(Thread t) {
    return t.inheritableThreadLocals;
  }
  /**
   * Create the map associated with a ThreadLocal.
   *
   * @param t the current thread
   * @param firstValue value for the initial entry of the table.
   */
  void createMap(Thread t, T firstValue) {
    t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
  }
}

通过上面的代码我们可以看到InheritableThreadLocal 重写了childValue, getMap,createMap三个方法,当我们往里面set值的时候,值保存到了inheritableThreadLocals里面,而不是之前的threadLocals。

关键的点来了,为什么当创建新的线程池,可以获取到上个线程里的threadLocal中的值呢?原因就是在新创建线程的时候,会把之前线程的inheritableThreadLocals赋值给新线程的inheritableThreadLocals,通过这种方式实现了数据的传递。

源码最开始在Thread的init方法中,如下:

if (parent.inheritableThreadLocals != null)
  this.inheritableThreadLocals =
        ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

createInheritedMap如下:

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
    return new ThreadLocalMap(parentMap);
  }

赋值代码:

 private ThreadLocalMap(ThreadLocalMap parentMap) {
   Entry[] parentTable = parentMap.table;
   int len = parentTable.length;
   setThreshold(len);
   table = new Entry[len];
   for (int j = 0; j < len; j++) {
      Entry e = parentTable[j];
      if (e != null) {
        @SuppressWarnings("unchecked")
        ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
        if (key != null) {
          Object value = key.childValue(e.value);
          Entry c = new Entry(key, value);
          int h = key.threadLocalHashCode & (len - 1);
          while (table[h] != null)
            h = nextIndex(h, len);
          table[h] = c;
          size++;
        }
      }
    }
}

到此为止,通过inheritableThreadLocals我们可以在父线程创建子线程的时候将Local中的值传递给子线程,这个特性已经能够满足大部分的需求了,但是还有一个很严重的问题是如果是在线程复用的情况下就会出问题,比如线程池中去使用inheritableThreadLocals 进行传值,因为inheritableThreadLocals 只是会再新创建线程的时候进行传值,线程复用并不会做这个操作,那么要解决这个问题就得自己去扩展线程类,实现这个功能。

不要忘记我们是做Java的哈,开源的世界有你需要的任何东西,下面我给大家推荐一个实现好了的Java库,是阿里开源的transmittable-thread-local。

GitHub地址:https://github.com/alibaba/transmittable-thread-local

主要功能就是解决在使用线程池等会缓存线程的组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。

JDK的InheritableThreadLocal类可以完成父线程到子线程的值传递。但对于使用线程池等会缓存线程的组件的情况,线程由线程池创建好,并且线程是缓存起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal值传递到任务执行时。

transmittable-thread-local使用方式分为三种,修饰Runnable和Callable,修饰线程池,Java Agent来修饰JDK线程池实现类

接下来给大家演示下线程池的修饰方式,首先来一个非正常的案例,代码如下:

public class CustomThreadLocal {
  static ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
  static ExecutorService pool = Executors.newFixedThreadPool(2);
  public static void main(String[] args) {
    for(int i=0;i<100;i++) {
       int j = i;
      pool.execute(new Thread(new Runnable() {
        @Override
        public void run() {
          CustomThreadLocal.threadLocal.set("猿天地"+j);
          new Service().call();
        }
      }));
    }
  }
}
class Service {
  public void call() {
    CustomThreadLocal.pool.execute(new Runnable() {
      @Override
      public void run() {
        new Dao().call();
      }
    });
  }
}
class Dao {
  public void call() {
     System.out.println("Dao:" + CustomThreadLocal.threadLocal.get());
  }
}

运行上面的代码出现的结果是不正确的,输出结果如下:

Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99
Dao:猿天地99

正确的应该是从1到100,由于线程的复用,值被替换掉了才会出现不正确的结果

接下来使用transmittable-thread-local来改造有问题的代码,添加transmittable-thread-local的Maven依赖:

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>transmittable-thread-local</artifactId>
  <version>2.2.0</version>
</dependency>

只需要修改2个地方,修饰线程池和替换InheritableThreadLocal:

static TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
static ExecutorService pool = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(2));

正确的结果如下:

Dao:猿天地85
Dao:猿天地84
Dao:猿天地86
Dao:猿天地87
Dao:猿天地88
Dao:猿天地90
Dao:猿天地89
Dao:猿天地91
Dao:猿天地93
Dao:猿天地92
Dao:猿天地94
Dao:猿天地95
Dao:猿天地97
Dao:猿天地96
Dao:猿天地98
Dao:猿天地99

到这里我们就已经可以完美的解决线程中,线程池中ThreadLocal数据的传递了,各位看官又疑惑了,标题不是讲的Spring Cloud中如何解决这个问题么,我也是在Zuul中发现这个问题的,解决方案已经告诉大家了,至于怎么解决Zuul中的这个问题就需要大家自己去思考了,后面有时间我再分享给大家。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解springcloud Feign的Hystrix支持

    本文介绍了springcloud Feign的Hystrix支持,分享给大家,具体如下: 一.Feign client中加入Hystrix的fallback @FeignClient(name="springboot-h2", fallback=HystrixClientFallback.class) //在fallback属性中指定断路器的fallback public interface UserFeignClient { // @GetMapping("/user/{i

  • 详解spring cloud使用Hystrix实现单个方法的fallback

    本文介绍了spring cloud-使用Hystrix实现单个方法的fallback,分享给大家,具体如下: 一.加入Hystrix依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency> 二.编写Controller package c

  • spring cloud 使用Hystrix 实现断路器进行服务容错保护的方法

    在微服务中,我们将系统拆分为很多个服务单元,各单元之间通过服务注册和订阅消费的方式进行相互依赖.但是如果有一些服务出现问题了会怎么样? 比如说有三个服务(ABC),A调用B,B调用C.由于网络延迟或C本身代码有问题导致B迟迟得不到回应,这样B调用C的请求就会被挂起,等待. 在高并发的访问的情况下,这些挂起的线程得不到释放,使后续的请求阻塞,最终导致B也挂掉了.依次类推,A可能也会挂掉,进而使整个系统全部崩溃. 为了解决整个问题,Spring Cloud 使用Hystrix进行服务容错保护,包括断

  • 详解Spring Cloud Hystrix断路器实现容错和降级

    简介 Spring cloud提供了Hystrix容错库用以在服务不可用时,对配置了断路器的方法实行降级策略,临时调用备用方法.这篇文章将创建一个产品微服务,注册到eureka服务注册中心,然后我们使用web客户端访问/products API来获取产品列表,当产品服务故障时,则调用本地备用方法,以降级但正常提供服务. 基础环境 JDK 1.8 Maven 3.3.9 IntelliJ 2018.1 Git:项目源码 添加产品服务 在intelliJ中创建一个新的maven项目,使用如下配置 g

  • 详解SpringCloud微服务架构之Hystrix断路器

    一:什么是Hystrix 在分布式环境中,许多服务依赖项中的一些将不可避免地失败.Hystrix是一个库,通过添加延迟容差和容错逻辑来帮助您控制这些分布式服务之间的交互.Hystrix通过隔离服务之间的访问点,停止其间的级联故障以及提供回退选项,从而提高系统的整体弹性. Hystrix旨在执行以下操作 1:对通过第三方客户端库访问(通常通过网络)的依赖关系提供保护并控制延迟和故障. 2:隔离复杂分布式系统中的级联故障. 3:快速发现故障,尽快恢复. 4:回退,尽可能优雅地降级. 5:启用近实时监

  • springcloud 熔断器Hystrix的具体使用

    说起springcloud熔断让我想起了去年股市中的熔断,多次痛的领悟,随意实施的熔断对整个系统的影响是灾难性的,好了接下来我们还是说正事. 熔断器 雪崩效应 在微服务架构中通常会有多个服务层调用,基础服务的故障可能会导致级联故障,进而造成整个系统不可用的情况,这种现象被称为服务雪崩效应.服务雪崩效应是一种因"服务提供者"的不可用导致"服务消费者"的不可用,并将不可用逐渐放大的过程. 如果下图所示:A作为服务提供者,B为A的服务消费者,C和D是B的服务消费者.A不可

  • 详解Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失

    在Spring Cloud中我们用Hystrix来实现断路器,Zuul中默认是用信号量(Hystrix默认是线程)来进行隔离的,我们可以通过配置使用线程方式隔离. 在使用线程隔离的时候,有个问题是必须要解决的,那就是在某些业务场景下通过ThreadLocal来在线程里传递数据,用信号量是没问题的,从请求进来,但后续的流程都是通一个线程. 当隔离模式为线程时,Hystrix会将请求放入Hystrix的线程池中去执行,这个时候某个请求就有A线程变成B线程了,ThreadLocal必然消失了. 下面我

  • 详解Spring Cloud中Hystrix的请求合并

    在微服务架构中,我们将一个项目拆分成很多个独立的模块,这些独立的模块通过远程调用来互相配合工作,但是,在高并发情况下,通信次数的增加会导致总的通信时间增加,同时,线程池的资源也是有限的,高并发环境会导致有大量的线程处于等待状态,进而导致响应延迟,为了解决这些问题,我们需要来了解Hystrix的请求合并. Hystrix中的请求合并,就是利用一个合并处理器,将对同一个服务发起的连续请求合并成一个请求进行处理(这些连续请求的时间窗默认为10ms),在这个过程中涉及到的一个核心类就是HystrixCo

  • 详解spring cloud中使用Ribbon实现客户端的软负载均衡

    开篇 本例是在springboot整合H2内存数据库,实现单元测试与数据库无关性和使用RestTemplate消费spring boot的Restful服务两个示例的基础上改造而来 在使用RestTemplate来消费spring boot的Restful服务示例中,我们提到,调用spring boot服务的时候,需要将服务的URL写死或者是写在配置文件中,但这两种方式,无论哪一种,一旦ip地址发生了变化,都需要改动程序,并重新部署服务,使用Ribbon的时候,可以有效的避免这个问题. 前言:

  • 详解Spring Cloud Feign 熔断配置的一些小坑

    1.在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,已解决. 2.使用feign默认配置,熔断不生效,已解决. 最近在做微服务的学习,发现在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,代码如下: @RequestMapping("/demo/api") public interface HelloApi { @GetMapping("user/

  • 详解spring cloud config实现datasource的热部署

    关于spring cloud config的基本使用,前面的博客中已经说过了,如果不了解的话,请先看以前的博客 spring cloud config整合gitlab搭建分布式的配置中心 spring cloud config分布式配置中心的高可用 今天,我们的重点是如何实现数据源的热部署. 1.在客户端配置数据源 @RefreshScope @Configuration// 配置数据源 public class DataSourceConfigure { @Bean @RefreshScope

  • 详解spring cloud config整合gitlab搭建分布式的配置中心

    在前面的博客中,我们都是将配置文件放在各自的服务中,但是这样做有一个缺点,一旦配置修改了,那么我们就必须停机,然后修改配置文件后再进行上线,服务少的话,这样做还无可厚非,但是如果是成百上千的服务了,这个时候,就需要用到分布式的配置管理了.而spring cloud config正是用来解决这个问题而生的.下面就结合gitlab来实现分布式配置中心的搭建.spring cloud config配置中心由server端和client端组成, 前提:在gitlab中的工程下新建一个配置文件config

  • 详解spring cloud构建微服务架构的网关(API GateWay)

    前言 在我们前面的博客中讲到,当服务A需要调用服务B的时候,只需要从Eureka中获取B服务的注册实例,然后使用Feign来调用B的服务,使用Ribbon来实现负载均衡,但是,当我们同时向客户端暴漏多个服务的时候,客户端怎么调用我们暴漏的服务了,如果我们还想加入安全认证,权限控制,过滤器以及动态路由等特性了,那么就需要使用Zuul来实现API GateWay了,下面,我们先来看下Zuul怎么使用. 一.加入Zuul的依赖 <dependency> <groupId>org.spri

  • 详解Spring Cloud 跨服务数据聚合框架

    AG-Merge Spring Cloud 跨服务数据聚合框架 解决问题 解决Spring Cloud服务拆分后分页数据的属性或单个对象的属性拆分之痛, 支持对静态数据属性(数据字典).动态主键数据进行自动注入和转化, 其中聚合的静态数据会进行 一级混存 (guava). 举个栗子: 两个服务,A服务的某张表用到了B服务的某张表的值,我们在对A服务那张表查询的时候,把B服务某张表的值聚合在A服务的那次查询过程中 示例 具体示例代码可以看 ace-merge-demo 模块 |------- ac

  • 详解spring cloud整合Swagger2构建RESTful服务的APIs

    前言 在前面的博客中,我们将服务注册到了Eureka上,可以从Eureka的UI界面中,看到有哪些服务已经注册到了Eureka Server上,但是,如果我们想查看当前服务提供了哪些RESTful接口方法的话,就无从获取了,传统的方法是梳理一篇服务的接口文档来供开发人员之间来进行交流,这种情况下,很多时候,会造成文档和代码的不一致性,比如说代码改了,但是接口文档没有改等问题,而Swagger2则给我们提供了一套完美的解决方案,下面,我们来看看Swagger2是如何来解决问题的. 一.引入Swag

随机推荐