Springboot实现高吞吐量异步处理详解(适用于高并发场景)

技术要点

org.springframework.web.context.request.async.DeferredResult<T>

示例如下:

1.   新建Maven项目  async

2.   pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
    http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>
  <groupId>com.java</groupId>
  <artifactId>async</artifactId>
  <version>1.0.0</version>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.5.RELEASE</version>
  </parent>

  <dependencies>

    <!-- Spring Boot -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- 热部署 -->
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>springloaded</artifactId>
      <version>1.2.8.RELEASE</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>provided</scope>
    </dependency>

  </dependencies>

  <build>
    <finalName>${project.artifactId}</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>repackage</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

3.   AsyncStarter.java

package com.java;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class AsyncStarter {

  public static void main(String[] args) {
    SpringApplication.run(AsyncStarter.class, args);
  }
}

4.   AsyncVo.java

package com.java.vo;

import org.springframework.web.context.request.async.DeferredResult;

/**
 * 存储异步处理信息
 *
 * @author Logen
 *
 * @param <I> 接口输入参数
 * @param <O> 接口返回参数
 */
public class AsyncVo<I, O> {

  /**
   * 请求参数
   */
  private I params;

  /**
   * 响应结果
   */
  private DeferredResult<O> result;

  public I getParams() {
    return params;
  }

  public void setParams(I params) {
    this.params = params;
  }

  public DeferredResult<O> getResult() {
    return result;
  }

  public void setResult(DeferredResult<O> result) {
    this.result = result;
  }
}

5.   RequestQueue.java

package com.java.queue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.springframework.stereotype.Component;

import com.java.vo.AsyncVo;

/**
 * 存放所有异步处理接口请求队列的对象,一个接口对应一个队列
 *
 * @author Logen
 *
 */
@Component
public class RequestQueue {

  /**
   * 处理下订单接口的队列,设置缓冲容量为50
   */
  private BlockingQueue<AsyncVo<String, Object>> orderQueue = new LinkedBlockingQueue<>(50);

  public BlockingQueue<AsyncVo<String, Object>> getOrderQueue() {
    return orderQueue;
  }
}

6.   OrderTask.java

package com.java.task;

import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.java.queue.RequestQueue;
import com.java.vo.AsyncVo;

/**
 * 处理订单接口的任务,每个任务类处理一种接口
 *
 * @author Logen
 *
 */
@Component
public class OrderTask extends Thread {

  @Autowired
  private RequestQueue queue;

  private boolean running = true;

  @Override
  public void run() {
    while (running) {
      try {
        AsyncVo<String, Object> vo = queue.getOrderQueue().take();
        System.out.println("[ OrderTask ]开始处理订单");

        String params = vo.getParams();
        Thread.sleep(3000);
        Map<String, Object> map = new HashMap<>();
        map.put("params", params);
        map.put("time", System.currentTimeMillis());

        vo.getResult().setResult(map);

        System.out.println("[ OrderTask ]订单处理完成");
      } catch (InterruptedException e) {
        e.printStackTrace();
        running = false;
      }

    }
  }

  public void setRunning(boolean running) {
    this.running = running;
  }
}

7.   QueueListener.java

package com.java.listener;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.java.task.OrderTask;

/**
 * 队列监听器,初始化启动所有监听任务
 *
 * @author Logen
 *
 */
@Component
public class QueueListener {

  @Autowired
  private OrderTask orderTask;

  /**
   * 初始化时启动监听请求队列
   */
  @PostConstruct
  public void init() {
    orderTask.start();
  }

  /**
   * 销毁容器时停止监听任务
   */
  @PreDestroy
  public void destory() {
    orderTask.setRunning(false);
  }

}

8.   OrderController.java

package com.java.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import com.java.queue.RequestQueue;
import com.java.vo.AsyncVo;

/**
 * <blockquote>
 *
 * <pre>
 *
 * 模拟下单处理,实现高吞吐量异步处理请求
 *
 * 1、 Controller层接口只接收请求,不进行处理,而是把请求信息放入到对应该接口的请求队列中
 * 2、 该接口对应的任务类监听对应接口的请求队列,从队列中顺序取出请求信息并进行处理
 *
 * 优点:接口几乎在收到请求的同时就已经返回,处理程序在后台异步进行处理,大大提高吞吐量
 *
 *
 * </pre>
 *
 * </blockquote>
 *
 * @author Logen
 *
 */
@RestController
public class OrderController {

  @Autowired
  private RequestQueue queue;

  @GetMapping("/order")
  public DeferredResult<Object> order(String number) throws InterruptedException {
    System.out.println("[ OrderController ] 接到下单请求");
    System.out.println("当前待处理订单数: " + queue.getOrderQueue().size());

    AsyncVo<String, Object> vo = new AsyncVo<>();
    DeferredResult<Object> result = new DeferredResult<>();

    vo.setParams(number);
    vo.setResult(result);

    queue.getOrderQueue().put(vo);
    System.out.println("[ OrderController ] 返回下单结果");
    return result;
  }
}

9.   运行 AsyncStarter.java ,启动测试

浏览器输入 http://localhost:8080/order?number=10001

正常情况处理3秒返回,返回结果如下

{"time":1548241500718,"params":"10001"}

观察控制台打印日志,如下所示:

[ OrderController ] 接到下单请求
当前待处理订单数: 0
[ OrderController ] 返回下单结果
[ OrderTask ]开始处理订单
[ OrderTask ]订单处理完成

结论:Controller层几乎在接收到请求的同时就已经返回,处理程序在后台异步处理任务。

快速多次刷新浏览器,目的为了高并发测试,观察控制台打印信息

现象:Controller层快速返回,待处理请求在队列中开始增加,异步处理程序在按顺序处理请求。

优点:对客户端响应时间不变,但提高了服务端的吞吐量。大大提升高并发处理性能!

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • springboot高并发下提高吞吐量的实现

    公司让做一个全文检索的项目,我使用的是elasticsearch.但是对性能有很高的要求,为了解决性能问题,我简直是寝食难安. es(elasticsearch)没有使用分布式,单台的. 开发完测试的时候,查询慢,吞吐量低. 网友们建议用异步--使用Callable来实现.webAsyncTask.Deferred方式等,我一一尝试了之后也没有明显效果,使用压测工具发现使用前后没有一点提升. 尝试这些方法花费了我两天的时间! 在不想使用redis缓存的情况下,我想到了多线程抱着试一试的心态. 没

  • Springboot实现高吞吐量异步处理详解(适用于高并发场景)

    技术要点 org.springframework.web.context.request.async.DeferredResult<T> 示例如下: 1.   新建Maven项目  async 2.   pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaL

  • springboot+rabbitmq实现智能家居实例详解

    目录 引言 一.什么是 MQTT协议? 二.为什么要用 MQTT协议? 三.MQTT协议介绍 MQTT数据包 1.固定头 2.可变头 3.消息体payload 消息质量(QoS ) 1.Qos 0 2.Qos 1 3.Qos 2 LWT(最后遗嘱) 四.MQTT协议应用场景 五.代码实现 1.启用 rabbitmq的mqtt协议 2.mqtt 客户端依赖包 3.消息发送者 4.消息订阅 六.测试消息 1.测试消息发送 2.测试消息订阅 七.应用注意事项 clientId 要唯一 八.其他中间件

  • log4j2异步Logger(详解)

    1 异步Logger的意义 之前的日志框架基本都实现了AsyncAppender,被证明对性能的提升作用非常明显. 在log4j2日志框架中,增加了对Logger的异步实现.那么这一步的解耦,意义何在呢? 如图,按我目前的理解:异步Logger是让业务逻辑把日志信息放入Disruptor队列后可以直接返回(无需等待"挂载的各个Appender"都取走数据) 优点:更高吞吐.调用log方法更低的延迟. 缺点:异常处理麻烦. 可变日志消息问题.更大的CPU开销.需要等待"最慢的A

  • java异步编程详解

    很多时候我们都希望能够最大的利用资源,比如在进行IO操作的时候尽可能的避免同步阻塞的等待,因为这会浪费CPU的资源.如果在有可读的数据的时候能够通知程序执行读操作甚至由操作系统内核帮助我们完成数据的拷贝,这再好不过了.从NIO到CompletableFuture.Lambda.Fork/Join,java一直在努力让程序尽可能变的异步甚至拥有更高的并行度,这一点一些函数式语言做的比较好,因此java也或多或少的借鉴了某些特性.下面介绍一种非常常用的实现异步操作的方式. 考虑有一个耗时的操作,操作

  • SpringBoot实战之处理异常案例详解

    前段时间写了一篇关于实现统一响应信息的博文,根据文中实战操作,能够解决正常响应的一致性,但想要实现优雅响应,还需要优雅的处理异常响应,所以有了这篇内容. 作为后台服务,能够正确的处理程序抛出的异常,并返回友好的异常信息是非常重要的,毕竟我们大部分代码都是为了 处理异常情况.而且,统一的异常响应,有助于客户端理解服务端响应,并作出正确处理,而且能够提升接口的服务质量. SpringBoot提供了异常的响应,可以通过/error请求查看效果: 这是从浏览器打开的场景,也就是请求头不包括content

  • Springboot整合Netty实现RPC服务器详解流程

    目录 一.什么是RPC? 二.实现RPC需要解决那些问题? 1. 约定通信协议格式 RPC请求 RPC响应 2. 序列化方式 3. TCP粘包.拆包 4. 网络通信框架的选择 三.RPC服务端 四.RPC客户端 总结 一.什么是RPC? RPC(Remote Procedure Call)远程过程调用,是一种进程间的通信方式,其可以做到像调用本地方法那样调用位于远程的计算机的服务.其实现的原理过程如下: 本地的进程通过接口进行本地方法调用. RPC客户端将调用的接口名.接口方法.方法参数等信息利

  • springboot中redis正确的使用详解

    redis实现了对数据的缓存,在项目里一些字典数据,会话数据,临时性数据都会向redis来存储,而在springboot里对redis也有支持,一般来说多个线程共同使用一个redis实现是有线程安全的风险的,而每个实现一个线程又太浪费资源,无法控制线程数量是非常危险的,所以就出现了一些redis线程池组件,下面说一下两个主要的组件. jedis 线程池主要是每个实例有自己的线程,线程可以从它建立的池子里获取lettuce lettuce是 apache推出的线程池工具,它的redis实例是可以被

  • SpringBoot实现自定义事件的方法详解

    目录 简介 步骤1:自定义事件 步骤2:自定义监听器 方案1:ApplicationListener 方案2:SmartApplicationListener 步骤3:注册监听器 法1:@Component(适用于所有监听器) 法2:application.yml中添加配置 法3:启动类中注册 步骤4:发布事件 法1:注入ApplicationContext,调用其publishEvent方法 法2:启动类中发布 简介 说明 本文用实例来介绍如何在SpringBoot中自定义事件来使用观察者模式

  • SpringBoot动态定时功能实现方案详解

    目录 业务场景 环境准备 实现方案 归纳总结 业务场景 基于上篇程序,做了一版动态定时程序,然后发现这个定时程序需要在下次执行的时候会加载新的时间,所以如果改了定时程序不能马上触发,所以想到一种方法,在保存定时程序的时候将cron表达式传过去,然后触发定时程序,下面看看怎么实现 环境准备 开发环境 JDK 1.8 SpringBoot2.2.1 Maven 3.2+ 开发工具 IntelliJ IDEA smartGit Navicat15 实现方案 基于上一版进行改进: 先根据选择的星期生成c

  • C++ 线程(串行 并行 同步 异步)详解

    C++  线程(串行 并行 同步 异步)详解 看了很多关于这类的文章,一直没有总结.不总结的话就会一直糊里糊涂,以下描述都是自己理解的非官方语言,不一定严谨,可当作参考. 首先,进程可理解成一个可执行文件的执行过程.在ios app上的话我们可以理解为我们的app的.ipa文件执行过程也即app运行过程.杀掉app进程就杀掉了这个app在系统里运行所占的内存. 线程:线程是进程的最小单位.一个进程里至少有一个主线程.就是那个main thread.非常简单的app可能只需要一个主线程即UI线程.

随机推荐