Reactor中的onErrorContinue 和 onErrorResume

目录
  • 前言
  • 1 基础功能
  • 2 只有 onErrorResume ()
  • 3 只有 onErrorContinue()
  • 4 onErrorResume() 然后 onErrorContinue()
  • 5 使用 onErrorResume() 模拟 onErrorContinue()
  • 6 使用 onErrorResume() 和下游的 onErrorContinue() 模拟 onErrorContinue()

前言

这似乎是 Reactor 的热门搜索之一,至少当我在谷歌中输入 onErrorContinue 时,onErrorResume 会在它旁边弹出。让我把我的测试代码和我的一些解释粘贴在下面。

1 基础功能

这是一个简单的函数,将 5 个连续的数字分别乘以 2,然后相加,当 i==2 时抛出一个异常:

public static void main(String... args) {
      Flux.range(1,5)
              .doOnNext(i -> System.out.println("input=" + i))
              .map(i -> i == 2 ? i / 0 : i)
              .map(i -> i * 2)
              .reduce((i,j) -> i+j)
              .doOnNext(i -> System.out.println("sum=" + i))
              .block();
  }

显然,输出如下:

input=1
input=2
Exception in thread "main" java.lang.ArithmeticException: / by zero

2 只有 onErrorResume ()

public static void main(String... args) {
      Flux.range(1,5)
              .doOnNext(i -> System.out.println("input=" + i))
              .map(i -> i == 2 ? i / 0 : i)
              .map(i -> i * 2)
              .onErrorResume(err -> {
                  log.info("onErrorResume");
                  return Flux.empty();
              })
              .reduce((i,j) -> i+j)
              .doOnNext(i -> System.out.println("sum=" + i))
              .block();
  }

输出如下:

input=1
input=2
17:40:47.828 [main] INFO com.example.demo.config.TestRunner - onErrorResume
sum=2

正如官方文档描述(onErrorResume)的那样,onErrorResume 用返回内容替换 Flux,因此在 2 之后的数据不会被处理。唯一值得一提的是,onErrorResume() 不必马上在错误之后捕获异常。在 onErrorContinue() 的官网文档中(onErrorContinue),只有 onErrorContinue() 强调了了 upstream 关键词,但显然,它还有其他含义。

3 只有 onErrorContinue()

public static void main(String... args) {
  Flux.range(1,5)
          .doOnNext(i -> System.out.println("input=" + i))
          .map(i -> i == 2 ? i / 0 : i)
          .map(i -> i * 2)
          .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
          .reduce((i,j) -> i+j)
          .doOnNext(i -> System.out.println("sum=" + i))
          .block();
}

输出:

input=1
input=2
17:43:10.656 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26

显然,onErrorContinue 会丢弃错误数据 2, 然后继续数字 3 直到 5。

4 onErrorResume() 然后 onErrorContinue()

public static void main(String... args) {
    Flux.range(1,5)
            .doOnNext(i -> System.out.println("input=" + i))
            .map(i -> i == 2 ? i / 0 : i)
            .map(i -> i * 2)
            .onErrorResume(err -> {
                log.info("onErrorResume");
                return Flux.empty();
            })
            .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
            .reduce((i,j) -> i+j)
            .doOnNext(i -> System.out.println("sum=" + i))
            .block();
}

输出和上面一样:

input=1
input=2
17:47:05.789 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26

这样的结果,你想到了吗?onErrorContinue() 会在 onErrorResume() 得到错误之前处理这个错误。当两个错误处理函数在同一个函数中的时候很明显,但是当你的函数中只有 onErrorResume(),而一些调用者实际上有 onErrorContinue() 时,你的 onErrorResume() 没有被调用的原因可能就不那么明显了。

5 使用 onErrorResume() 模拟 onErrorContinue()

有些博客建议我们完全不用 onErrorContinue(),且在所有场景中仅用 onErrorResume()。但是上述示例已经展示了它们会产生不同的结果。那我们怎么实现呢?

public static void main(String... args) {
    Flux.range(1,5)
            .doOnNext(i -> System.out.println("input=" + i))
            .flatMap(i -> Mono.just(i)
                    .map(j -> j == 2 ? j / 0 : j)
                    .map(j -> j * 2)
                    .onErrorResume(err -> {
                        System.out.println("onErrorResume");
                        return Mono.empty();
                    })
            )
            .reduce((i,j) -> i+j)
            .doOnNext(i -> System.out.println("sum=" + i))
            .block();
}

因此,本质上是将可能在 flatMap 或 concatMap 中抛出错误的操作包装起来,并在其上使用 onErrorResume()。这样,它会产生相同的结果:

input=1
input=2
onErrorResume
input=3
input=4
input=5
sum=26

6 使用 onErrorResume() 和下游的 onErrorContinue() 模拟 onErrorContinue()

有时候,onErrorContinue() 放在调用程序中,您无法控制它。但你仍然需要 onErrorResume()。你该怎么办?

public static void main(String... args) {
    Flux.range(1,5)
            .doOnNext(i -> System.out.println("input=" + i))
            .flatMap(i -> Mono.just(i)
                    .map(j -> j == 2 ? j / 0 : j)
                    .map(j -> j * 2)
                    .onErrorResume(err -> {
                        System.out.println("onErrorResume");
                        return Mono.empty();
                    })
                    .onErrorStop()
            )
            .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
            .reduce((i,j) -> i+j)
            .doOnNext(i -> System.out.println("sum=" + i))
            .block();
}

秘诀是在 onErrorResume() 代码块的末尾添加 onErrorStop() ——这会阻塞 onErrorContinue(),这样它就不会在 onErrorResume() 之前占用错误。尝试删除 onErrorStop(),你会看到 onErrorContinue() 在 onErrorResume 之前弹出。

到此这篇关于Reactor中的onErrorContinue 和 onErrorResume的文章就介绍到这了,更多相关Reactor onErrorContinue 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 如何使用Reactor完成类似Flink的操作

    一.背景 Flink在处理流式任务的时候有很大的优势,其中windows等操作符可以很方便的完成聚合任务,但是Flink是一套独立的服务,业务流程中如果想使用需要将数据发到kafka,用Flink处理完再发到kafka,然后再做业务处理,流程很繁琐. 比如在业务代码中想要实现类似Flink的window按时间批量聚合功能,如果纯手动写代码比较繁琐,使用Flink又太重,这种场景下使用响应式编程RxJava.Reactor等的window.buffer操作符可以很方便的实现. 响应式编程框架也早已

  • Java中多线程Reactor模式的实现

    目录 1. 主服务器 2.IO请求handler+线程池 3.客户端 多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件. 让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求.这样一来连接请求与IO请求分开执行提高通道的并发量.同时

  • Reactor反应器的实现方法详解

    大多数应用都会使用ACE_Reactor::instance()提供的默认反应器实例.但是你也可以选择自己的反应器,这是因为ACE使用了Bridge模式(使用两个不同的类:一个是编程接口,另一个是实现,第一个类会把各个操作传给第二个类).例如使用线程池反应器实现:ACE_TP_Reactor* tp_reactor = new ACE_TP_Reactor;ACE_Reactor* my_reactor = new ACE_Reactor(tp_reactor, 1);//1表示my_react

  • Project Reactor源码解析publishOn使用示例

    目录 功能分析 代码示例 prefetch delayError 源码分析 Flux#publishOn() Flux#subscribe() FluxPublishOn#subscribeOrReturn() FluxPublishOn#onSubscribe() 非融合 FluxPublishOn#onNext() FluxPublishOn#trySchedule() FluxPublishOn#run() FluxPublishOn#runAsync() FluxPublishOn#ch

  • Reactor 多任务并发执行且结果按顺序返回第一个

    目录 1 场景 2 创建 service 2.1 创建基本接口和实体类 2.2 创建 service 实现 3 主体方法 4 实现异步 4.1 subcribeOn 实现异步 4.2 CompletableFuture 实现异步 1 场景 调用多个平级服务,按照服务优先级返回第一个有效数据. 具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级.每次只需要返回第一个有数据的弹窗.但是又希望所有弹窗之间的数据获取是异步的.这种场景使用 Reactor 怎么实现呢? 2 创建 service

  • Reactor中的onErrorContinue 和 onErrorResume

    目录 前言 1 基础功能 2 只有 onErrorResume () 3 只有 onErrorContinue() 4 onErrorResume() 然后 onErrorContinue() 5 使用 onErrorResume() 模拟 onErrorContinue() 6 使用 onErrorResume() 和下游的 onErrorContinue() 模拟 onErrorContinue() 前言 这似乎是 Reactor 的热门搜索之一,至少当我在谷歌中输入 onErrorCont

  • Java反应式框架Reactor中的Mono和Flux

    1. 前言 最近写关于响应式编程的东西有点多,很多同学反映对Flux和Mono这两个Reactor中的概念有点懵逼.但是目前Java响应式编程中我们对这两个对象的接触又最多,诸如Spring WebFlux.RSocket.R2DBC.我开始也对这两个对象头疼,所以今天我们就简单来探讨一下它们. 2. 响应流的特点 要搞清楚这两个概念,必须说一下响应流规范.它是响应式编程的基石.他具有以下特点: 响应流必须是无阻塞的.响应流必须是一个数据流.它必须可以异步执行.并且它也应该能够处理背压. 背压是

  • 详解Reactor中Context的用法

    目录 一.使用介绍 二.源码解读 三.如何桥接现有的ThreadLocal系统 四.总结 在响应式编程中,多线程异步性成为天然的内在,多线程之间的切换也成为原生的,在处理一个数据流Flux/Mono时,基本无法知道是运行在哪个线程上或哪个线程池里,可以说,每一个操作符operator以及内部的函数都可能运行在不同的线程上.这就意味着,以前用ThreadLocal来作为方法间透明传递共享变量的方式不再行得通.为此,Reactor提供了Context来替代ThreadLocal实现一个跨线程的共享变

  • epoll封装reactor原理剖析示例详解

    目录 reactor是什么? reactor模型三个重要组件与流程分析 组件 流程 将epoll封装成reactor事件驱动 封装每一个连接sockfd变成ntyevent 封装epfd和ntyevent变成ntyreactor 封装读.写.接收连接等事件对应的操作变成callback 给每个客户端的ntyevent设置属性 将ntyevent加入到epoll中由内核监听 将ntyevent从epoll中去除 读事件回调函数 写事件回调函数 接受新连接事件回调函数 reactor运行 react

  • Spring Boot 整合 Reactor实例详解

    目录 引言 1 创建项目 2 集成 H2 数据库 3 创建测试类 3.1 user 实体 3.2 UserRepository 3.3 UserService 3.4 UserController 3.5 SpringReactorApplication 添加注解支持 测试 总结 引言 Reactor 是一个完全非阻塞的 JVM 响应式编程基础,有着高效的需求管理(背压的形式).它直接整合 Java8 的函数式 API,尤其是 CompletableFuture, Stream,还有 Durat

  • SpringBoot之webflux全面解析

    目录 webflux介绍 webflux应用场景 SpringBoot2.0WebFlux 响应式编程 SpringWebflux springwebflux和springmvc的异同点 Nettyselector模型 Reactor指南 Java原有的异步编程方式 Reactor线程模型 webflux实践 webflux解析 总结 webflux介绍 Spring Boot 2.0 spring.io 官网有句醒目的话是: BUILD ANYTHING WITH SPRING BOOT Sp

  • Reactive反应式编程及使用介绍

    目录 前言 反应式编程简介 阻塞可能会浪费资源 使用异步来解决? 回调地狱的例子 与回调代码等效的Reactor代码示例 具有超时和回退的Reactor代码示例 CompletableFuture组合的例子 与未来代码等效的Reactor代码示例 从命令式到反应式编程 可组合性和可读性 类比装配线工作流程 操作符(运算符) 在你订阅之前什么都不会发生 背压 热与冷 前言 前一篇分析了Spring WebFlux的设计及实现原理后,反应式编程又来了,Spring WebFlux其底层还是基于Rea

  • 简单了解spring cloud 网关服务

    微服务 网关服务 网关服务是微服务体系里面重要的一环. 微服务体系内,各个服务之间都会有通用的功能比如说:鉴权.安全.监控.日志.服务调度转发.这些都是可以单独抽象出来做一个服务来处理.所以微服务网关应运而生.其主要作用作为微服务体系里面流量的唯一入口去做一些功能的实现. 微服务的网关担当的主要职责可以分为俩种 主要业务功能抽取,鉴权.安全.服务调度.限流.熔断等 非主要的业务功能抽取,监控.日志.缓存.黑白名单.埋点等 Spring Cloud 网关服务 现在市面主要流行的俩种 Netflix

随机推荐