详解Spring Cloud中Hystrix的请求合并

在微服务架构中,我们将一个项目拆分成很多个独立的模块,这些独立的模块通过远程调用来互相配合工作,但是,在高并发情况下,通信次数的增加会导致总的通信时间增加,同时,线程池的资源也是有限的,高并发环境会导致有大量的线程处于等待状态,进而导致响应延迟,为了解决这些问题,我们需要来了解Hystrix的请求合并。

Hystrix中的请求合并,就是利用一个合并处理器,将对同一个服务发起的连续请求合并成一个请求进行处理(这些连续请求的时间窗默认为10ms),在这个过程中涉及到的一个核心类就是HystrixCollapser,OK,接下来我们就来看看如何实现Hystrix的请求合并。

服务提供者接口

我需在在服务提供者中提供两个接口供服务消费者调用,如下:

@RequestMapping("/getbook6")
public List<Book> book6(String ids) {
 System.out.println("ids>>>>>>>>>>>>>>>>>>>>>" + ids);
 ArrayList<Book> books = new ArrayList<>();
 books.add(new Book("《李自成》", 55, "姚雪垠", "人民文学出版社"));
 books.add(new Book("中国文学简史", 33, "林庚", "清华大学出版社"));
 books.add(new Book("文学改良刍议", 33, "胡适", "无"));
 books.add(new Book("ids", 22, "helloworld", "haha"));
 return books;
}

@RequestMapping("/getbook6/{id}")
public Book book61(@PathVariable Integer id) {
 Book book = new Book("《李自成》2", 55, "姚雪垠2", "人民文学出版社2");
 return book;
}

第一个接口是一个批处理接口,第二个接口是一个处理单个请求的接口。在批处理接口中,服务消费者传来的ids参数格式是1,2,3,4…这种格式,正常情况下我们需要根据ids查询到对应的数据,然后组装成一个集合返回,我这里为了处理方便,不管什么样的请求统统都返回一样的数据集;处理单个请求的接口就比较简单了,不再赘述。

服务消费者

OK,服务提供者处理好之后,接下来我们来看看服务消费者要怎么处理。

BookService

首先在BookService中添加两个方法用来调用服务提供者提供的接口,如下:

public Book test8(Long id) {
 return restTemplate.getForObject("http://HELLO-SERVICE/getbook6/{1}", Book.class, id);
}

public List<Book> test9(List<Long> ids) {
 System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName());
 Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ","));
 return Arrays.asList(books);
}

test8用来调用提供单个id的接口,test9用来调用批处理的接口,在test9中,我将test9执行时所处的线程打印出来,方便我们观察执行结果,另外,在RestTemplate中,如果返回值是一个集合,我们得先用一个数组接收,然后再转为集合(或许也有其他办法,小伙伴们有更好的建议可以提)。

BookBatchCommand

OK,BookService中的方法准备好了后,我们就可以来创建一个BookBatchCommand,这是一个批处理命令,如下:

public class BookBatchCommand extends HystrixCommand<List<Book>> {
 private List<Long> ids;
 private BookService bookService;

 public BookBatchCommand(List<Long> ids, BookService bookService) {
  super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollapsingGroup"))
    .andCommandKey(HystrixCommandKey.Factory.asKey("CollapsingKey")));
  this.ids = ids;
  this.bookService = bookService;
 }

 @Override
 protected List<Book> run() throws Exception {
  return bookService.test9(ids);
 }
}

这个类实际上和我们在上篇博客中介绍的类差不多,都是继承自HystrixCommand,用来处理合并之后的请求,在run方法中调用BookService中的test9方法。

BookCollapseCommand

接下来我们需要创建BookCollapseCommand继承自HystrixCollapser来实现请求合并。如下:

public class BookCollapseCommand extends HystrixCollapser<List<Book>, Book, Long> {
 private BookService bookService;
 private Long id;

 public BookCollapseCommand(BookService bookService, Long id) {
  super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("bookCollapseCommand")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
  this.bookService = bookService;
  this.id = id;
 }

 @Override
 public Long getRequestArgument() {
  return id;
 }

 @Override
 protected HystrixCommand<List<Book>> createCommand(Collection<CollapsedRequest<Book, Long>> collapsedRequests) {
  List<Long> ids = new ArrayList<>(collapsedRequests.size());
  ids.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
  BookBatchCommand bookBatchCommand = new BookBatchCommand(ids, bookService);
  return bookBatchCommand;
 }

 @Override
 protected void mapResponseToRequests(List<Book> batchResponse, Collection<CollapsedRequest<Book, Long>> collapsedRequests) {
  System.out.println("mapResponseToRequests");
  int count = 0;
  for (CollapsedRequest<Book, Long> collapsedRequest : collapsedRequests) {
   Book book = batchResponse.get(count++);
   collapsedRequest.setResponse(book);
  }
 }
}

关于这个类,我说如下几点:

1.首先在构造方法中,我们设置了请求时间窗为100ms,即请求时间间隔在100ms之内的请求会被合并为一个请求。

2.createCommand方法主要用来合并请求,在这里获取到各个单个请求的id,将这些单个的id放到一个集合中,然后再创建出一个BookBatchCommand对象,用该对象去发起一个批量请求。

3.mapResponseToRequests方法主要用来为每个请求设置请求结果。该方法的第一个参数batchResponse表示批处理请求的结果,第二个参数collapsedRequests则代表了每一个被合并的请求,然后我们通过遍历batchResponse来为collapsedRequests设置请求结果。

OK,所有的这些操作完成后,我们就可以来测试啦。

测试

我们在服务消费者端创建访问接口,来测试合并请求,测试接口如下:

@RequestMapping("/test7")
@ResponseBody
public void test7() throws ExecutionException, InterruptedException {
 HystrixRequestContext context = HystrixRequestContext.initializeContext();
 BookCollapseCommand bc1 = new BookCollapseCommand(bookService, 1l);
 BookCollapseCommand bc2 = new BookCollapseCommand(bookService, 2l);
 BookCollapseCommand bc3 = new BookCollapseCommand(bookService, 3l);
 BookCollapseCommand bc4 = new BookCollapseCommand(bookService, 4l);
 Future<Book> q1 = bc1.queue();
 Future<Book> q2 = bc2.queue();
 Future<Book> q3 = bc3.queue();
 Book book1 = q1.get();
 Book book2 = q2.get();
 Book book3 = q3.get();
 Thread.sleep(3000);
 Future<Book> q4 = bc4.queue();
 Book book4 = q4.get();
 System.out.println("book1>>>"+book1);
 System.out.println("book2>>>"+book2);
 System.out.println("book3>>>"+book3);
 System.out.println("book4>>>"+book4);
 context.close();
}

关于这个测试接口我说如下两点:

1.首先要初始化HystrixRequestContext

2.创建BookCollapseCommand类的实例来发起请求,先发送3个请求,然后睡眠3秒钟,再发起1个请求,这样,前3个请求就会被合并为一个请求,第四个请求因为间隔的时间比较久,所以不会被合并,而是单独创建一个线程去处理。
OK,我们来看看执行结果,如下:

通过注解实现请求合并

OK,上面这种请求合并方式写起来稍微有一点麻烦,我们可以使用注解来更优雅的实现这一功能。首先在BookService中添加两个方法,如下:

@HystrixCollapser(batchMethod = "test11",collapserProperties = {@HystrixProperty(name ="timerDelayInMilliseconds",value = "100")})
public Future<Book> test10(Long id) {
 return null;
}

@HystrixCommand
public List<Book> test11(List<Long> ids) {
 System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName());
 Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ","));
 return Arrays.asList(books);
}

在test10方法上添加@HystrixCollapser注解实现请求合并,用batchMethod属性指明请求合并后的处理方法,collapserProperties属性指定其他属性。

OK,在BookService中写好之后,直接调用就可以了,如下:

@RequestMapping("/test8")
@ResponseBody
public void test8() throws ExecutionException, InterruptedException {
 HystrixRequestContext context = HystrixRequestContext.initializeContext();
 Future<Book> f1 = bookService.test10(1l);
 Future<Book> f2 = bookService.test10(2l);
 Future<Book> f3 = bookService.test10(3l);
 Book b1 = f1.get();
 Book b2 = f2.get();
 Book b3 = f3.get();
 Thread.sleep(3000);
 Future<Book> f4 = bookService.test10(4l);
 Book b4 = f4.get();
 System.out.println("b1>>>"+b1);
 System.out.println("b2>>>"+b2);
 System.out.println("b3>>>"+b3);
 System.out.println("b4>>>"+b4);
 context.close();
}

和前面的一样,前三个请求会进行合并,第四个请求会单独执行,OK,执行结果如下:

总结

请求合并的优点小伙伴们已经看到了,多个请求被合并为一个请求进行一次性处理,可以有效节省网络带宽和线程池资源,但是,有优点必然也有缺点,设置请求合并之后,本来一个请求可能5ms就搞定了,但是现在必须再等10ms看看还有没有其他的请求一起的,这样一个请求的耗时就从5ms增加到15ms了,不过,如果我们要发起的命令本身就是一个高延迟的命令,那么这个时候就可以使用请求合并了,因为这个时候时间窗的时间消耗就显得微不足道了,另外高并发也是请求合并的一个非常重要的场景。

Ok,我们的请求合并就说到这里,有问题欢迎小伙伴们留言讨论。希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解spring cloud hystrix 请求合并collapsing

    在HystrixCommand之前可以使用请求合并器(HystrixCollapser就是一个抽象的父类)来把多个请求合并成一个然后对后端依赖系统发起调用. 下图显示了两种情况下线程的数量和网络的连接数的情况:第一种是不使用合并器,第二种是使用请求合并器(假设所有的链接都是在一个短的时间窗口内并行的,比如10ms内). 为什么要使用请求合并? 使用请求合并来减少执行并发HystrixCommand执行所需的线程数和网络连接数,请求合并是自动执行的,不会强制开发人员手动协调批处理请求. 全局上下文

  • 详解Spring Cloud中Hystrix的请求合并

    在微服务架构中,我们将一个项目拆分成很多个独立的模块,这些独立的模块通过远程调用来互相配合工作,但是,在高并发情况下,通信次数的增加会导致总的通信时间增加,同时,线程池的资源也是有限的,高并发环境会导致有大量的线程处于等待状态,进而导致响应延迟,为了解决这些问题,我们需要来了解Hystrix的请求合并. Hystrix中的请求合并,就是利用一个合并处理器,将对同一个服务发起的连续请求合并成一个请求进行处理(这些连续请求的时间窗默认为10ms),在这个过程中涉及到的一个核心类就是HystrixCo

  • 详解Spring Cloud中Hystrix 线程隔离导致ThreadLocal数据丢失

    在Spring Cloud中我们用Hystrix来实现断路器,Zuul中默认是用信号量(Hystrix默认是线程)来进行隔离的,我们可以通过配置使用线程方式隔离. 在使用线程隔离的时候,有个问题是必须要解决的,那就是在某些业务场景下通过ThreadLocal来在线程里传递数据,用信号量是没问题的,从请求进来,但后续的流程都是通一个线程. 当隔离模式为线程时,Hystrix会将请求放入Hystrix的线程池中去执行,这个时候某个请求就有A线程变成B线程了,ThreadLocal必然消失了. 下面我

  • 详解spring cloud使用Hystrix实现单个方法的fallback

    本文介绍了spring cloud-使用Hystrix实现单个方法的fallback,分享给大家,具体如下: 一.加入Hystrix依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency> 二.编写Controller package c

  • 详解spring cloud中使用Ribbon实现客户端的软负载均衡

    开篇 本例是在springboot整合H2内存数据库,实现单元测试与数据库无关性和使用RestTemplate消费spring boot的Restful服务两个示例的基础上改造而来 在使用RestTemplate来消费spring boot的Restful服务示例中,我们提到,调用spring boot服务的时候,需要将服务的URL写死或者是写在配置文件中,但这两种方式,无论哪一种,一旦ip地址发生了变化,都需要改动程序,并重新部署服务,使用Ribbon的时候,可以有效的避免这个问题. 前言:

  • 详解Spring cloud使用Ribbon进行Restful请求

    写在前面 本文由markdown格式写成,为本人第一次这么写,排版可能会有点乱,还望各位海涵.  主要写的是使用Ribbon进行Restful请求,测试各个方法的使用,代码冗余较高,比较适合初学者,介意轻喷谢谢. 前提 一个可用的Eureka注册中心(文中以之前博客中双节点注册中心,不重要) 一个连接到这个注册中心的服务提供者 一个ribbon的消费者 注意:文中使用@GetMapping.@PostMapping.@PutMapping.@DeleteMapping等注解需要升级 spring

  • 详解Spring Cloud Feign 熔断配置的一些小坑

    1.在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,已解决. 2.使用feign默认配置,熔断不生效,已解决. 最近在做微服务的学习,发现在使用feign做服务调用时,使用继承的方式调用服务,加入Hystrix的熔断处理fallback配置时,会报错,代码如下: @RequestMapping("/demo/api") public interface HelloApi { @GetMapping("user/

  • 详解spring cloud config实现datasource的热部署

    关于spring cloud config的基本使用,前面的博客中已经说过了,如果不了解的话,请先看以前的博客 spring cloud config整合gitlab搭建分布式的配置中心 spring cloud config分布式配置中心的高可用 今天,我们的重点是如何实现数据源的热部署. 1.在客户端配置数据源 @RefreshScope @Configuration// 配置数据源 public class DataSourceConfigure { @Bean @RefreshScope

  • 详解spring cloud config整合gitlab搭建分布式的配置中心

    在前面的博客中,我们都是将配置文件放在各自的服务中,但是这样做有一个缺点,一旦配置修改了,那么我们就必须停机,然后修改配置文件后再进行上线,服务少的话,这样做还无可厚非,但是如果是成百上千的服务了,这个时候,就需要用到分布式的配置管理了.而spring cloud config正是用来解决这个问题而生的.下面就结合gitlab来实现分布式配置中心的搭建.spring cloud config配置中心由server端和client端组成, 前提:在gitlab中的工程下新建一个配置文件config

  • 详解spring cloud构建微服务架构的网关(API GateWay)

    前言 在我们前面的博客中讲到,当服务A需要调用服务B的时候,只需要从Eureka中获取B服务的注册实例,然后使用Feign来调用B的服务,使用Ribbon来实现负载均衡,但是,当我们同时向客户端暴漏多个服务的时候,客户端怎么调用我们暴漏的服务了,如果我们还想加入安全认证,权限控制,过滤器以及动态路由等特性了,那么就需要使用Zuul来实现API GateWay了,下面,我们先来看下Zuul怎么使用. 一.加入Zuul的依赖 <dependency> <groupId>org.spri

  • 详解Spring Cloud 跨服务数据聚合框架

    AG-Merge Spring Cloud 跨服务数据聚合框架 解决问题 解决Spring Cloud服务拆分后分页数据的属性或单个对象的属性拆分之痛, 支持对静态数据属性(数据字典).动态主键数据进行自动注入和转化, 其中聚合的静态数据会进行 一级混存 (guava). 举个栗子: 两个服务,A服务的某张表用到了B服务的某张表的值,我们在对A服务那张表查询的时候,把B服务某张表的值聚合在A服务的那次查询过程中 示例 具体示例代码可以看 ace-merge-demo 模块 |------- ac

随机推荐