Spring Boot 整合 Reactor实例详解

目录
  • 引言
  • 1 创建项目
  • 2 集成 H2 数据库
  • 3 创建测试类
    • 3.1 user 实体
    • 3.2 UserRepository
    • 3.3 UserService
    • 3.4 UserController
    • 3.5 SpringReactorApplication 添加注解支持
  • 测试
  • 总结

引言

Reactor 是一个完全非阻塞的 JVM 响应式编程基础,有着高效的需求管理(背压的形式)。它直接整合 Java8 的函数式 API,尤其是 CompletableFuture, Stream,还有 Duration 。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并广泛实现 响应式Stream 规范。

这次带大家从零开始,使用 Spring Boot 框架建立一个 Reactor 响应式项目。

1 创建项目

使用 ;https://start.spring.io/ 创建项目。添加依赖项:H2、Lombok、Spring Web、JPA、JDBC

然后导入 Reactor 包

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>

2 集成 H2 数据库

application.properties 文件中添加 H2 数据连接信息。此外,端口使用 8081(随意,本地未被使用的端口即可)。

server.port=8081
################ H2 数据库 基础配置 ##############
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.url=jdbc:h2:~/user
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database=h2
spring.jpa.hibernate.ddl-auto=update
spring.h2.console.path=/h2-console
spring.h2.console.enable=true

3 创建测试类

3.1 user 实体

建立简单数据操作实体 User。

import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.*;
/**
 * @Author: prepared
 * @Date: 2022/8/29 21:40
 */
@Data
@NoArgsConstructor
@Table(name = "t_user")
@Entity
public class User {
    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;
    private String userName;
    private int age;
    private String sex;
    public User(String userName, int age, String sex) {
        this.userName = userName;
        this.age = age;
        this.sex = sex;
    }
}

3.2 UserRepository

数据模型层使用 JPA 框架。

import com.prepared.user.domain.User;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
/**
 * @Author: prepared
 * @Date: 2022/8/29 21:45
 */
@Repository
public interface UserRepository extends JpaRepository<User, Long> {
}

3.3 UserService

service 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。所有接口返回 Mono/Flux 对象。

最佳实践:所有的第三方接口、IO 耗时比较长的操作都可以放在 Mono 对象中。

doOnError 监控异常情况;

doFinally 监控整体执行情况,如:耗时、调用量监控等。

import com.prepared.user.dao.UserRepository;
import com.prepared.user.domain.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.List;
/**
 * @Author: prepared
 * @Date: 2022/8/29 21:45
 */
@Service
public class UserService {
    private Logger logger = LoggerFactory.getLogger(UserService.class);
    @Resource
    private UserRepository userRepository;
    public Mono<Boolean> save(User user) {
        long startTime = System.currentTimeMillis();
        return Mono.fromSupplier(() -> {
                    return userRepository.save(user) != null;
                })
                .doOnError(e -> {
                    // 打印异常日志&增加监控(自行处理)
                    logger.error("save.user.error, user={}, e", user, e);
                })
                .doFinally(e -> {
                    // 耗时 & 整体健康
                    logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);
                });
    }
    public Mono<User> findById(Long id) {
        long startTime = System.currentTimeMillis();
        return Mono.fromSupplier(() -> {
                    return userRepository.getReferenceById(id);
                }).doOnError(e -> {
                    // 打印异常日志&增加监控(自行处理)
                    logger.error("findById.user.error, id={}, e", id, e);
                })
                .doFinally(e -> {
                    // 耗时 & 整体健康
                    logger.info("findById.user.time={}, id={}", id, System.currentTimeMillis() - startTime);
                });
    }
    public Mono<List<User>> list() {
        long startTime = System.currentTimeMillis();
        return Mono.fromSupplier(() -> {
                    return userRepository.findAll();
                }).doOnError(e -> {
                    // 打印异常日志&增加监控(自行处理)
                    logger.error("list.user.error, e", e);
                })
                .doFinally(e -> {
                    // 耗时 & 整体健康
                    logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
                });
    }
  public Flux<User> listFlux() {
        long startTime = System.currentTimeMillis();
        return Flux.fromIterable(userRepository.findAll())
                .doOnError(e -> {
                    // 打印异常日志&增加监控(自行处理)
                    logger.error("list.user.error, e", e);
                })
                .doFinally(e -> {
                    // 耗时 & 整体健康
                    logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
                });
    }
}

3.4 UserController

controller 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。

list 方法还有另外一种写法,这就涉及到 Mono 和 Flux 的不同了。

返回List可以使用Mono<List<User>> ,也可以使用 Flux<User>

  • Mono<T> 是一个特定的 Publisher<T>,最多可以发出一个元素
  • Flux<T> 是一个标准的 Publisher<T>,表示为发出 0 到 N 个元素的异步序列
import com.prepared.user.domain.User;
import com.prepared.user.service.UserService;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
 * @Author: prepared
 * @Date: 2022/8/29 21:47
 */
@RestController
public class UserController {
    @Resource
    private UserService userService;
    @RequestMapping("/add")
    public Mono<Boolean> add() {
        User user = new User("xiaoming", 10, "F");
        return userService.save(user) ;
    }
    @RequestMapping("/list")
    public Mono<List<User>> list() {
        return userService.list();
    }
}
    @RequestMapping("/listFlux")
    public Flux<User> listFlux() {
        return userService.listFlux();
    }

3.5 SpringReactorApplication 添加注解支持

Application 启动类添加注解 @EnableJpaRepositories

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
/**
 * Hello world!
 */
@SpringBootApplication
@EnableJpaRepositories
public class SpringReactorApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringReactorApplication.class, args);
    }
}

测试

启动项目,访问 localhost:8081/add,正常返回 true。

查询所有数据,访问localhost:8081/list,可以看到插入的数据,已经查询出来了。PS:我这里执行了多次 add,所以有多条记录。

后台日志:

2022-09-05 20:13:17.385  INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService    : list.user.time=181,

执行了 UserService list() 方法的 doFinnally 代码块,打印耗时日志。

总结

响应式编程的优势是不会阻塞。那么正常我们的代码中有哪些阻塞的操作呢?

  • Future 的 get() 方法;
  • Reactor 中的 block() 方法,subcribe() 方法,所以在使用 Reactor 的时候,除非编写测试代码,否则不要直接调用以上两个方法;
  • 同步方法调用,所以高并发情况下,会使用异步调用(如Future)来提升响应速度

以上就是Spring Boot 整合 Reactor实例详解的详细内容,更多关于Spring Boot 整合 Reactor的资料请关注我们其它相关文章!

(0)

相关推荐

  • Spring Boot Reactor 整合 Resilience4j详析

    目录 1 引入 pom 包 2 配置说明 2.1 限流 ratelimiter 2.2 重试 retry 2.3 超时 TimeLimiter 2.4 断路器 circuitbreaker 2.5 壁仓 bulkhead 2.5.1 SemaphoreBulkhead 2.5.2 FixedThreadPoolBulkhead 3 使用 3.1 配置 3.2 使用注解实现 1 引入 pom 包 <dependency> <groupId>io.github.resilience4j

  • C++中的Reactor原理与实现

    目录 一.Reactor介绍 二.代码实现 一.Reactor介绍 reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请求服务的场景.每种服务在服务端可能由多个方法组成.reactor会解耦并发请求的服务并分发给对应的事件处理器来处理. 中心思想是将所有要处理的I/o事件注册到一个中心I/o多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/o事件到来或是准备就绪(文件描述符或socket可读.写),多路复用器返回并将事

  • Reactor反应器的实现方法详解

    大多数应用都会使用ACE_Reactor::instance()提供的默认反应器实例.但是你也可以选择自己的反应器,这是因为ACE使用了Bridge模式(使用两个不同的类:一个是编程接口,另一个是实现,第一个类会把各个操作传给第二个类).例如使用线程池反应器实现:ACE_TP_Reactor* tp_reactor = new ACE_TP_Reactor;ACE_Reactor* my_reactor = new ACE_Reactor(tp_reactor, 1);//1表示my_react

  • Java中多线程Reactor模式的实现

    目录 1. 主服务器 2.IO请求handler+线程池 3.客户端 多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件. 让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求.这样一来连接请求与IO请求分开执行提高通道的并发量.同时

  • C#程序加密工具.Net Reactor详细教程

    .NET具有较多的优点,如:标准集成,简化应用,对移动设备的支持等.但使用.NET编写的程序有个致命的缺点:易被反编译,且运行时占用较大的资源. 那么我们就需要用到加密工具 dotNET Reactor 是一款强大的 .NET 代码保护和授权管理系统,安全可靠.简单易用,主要用来帮助开发人员保护他们的 .NET 软件产品.开发人员从此不必担心如何保护他们的知识产权,可以将更多精力放在产品功能的开发上.与代码混淆工具(Obfuscator)相比,.NET Reactor 可以完全阻止对 .NET

  • 用ASP开发网页需要牢记的注意事项

    步 骤 1.永远不要相信用户输入的内容具有适当的大小或者包含适当的字符.在使用其做出决策之前应该始终对用户输入进行验证.最佳的选择是创建一个 COM+ 组件,这样您可以从 ASP 页面中调用该组件来验证用户的输入内容.您也可以使用 Server.HTMLEncode 方法.Server.URLEncode 方法,或者本页底部代码示例中的某一个. 2.不要通过连接用户输入的字符串来创建 ASP 页中的数据库连接字符串.恶意用户可以通过在他们的输入内容中插入代码来获取数据库的访问权限.如果您使用的是

  • Java反应式框架Reactor中的Mono和Flux

    1. 前言 最近写关于响应式编程的东西有点多,很多同学反映对Flux和Mono这两个Reactor中的概念有点懵逼.但是目前Java响应式编程中我们对这两个对象的接触又最多,诸如Spring WebFlux.RSocket.R2DBC.我开始也对这两个对象头疼,所以今天我们就简单来探讨一下它们. 2. 响应流的特点 要搞清楚这两个概念,必须说一下响应流规范.它是响应式编程的基石.他具有以下特点: 响应流必须是无阻塞的.响应流必须是一个数据流.它必须可以异步执行.并且它也应该能够处理背压. 背压是

  • Project Reactor源码解析publishOn使用示例

    目录 功能分析 代码示例 prefetch delayError 源码分析 Flux#publishOn() Flux#subscribe() FluxPublishOn#subscribeOrReturn() FluxPublishOn#onSubscribe() 非融合 FluxPublishOn#onNext() FluxPublishOn#trySchedule() FluxPublishOn#run() FluxPublishOn#runAsync() FluxPublishOn#ch

  • Reactor中的onErrorContinue 和 onErrorResume

    目录 前言 1 基础功能 2 只有 onErrorResume () 3 只有 onErrorContinue() 4 onErrorResume() 然后 onErrorContinue() 5 使用 onErrorResume() 模拟 onErrorContinue() 6 使用 onErrorResume() 和下游的 onErrorContinue() 模拟 onErrorContinue() 前言 这似乎是 Reactor 的热门搜索之一,至少当我在谷歌中输入 onErrorCont

  • Spring Boot 整合 Reactor实例详解

    目录 引言 1 创建项目 2 集成 H2 数据库 3 创建测试类 3.1 user 实体 3.2 UserRepository 3.3 UserService 3.4 UserController 3.5 SpringReactorApplication 添加注解支持 测试 总结 引言 Reactor 是一个完全非阻塞的 JVM 响应式编程基础,有着高效的需求管理(背压的形式).它直接整合 Java8 的函数式 API,尤其是 CompletableFuture, Stream,还有 Durat

  • spring boot整合RabbitMQ实例详解(Fanout模式)

    1.Fanout Exchange介绍 Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).

  • Spring Boot整合Kafka教程详解

    目录 正文 步骤一:添加依赖项 步骤二:配置 Kafka 步骤三:创建一个生产者 步骤四:创建一个消费者 正文 本教程将介绍如何在 Spring Boot 应用程序中使用 Kafka.Kafka 是一个分布式的发布-订阅消息系统,它可以处理大量数据并提供高吞吐量. 在本教程中,我们将使用 Spring Boot 2.5.4 和 Kafka 2.8.0. 步骤一:添加依赖项 在 pom.xml 中添加以下依赖项: <dependency> <groupId>org.springfra

  • Spring boot 使用mysql实例详解

    Spring boot 使用mysql实例详解 开发阶段用 H2即可,上线时,通过以下配置切换到mysql,spring boot将使用这个配置覆盖默认的H2. 1.建立数据库: mysql -u root CREATE DATABASE springbootdb 2.pom.xml: <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId&g

  • spring boot整合CAS配置详解

    在下不才,以下是我花了好几天的时间才整合出来的在spring boot里面的CAS配置整合 为了帮助没搞定的人,毕竟自己踩了很多坑,一步一步爬过来的,有什么不足之处可以给建议  谢谢(小部分代码是整合他人的) 1.不多废话,直接上最重要的代码,以下代码整合cas的重要过程 import org.jasig.cas.client.authentication.AuthenticationFilter; import org.jasig.cas.client.session.SingleSignOu

  • 在Linux系统上安装Spring boot应用的教程详解

    Unix/Linux 服务 systemd 服务 操作过程 1. 安装了JDK的centOS7虚拟机 注意下载linux版本JDK的时候不能直接通过wget这种直接链接下载,否则会解压不成功,应该打开原官网,点击同意许可后点击下载(这种方式下载很慢),比较好的方式是复制下载页的地址到迅雷,通过迅雷打开该下载页,同意许可后点击下载. 下载后解压.配置环境变量 tar -zxvf jdk1.8.0_211.jar.gz 环境变量配置:/etc/profile 文件最后添加如下 export JAVA

  • Spring Boot Admin的使用详解(Actuator监控接口)

    第一部分 Spring Boot Admin 简介 Spring Boot Admin用来管理和监控Spring Boot应用程序. 应用程序向我们的Spring Boot Admin Client注册(通过HTTP)或使用SpringCloud®(例如Eureka,Consul)发现. UI是Spring Boot Actuator端点上的Vue.js应用程序. Spring Boot Admin 是一个管理和监控Spring Boot 应用程序的开源软件.每个应用都认为是一个客户端,通过HT

  • Java Spring Boot消息服务万字详解分析

    目录 消息服务概述 为什么要使用消息服务 异步处理 应用解耦 流量削峰 分布式事务管理 常用消息中间件介绍 ActiveMQ RabbitMQ RocketMQ RabbitMQ消息中间件 RabbitMQ简介 RabbitMQ工作模式介绍 Work queues(工作队列模式) Public/Subscribe(发布订阅模式) Routing(路由模式) Topics(通配符模式) RPC Headers RabbitMQ安装以及整合环境搭建 安装RabbitMQ 下载RabbitMQ 安装R

  • Spring MVC的文件下载实例详解

    Spring MVC的文件下载实例详解 读取文件 要下载文件,首先是将文件内容读取进来,使用字节数组存储起来,这里使用spring里面的工具类实现 import org.springframework.util.FileCopyUtils; public byte[] downloadFile(String fileName) { byte[] res = new byte[0]; try { File file = new File(BACKUP_FILE_PATH, fileName); i

  • Spring Boot整合RabbitMQ实例(Topic模式)

    1.Topic交换器介绍 Topic Exchange 转发消息主要是根据通配符. 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息. 在这种交换机模式下: 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等. 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*

随机推荐