详解spring cloud hystrix 请求合并collapsing

HystrixCommand之前可以使用请求合并器(HystrixCollapser就是一个抽象的父类)来把多个请求合并成一个然后对后端依赖系统发起调用。

下图显示了两种情况下线程的数量和网络的连接数的情况:第一种是不使用合并器,第二种是使用请求合并器(假设所有的链接都是在一个短的时间窗口内并行的,比如10ms内)。

为什么要使用请求合并?

使用请求合并来减少执行并发HystrixCommand执行所需的线程数和网络连接数,请求合并是自动执行的,不会强制开发人员手动协调批处理请求。

全局上下文-global context(跨越所有Tomcat线程)

这种合并类型是在全局应用级别上完成的,因此任何Tomcat线程上的任何用户的请求都可以一起合并。

例如,如果您配置一个HystrixCommand支持任何用户请求依赖关系来检索电影评级,那么当同一个JVM中的任何用户线程发出这样的请求时,Hystrix会将其请求与任何其他请求一起添加到同一个已折叠网络通话。

用户请求上下文-request context(单个Tomcat线程)

如果你配置一个HystrixCommand仅仅为一个单个用户处理批量请求,Hystrix可以在一个Tomcat线程(请求)中合并请求。

例如,一个用户想要加载300个视频对象的书签,不是去执行300次网络请求,Hystrix能够将他们合并成为一个。

Hystrix默认是的就是request-scope,要使用request-scoped的功能(request caching,request collapsing, request log)你必须管理HystrixRequestContext的生命周期(或者实现一个可替代的HystrixConcurrencyStrategy
这就意味你在执行一个请求之前需要执行以下的代码:

代码如下:

HystrixRequestContext  context=HystrixRequestContext.initializeContext();

并且在请求的结束位置执行:

context.shutdown();

在标准的JavaWeb应用中,你也可以使用一个Servlet过滤器来初始化这个生命周期

public class HystrixRequestContextServletFilter implements Filter {

 public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
  throws IOException, ServletException {
  HystrixRequestContext context = HystrixRequestContext.initializeContext();
  try {
   chain.doFilter(request, response);
  } finally {
   context.shutdown();
  }
 }
}

然后将它配置在web.xml中

 <filter>
  <display-name>HystrixRequestContextServletFilter</display-name>
  <filter-name>HystrixRequestContextServletFilter</filter-name>
  <filter-class>com.netflix.hystrix.contrib.requestservlet.HystrixRequestContextServletFilter</filter-class>
 </filter>
 <filter-mapping>
  <filter-name>HystrixRequestContextServletFilter</filter-name>
  <url-pattern>/*</url-pattern>
 </filter-mapping>

如果你是springboot开发的话代码如下:

@WebFilter(filterName = "hystrixRequestContextServletFilter",urlPatterns = "/*")
public class HystrixRequestContextServletFilter implements Filter {
 @Override
 public void init(FilterConfig filterConfig) throws ServletException {

 }

 @Override
 public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
  HystrixRequestContext context = HystrixRequestContext.initializeContext();
  try{
   filterChain.doFilter(servletRequest,servletResponse);
  }finally {
   context.shutdown();
  }
 }

 @Override
 public void destroy() {

 }
}
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
@EnableHystrix
//这个是必须的,否则filter无效
@ServletComponentScan
public class Application {

 public static void main(String[] args) {
  new SpringApplicationBuilder(Application.class).web(true).run(args);
 }

}

请求合并的成本是多少?

启用请求合并的成本是在执行实际命令之前的延迟。最大的成本是批处理窗口的大小,默认是10ms。

如果你有一个命令需要花费5ms去执行并且有一个10ms的批处理窗口,执行的时间最坏的情况是15ms,一般情况下,请求不会在批处理窗口刚打开的时候发生,所以时间窗口的中间值是时间窗口的一半,在这种情况下是5ms。

这个成本是否值得取决于正在执行的命令,高延迟命令不会受到少量附加平均延迟的影响。而且,给定命令的并发量也是关键:如果很少有超过1个或2个请求被组合在一起,那么这个成本就是不值得的。事实上,在一个单线程的顺序迭代请求合并将会是一个主要的性能瓶颈,每一次迭代都会等待10ms的窗口等待时间。

但是,如果一个特定的命令同时被大量使用,并且可以同时批量打几十个甚至几百个呼叫,那么成本通常远远超过所达到的吞吐量的增加,因为Hystrix减少了它所需的线程数量,依赖。(这段话不太好理解,其实就是说如果并发比较高,这个成本是值得的,因为hystrix可以节省很多线程和连接资源)。

请求合并的流程(如下图)

理论知识已经讲完了,下面来看看例子,下面的例子集成了eureka+feign+hystrix,完整的例子请查看:https://github.com/jingangwang/micro-service

实体类

public class User {

 private Integer id;
 private String username;
 private Integer age;

 public User() {
 }

 public User(Integer id, String username, Integer age) {
  this.id = id;
  this.username = username;
  this.age = age;
 }

 public Integer getId() {
  return id;
 }

 public void setId(Integer id) {
  this.id = id;
 }

 public String getUsername() {
  return username;
 }

 public void setUsername(String username) {
  this.username = username;
 }

 public Integer getAge() {
  return age;
 }

 public void setAge(Integer age) {
  this.age = age;
 }

 @Override
 public String toString() {
  final StringBuffer sb = new StringBuffer("User{");
  sb.append("id=").append(id);
  sb.append(", username='").append(username).append('\'');
  sb.append(", age=").append(age);
  sb.append('}');
  return sb.toString();
 }
}

服务提供者代码

@RestController
@RequestMapping("user")
public class UserController {

 @RequestMapping("getUser")
 public User getUser(Integer id) {
  return new User(id, "test", 29);
 }

 @RequestMapping("getAllUser")
 public List<User> getAllUser(String ids){
  String[] split = ids.split(",");
  return Arrays.asList(split)
    .stream()
    .map(id -> new User(Integer.valueOf(id),"test"+id,30))
    .collect(Collectors.toList());
 }
}

消费者代码

UserFeignClient

@FeignClient(name = "eureka-provider",configuration = FeignConfiguration.class)
public interface UserFeignClient {
 /**
  * 根据id查找用户
  * @param id 用户id
  * @return  User
  */
 @RequestMapping(value = "user/getUser.json",method = RequestMethod.GET)
 User findUserById(@RequestParam("id") Integer id);

 /**
  * 超找用户列表
  * @param ids id列表
  * @return 用户的集合
  */
 @RequestMapping(value = "user/getAllUser.json",method = RequestMethod.GET)
 List<User> findAllUser(@RequestParam("ids") String ids);
}

UserService(设置为全局上下文)

@Service
public class UserService {
 @Autowired
 private UserFeignClient userFeignClient;

 /**
  * maxRequestsInBatch        该属性设置批量处理的最大请求数量,默认值为Integer.MAX_VALUE
  * timerDelayInMilliseconds       该属性设置多长时间之内算一次批处理,默认为10ms
  * @param id
  * @return
  */
 @HystrixCollapser(collapserKey = "findCollapserKey",scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,batchMethod = "findAllUser",collapserProperties = {
   @HystrixProperty(name = "timerDelayInMilliseconds",value = "5000" ),
   @HystrixProperty(name = "maxRequestsInBatch",value = "5" )
 })
 public Future<User> find(Integer id){
  return null;
 }

 @HystrixCommand(commandKey = "findAllUser")
 public List<User> findAllUser(List<Integer> ids){
  return userFeignClient.findAllUser(StringUtils.join(ids,","));
 }
}

FeignCollapserController

@RequestMapping("user")
@RestController
public class FeignCollapserController {
 @Autowired
 private UserService userService;
 @RequestMapping("findUser")
 public User getUser(Integer id) throws ExecutionException, InterruptedException {
  return userService.find(id).get();
 }

上面的代码我们这是的是全局上下文(所有tomcat的线程的请求都可以合并),合并的时间窗口为5s(每一次请求都得等5s才发起请求),最大合并数为5。我们在postman中,5s之内发起两次请求,用户id不一样。

localhost:8082/user/findUser.json?id=123189891

localhost:8082/user/findUser.json?id=222222

结果如下图所示,两次请求合并为一次请求批量请求。

我们再来测试一下请求上下文(Request-Scope)的情况,加入上面所提到的HystrixRequestContextServletFilter,并修改UserService

HystrixRequestContextServletFilter

/**
 * @author wjg
 * @date 2017/12/22 15:15
 */
@WebFilter(filterName = "hystrixRequestContextServletFilter",urlPatterns = "/*")
public class HystrixRequestContextServletFilter implements Filter {
 @Override
 public void init(FilterConfig filterConfig) throws ServletException {

 }

 @Override
 public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
  HystrixRequestContext context = HystrixRequestContext.initializeContext();
  try{
   filterChain.doFilter(servletRequest,servletResponse);
  }finally {
   context.shutdown();
  }
 }

 @Override
 public void destroy() {

 }
}

UserService(设置为请求上下文)

@Service
public class UserService {
 @Autowired
 private UserFeignClient userFeignClient;

 /**
  * maxRequestsInBatch        该属性设置批量处理的最大请求数量,默认值为Integer.MAX_VALUE
  * timerDelayInMilliseconds       该属性设置多长时间之内算一次批处理,默认为10ms
  * @param id
  * @return
  */
 @HystrixCollapser(collapserKey = "findCollapserKey",scope = com.netflix.hystrix.HystrixCollapser.Scope.REQUEST,batchMethod = "findAllUser",collapserProperties = {
   @HystrixProperty(name = "timerDelayInMilliseconds",value = "5000" ),
   @HystrixProperty(name = "maxRequestsInBatch",value = "5" )
 })
 public Future<User> find(Integer id){
  return null;
 }

 @HystrixCommand(commandKey = "findAllUser")
 public List<User> findAllUser(List<Integer> ids){
  return userFeignClient.findAllUser(StringUtils.join(ids,","));
 }
}

FeignCollapser2Controller

@RequestMapping("user")
@RestController
public class FeignCollapser2Controller {
 @Autowired
 private UserService userService;

 @RequestMapping("findUser2")
 public List<User> getUser() throws ExecutionException, InterruptedException {
  Future<User> user1 = userService.find(1989);
  Future<User> user2= userService.find(1990);
  List<User> users = new ArrayList<>();
  users.add(user1.get());
  users.add(user2.get());
  return users;
 }
}

我们在postman中输入:localhost:8082/user/findUser2.json

可以看到一个请求内的两次连续调用被合并了。这个地方要注意,不能直接使用userServer.find(1989).get(),否则直接按同步执行处理,不会合并。如果两个tab页同时调用上述地址,发现发起了两次批量请求,说明作用域是request范围。

参考资料如下:

https://github.com/Netflix/Hystrix/wiki/How-To-Use
//www.jb51.net/article/140530.htm

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解Spring Cloud中Hystrix的请求合并

    在微服务架构中,我们将一个项目拆分成很多个独立的模块,这些独立的模块通过远程调用来互相配合工作,但是,在高并发情况下,通信次数的增加会导致总的通信时间增加,同时,线程池的资源也是有限的,高并发环境会导致有大量的线程处于等待状态,进而导致响应延迟,为了解决这些问题,我们需要来了解Hystrix的请求合并. Hystrix中的请求合并,就是利用一个合并处理器,将对同一个服务发起的连续请求合并成一个请求进行处理(这些连续请求的时间窗默认为10ms),在这个过程中涉及到的一个核心类就是HystrixCo

  • 详解spring cloud hystrix 请求合并collapsing

    在HystrixCommand之前可以使用请求合并器(HystrixCollapser就是一个抽象的父类)来把多个请求合并成一个然后对后端依赖系统发起调用. 下图显示了两种情况下线程的数量和网络的连接数的情况:第一种是不使用合并器,第二种是使用请求合并器(假设所有的链接都是在一个短的时间窗口内并行的,比如10ms内). 为什么要使用请求合并? 使用请求合并来减少执行并发HystrixCommand执行所需的线程数和网络连接数,请求合并是自动执行的,不会强制开发人员手动协调批处理请求. 全局上下文

  • 详解spring cloud hystrix请求缓存(request cache)

    hystrix支持将一个请求结果缓存起来,下一个具有相同key的请求将直接从缓存中取出结果,减少请求开销.要使用该功能必须管理HystrixRequestContext,如果请求B要用到请求A的结果缓存,A和B必须同处一个context.通过HystrixRequestContext.initializeContext()和context.shutdown()可以构建一个context,这两条语句间的所有请求都处于同一个context,当然这个管理过程可以通过自定义的filter来实现,参考上一

  • 详解Spring Cloud Hystrix断路器实现容错和降级

    简介 Spring cloud提供了Hystrix容错库用以在服务不可用时,对配置了断路器的方法实行降级策略,临时调用备用方法.这篇文章将创建一个产品微服务,注册到eureka服务注册中心,然后我们使用web客户端访问/products API来获取产品列表,当产品服务故障时,则调用本地备用方法,以降级但正常提供服务. 基础环境 JDK 1.8 Maven 3.3.9 IntelliJ 2018.1 Git:项目源码 添加产品服务 在intelliJ中创建一个新的maven项目,使用如下配置 g

  • 详解spring cloud hystrix缓存功能的使用

    hystrix缓存的作用是 - 1.减少重复的请求数,降低依赖服务的返回数据始终保持一致. - 2.==在同一个用户请求的上下文中,相同依赖服务的返回数据始终保持一致==. - 3.请求缓存在run()和construct()执行之前生效,所以可以有效减少不必要的线程开销. 1 通过HystrixCommand类实现 1.1 开启缓存功能 继承HystrixCommand或HystrixObservableCommand,覆盖getCacheKey()方法,指定缓存的key,开启缓存配置. im

  • 详解spring cloud使用Hystrix实现单个方法的fallback

    本文介绍了spring cloud-使用Hystrix实现单个方法的fallback,分享给大家,具体如下: 一.加入Hystrix依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency> 二.编写Controller package c

  • 详解Spring cloud使用Ribbon进行Restful请求

    写在前面 本文由markdown格式写成,为本人第一次这么写,排版可能会有点乱,还望各位海涵.  主要写的是使用Ribbon进行Restful请求,测试各个方法的使用,代码冗余较高,比较适合初学者,介意轻喷谢谢. 前提 一个可用的Eureka注册中心(文中以之前博客中双节点注册中心,不重要) 一个连接到这个注册中心的服务提供者 一个ribbon的消费者 注意:文中使用@GetMapping.@PostMapping.@PutMapping.@DeleteMapping等注解需要升级 spring

  • 详解Spring Cloud 跨服务数据聚合框架

    AG-Merge Spring Cloud 跨服务数据聚合框架 解决问题 解决Spring Cloud服务拆分后分页数据的属性或单个对象的属性拆分之痛, 支持对静态数据属性(数据字典).动态主键数据进行自动注入和转化, 其中聚合的静态数据会进行 一级混存 (guava). 举个栗子: 两个服务,A服务的某张表用到了B服务的某张表的值,我们在对A服务那张表查询的时候,把B服务某张表的值聚合在A服务的那次查询过程中 示例 具体示例代码可以看 ace-merge-demo 模块 |------- ac

  • 详解Spring Cloud Feign 熔断配置的一些小坑

    1.在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,已解决. 2.使用feign默认配置,熔断不生效,已解决. 最近在做微服务的学习,发现在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,代码如下: @RequestMapping("/demo/api") public interface HelloApi { @GetMapping("user/

  • 详解spring cloud config实现datasource的热部署

    关于spring cloud config的基本使用,前面的博客中已经说过了,如果不了解的话,请先看以前的博客 spring cloud config整合gitlab搭建分布式的配置中心 spring cloud config分布式配置中心的高可用 今天,我们的重点是如何实现数据源的热部署. 1.在客户端配置数据源 @RefreshScope @Configuration// 配置数据源 public class DataSourceConfigure { @Bean @RefreshScope

  • 详解spring cloud config整合gitlab搭建分布式的配置中心

    在前面的博客中,我们都是将配置文件放在各自的服务中,但是这样做有一个缺点,一旦配置修改了,那么我们就必须停机,然后修改配置文件后再进行上线,服务少的话,这样做还无可厚非,但是如果是成百上千的服务了,这个时候,就需要用到分布式的配置管理了.而spring cloud config正是用来解决这个问题而生的.下面就结合gitlab来实现分布式配置中心的搭建.spring cloud config配置中心由server端和client端组成, 前提:在gitlab中的工程下新建一个配置文件config

随机推荐