Reactive反应式编程及使用介绍

目录
  • 前言
  • 反应式编程简介
  • 阻塞可能会浪费资源
  • 使用异步来解决?
    • 回调地狱的例子
    • 与回调代码等效的Reactor代码示例
    • 具有超时和回退的Reactor代码示例
    • CompletableFuture组合的例子
    • 与未来代码等效的Reactor代码示例
  • 从命令式到反应式编程
    • 可组合性和可读性
    • 类比装配线工作流程
      • 操作符(运算符)
    • 在你订阅之前什么都不会发生
    • 背压
    • 热与冷

前言

前一篇分析了Spring WebFlux的设计及实现原理后,反应式编程又来了,Spring WebFlux其底层还是基于Reactive编程模型的,在java领域中,关于Reactive,有一个框架规范,叫【Reactive Streams】,在java9的ava.util.concurrent.Flow包中已经实现了这个规范。其他的优秀实现还有Reactor和Rxjava。在Spring WebFlux中依赖的就是Reactor。虽然你可能没用过Reactive开发过应用,但是或多会少你接触过异步Servlet,同时又有这么一种论调:异步化非阻塞io并不能增强太多的系统性能,但是也不可否认异步化后并发性能上去了。听到这种结论后在面对是否选择Reactive编程后,是不是非常模棱两可。因为我们不是很了解反应式编程,所以会有这种感觉。没关系,下面看看反应式编程集大者Reactor是怎么阐述反应式编程的。

反应式编程简介

Reactor是Reactive Programming范例的一个实现,可以概括为:
反应式编程是一种涉及数据流和变化传播的异步编程范例。这意味着可以通过所采用的编程语言轻松地表达静态(例如阵列)或动态(例如事件发射器)数据流。
作为反应式编程方向的第一步,Microsoft在.NET生态系统中创建了Reactive Extensions(Rx)库。然后RxJava在JVM上实现了响应式编程。随着时间的推移,通过Reactive Streams工作出现了Java的标准化 ,这一规范定义了JVM上的反应库的一组接口和交互规则。它的接口已经集成到父Flow类下的Java 9中。

反应式编程范例通常以面向对象的语言呈现,作为Observer设计模式的扩展。人们还可以将主要的反应流模式与熟悉的迭代器设计模式进行比较,因为在所有这些库中对Iterable- Iterator对存在双重性 。一个主要的区别是,虽然迭代器是基于拉的,但是反应流是基于推的。

使用迭代器是一种命令式编程模式,即使访问值的方法完全由其负责Iterable。实际上,开发人员可以选择何时访问next()序列中的项目。在反应流中,相当于上述对Publisher-Subscriber。但是, 当它们出现时,Publisher它会通知订阅者新的可用值,而这一推动方面是被动反应的关键。此外,应用于推送值的操作以声明方式而非命令方式表示:程序员表达计算的逻辑而不是描述其精确的控制流。

除了推送值之外,还以明确定义的方式涵盖错误处理和完成方面。A Publisher可以将新值推送到Subscriber(通过调用onNext),但也可以发出错误(通过调用onError)或完成(通过调用onComplete)。错误和完成都会终止序列。这可以概括为:

onNext x 0..N [onError | onComplete]

这种方法非常灵活。该模式支持没有值,一个值或n值的用例(包括无限的值序列,例如时钟的连续滴答)。

但是我们首先考虑一下,为什么我们首先需要这样的异步反应库?

阻塞可能会浪费资源

现代应用程序可以覆盖大量并发用户,即使现代硬件的功能不断提高,现代软件的性能仍然是一个关键问题。
人们可以通过两种方式来提高计划的绩效:

  • 并行化:使用更多线程和更多硬件资源。
  • 在现有资源的使用方式上寻求更高的效率。

通常,Java开发人员使用阻塞代码编写程序。这种做法很好,直到出现性能瓶颈,此时需要引入额外的线程,运行类似的阻塞代码。但是,资源利用率的这种扩展会很快引入争用和并发问题。

更糟糕的是,阻止浪费资源。如果仔细观察,一旦程序涉及一些延迟(特别是I / O,例如数据库请求或网络调用),资源就会被浪费,因为线程(或许多线程)现在处于空闲状态,等待数据。

所以并行化方法不是灵丹妙药。为了获得硬件的全部功能是必要的,但是理由也很复杂并且易受资源浪费的影响。

使用异步来解决?

第二种方法(前面提到过),寻求更高的效率,可以解决资源浪费问题。通过编写异步,非阻塞代码,您可以使用相同的底层资源将执行切换到另一个活动任务,然后在异步处理完成后返回到当前进程。

但是如何在JVM上生成异步代码?Java提供了两种异步编程模型:

回调:异步方法没有返回值,但需要额外的 callback参数(lambda或匿名类),在结果可用时调用它们。一个众所周知的例子是Swing的EventListener层次结构。

期货:异步方法Future立即返回。异步进程计算一个T值,但该Future对象包含对它的访问。该值不会立即可用,并且可以轮询对象,直到该值可用。例如,ExecutorService运行Callable任务使用Future对象。

这些技术是否足够好?不适用于所有用例,两种方法都有局限性。

回调难以组合在一起,很快导致难以阅读和维护的代码(称为“Callback Hell”)。

考虑一个示例:在用户界面上显示用户的前五个收藏夹,或者如果她没有收藏夹则提出建议。这通过三个服务(一个提供喜欢的ID,第二个提取喜欢的详细信息,第三个提供详细建议):

回调地狱的例子

userService.getFavorites(userId, new Callback() {
  public void onSuccess(Listlist) {
    if (list.isEmpty()) {
      suggestionService.getSuggestions(new Callback() {
        public void onSuccess(Listlist) {
          UiUtils.submitOnUiThread(() -> {
            list.stream()
                .limit(5)
                .forEach(uiList::show);
            });
        }
        public void onError(Throwable error) {
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream()
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId,
            new Callback() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }
              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }
  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
  • 我们有基于回调的服务:一个Callback接口,其中包含在异步过程成功时调用的方法,以及在发生错误时调用的方法。
  • 第一个服务使用喜欢的ID列表调用其回调。
  • 如果列表为空,我们必须去suggestionService。
  • 在suggestionService给出了一个List到第二个回调。
  • 由于我们处理UI,我们需要确保我们的消费代码将在UI线程中运行。
  • 我们使用Java 8 Stream将处理的建议数限制为五个,并在UI中的图形列表中显示它们。
  • 在每个级别,我们以相同的方式处理错误:在弹出窗口中显示它们。
  • 回到最喜欢的ID级别。如果服务返回完整列表,那么我们需要转到favoriteService获取详细Favorite对象。由于我们只需要五个,我们首先流式传输ID列表,将其限制为五个。
  • 再一次,一个回调。这次我们得到一个完全成熟的Favorite对象,我们将其推送到UI线程内的UI。

这是很多代码,它有点难以遵循并且具有重复的部分。考虑它在Reactor中的等价物:

与回调代码等效的Reactor代码示例

userService.getFavorites(userId)
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
  • 我们从最喜欢的ID流开始。
  • 我们将它们异步转换为详细的Favorite对象(flatMap)。我们现在有一个流动Favorite。
  • 如果流量Favorite是空的,我们会切换到后退 suggestionService。
  • 我们最多只对最终流程中的五个元素感兴趣。
  • 最后,我们想要处理UI线程中的每个数据。
  • 我们通过描述如何处理数据的最终形式(在UI列表中显示)以及在出现错误(显示弹出窗口)时该怎么做来触发流程。

如果您想确保在不到800毫秒内检索到喜欢的ID,或者如果需要更长时间从缓存中获取它们,该怎么办?在基于回调的代码中,这是一项复杂的任务。在Reactor中,它变得像timeout在链中添加运算符一样简单:

具有超时和回退的Reactor代码示例

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800))
           .onErrorResume(cacheService.cachedFavoritesFor(userId))
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
  • 如果上面的部分发出的时间超过800毫秒,则传播错误。
  • 如果出现错误,请回复cacheService。
  • 链的其余部分与前面的示例类似。

尽管Java 8中带来了改进,但期货比回调要好一些,但它们在构图方面仍然表现不佳CompletableFuture。一起编排多个未来是可行但不容易的。此外,Future还有其他问题:Future通过调用get() 方法很容易结束对象的另一个阻塞情况,它们不支持延迟计算,并且它们不支持多个值和高级错误处理。

考虑另一个例子:我们得到一个ID列表,我们要从中获取一个名称和一个统计信息,然后将它们成对地组合在一起,所有这些都是异步的。

CompletableFuture组合的例子

CompletableFutureids = ifhIds();
CompletableFutureresult = ids.thenComposeAsync(l -> {
	Streamzip =
			l.stream().map(i -> {
				CompletableFuturenameTask = ifhName(i);
				CompletableFuturestatTask = ifhStat(i);
				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
			});
	ListcombinationList = zip.collect(Collectors.toList());
	CompletableFuture[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);
	CompletableFutureallDone = CompletableFuture.allOf(combinationArray);
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join)
			.collect(Collectors.toList()));
});
Listresults = result.join();
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
  • 我们从一个未来开始,它为我们提供了一个id要处理的值列表。
  • 一旦得到列表,我们想要开始一些更深入的异步处理。
  • 对于列表中的每个元素:
  • 异步获取关联的名称。
  • 异步获取相关任务。
  • 结合两个结果。
  • 我们现在有一个代表所有组合任务的期货清单。为了执行这些任务,我们需要将列表转换为数组。
  • 将数组传递给CompletableFuture.allOf,输出Future完成所有任务后完成的数组。
  • 棘手的一点是allOf返回CompletableFuture,所以我们重申了期货清单,通过收集结果join() (这里没有阻止,因为allOf确保期货全部完成)。
  • 一旦触发了整个异步管道,我们就等待它被处理并返回我们可以断言的结果列表。

由于Reactor具有更多开箱即用的组合运算符,因此可以简化此过程:

与未来代码等效的Reactor代码示例

Fluxids = ifhrIds();
Fluxcombinations =
		ids.flatMap(id -> {
			MononameTask = ifhrName(id);
			MonostatTask = ifhrStat(id);
			return nameTask.zipWith(statTask,
					(name, stat) -> "Name " + name + " has stats " + stat);
		});
Monoresult = combinations.collectList();
Listresults = result.block();
assertThat(results).containsExactly(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
  • 这一次,我们从异步提供的ids(a Flux)序列开始。
  • 对于序列中的每个元素,我们异步处理它(在body函数内部flatMap)两次。
  • 获取相关名称。
  • 获取相关统计信息。
  • 异步组合2个值。
  • 在将值List变为可用时将值聚合为a 。
  • 在生产中,我们将继续Flux通过进一步组合或订阅它来异步处理。最有可能的是,我们会回归result Mono。由于我们在测试中,我们阻塞,等待处理完成,然后直接返回聚合的值列表。
  • 断言结果。

Callback和Future的这些风险是相似的,并且是反应式编程与该Publisher-Subscriber对的关系。

从命令式到反应式编程

诸如Reactor之类的反应库旨在解决JVM上“经典”异步方法的这些缺点,同时还关注一些其他方面:

  • 可组合性和可读性
  • 数据作为一个用丰富的运算符词汇表操纵的流程
  • 在您订阅之前没有任何事情发生
  • 背压或消费者向生产者发出信号表明排放率过高的能力
  • 高级但高价值的抽象,与并发无关

可组合性和可读性

通过可组合性,我们指的是编排多个异步任务的能力,使用先前任务的结果将输入提供给后续任务或以fork-join方式执行多个任务,以及将异步任务重用为更高级别系统中的分立组件。

编排任务的能力与代码的可读性和可维护性紧密相关。随着异步过程层数量和复杂性的增加,能够编写和读取代码变得越来越困难。正如我们所看到的,回调模型很简单,但其主要缺点之一是,对于复杂的进程,您需要从回调执行回调,本身嵌套在另一个回调中,依此类推。那个混乱被称为Callback Hell。正如你可以猜到的(或者从经验中得知),这样的代码很难回归并推理。

Reactor提供了丰富的组合选项,其中代码反映了抽象过程的组织,并且所有内容通常都保持在同一级别(嵌套最小化)。

类比装配线工作流程

您可以将响应式应用程序处理的数据视为在装配线中移动。反应器既是传送带又是工作站。原材料从原料(原始Publisher)中倒出,最终成为成品,准备推送给消费者(或Subscriber)。

原材料可以经历各种转换和其他中间步骤,或者是将中间件聚集在一起的较大装配线的一部分。如果在某一点出现毛刺或堵塞(也许装箱产品需要不成比例的长时间),受影响的工作站可向上游发出信号以限制原材料的流动。

操作符(运算符)

在Reactor中,运算符是我们的汇编类比中的工作站。每个操作符都将行为添加到a Publisher并将上一步骤包装Publisher到新实例中。因此,整个链被链接,使得数据源自第一Publisher链并且向下移动链,由每个链转换。最终,Subscriber完成了整个过程。请记住,在Subscriber订阅a 之前没有任何事情发生Publisher,下面就会提到。

了解操作员创建新实例可以帮助您避免一个常见错误,该错误会导致您认为您的链中使用的操作员未被应用。看到这个项目的常见问题。
虽然Reactive Streams规范根本没有指定运算符,但Reactor等反应库的最佳附加值之一是它们提供的丰富的运算符。这些涉及很多方面,从简单的转换和过滤到复杂的编排和错误处理。

在你订阅之前什么都不会发生

在Reactor中,当您编写Publisher链时,默认情况下数据不会启动。相反,您可以创建异步过程的抽象描述(这可以帮助重用和组合)。

通过订阅行为,您将Publishera 绑定到a Subscriber,从而触发整个链中的数据流。这是通过上游传播的单个request 信号在内部实现的Subscriber,一直传回源 Publisher。

背压

上游传播信号也用于实现背压,我们在装配线中将其描述为当工作站比上游工作站处理速度慢时向线路发送的反馈信号。

Reactive Streams规范定义的真实机制非常接近于类比:订阅者可以在无限制模式下工作,让源以最快的速度推送所有数据,或者可以使用该request机制向源发送信号表明它已准备就绪处理最多的n元素。

中间操作员也可以在途中更改请求。想象一个buffer 运算符,它将元素分组为10个。如果订阅者请求1个缓冲区,则源可以生成10个元素。一些操作员还实施 预取策略,这避免了request(1)往返,并且如果在请求之前生成元素并不太昂贵,则是有益的。

这将推模型转换为推拉式混合动力,如果它们随时可用,下游可以从上游拉出n个元素。但是如果元素没有准备好,它们就会在生成时被上游推动。

热与冷

在反应库的Rx家族中,人们可以区分两大类反应序列:热和冷。这种区别主要与反应流如何对订阅的用户做出反应有关:

冷序列的含义是不论订阅者在何时订阅该序列,总是能收到序列中产生的全部消息。

而与之对应的热序列,则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。

以上就是Reactive反应式编程及使用介绍的详细内容,更多关于Reactive反应式编程使用的资料请关注我们其它相关文章!

(0)

相关推荐

  • 详解Java中的reactive stream协议

    背景 每个数据流都有一个生产者一个消费者.生产者负责产生数据,而消费者负责消费数据.如果是同步系统,生产一个消费一个没什么问题.但是如果在异步系统中,就会产生问题. 因为生产者无法感知消费者的状态,不知道消费者到底是繁忙状态还是空闲状态,是否有能力去消费更多的数据. 一般来说数据队列的长度都是有限的,即使没有做限制,但是系统的内存也是有限的.当太多的数据没有被消费的话,会导致内存溢出或者数据得不到即使处理的问题. 这时候就需要back-pressure了. 如果消息接收方消息处理不过来,则可以通

  • ReactiveCocoa代码实践之-UI组件的RAC信号操作

    相关阅读: ReactiveCocoa代码实践之-更多思考 ReactiveCocoa代码实践之-RAC网络请求重构这一节是自己对网络层的一些重构,本节是自己一些代码小实践做出的一些demo程序,基本涵盖大多数UI控件操作. 一.用UISlider实现调色板 假设我们现在做一个demo,上面有一个View用来展示颜色,下面有三个UISlider滑竿分别控制RGB的色值,随着不同滑竿的拖动上面view的颜色会随之改变. 可以先脑补一下不用RAC该怎么写. 如果使用RAC只需要将三个信号包装起来用适

  • Vue3中reactive函数toRef函数ref函数简介

    目录 reactive函数 用法: toRef函数(了解即可) 用法: ref函数 定义响应式数据 直接定义使用 获取dom 获取组件实例对象 reactive函数 reactive用于定义响应式数据(可以理解 成data的替代品) 用法: 导入 import {reactive} from 'vue' 使用: const state=reactive({ 参数名:参数值 }) 访问: state.参数名 访问: state.参数名 toRef函数(了解即可) toRef:将响应式数据中某个字段

  • 详解如何使用ReactiveObjC

    概述 RAC架构框架图 信号流程 基本使用 1.基本控件 UITextField //监听文本输入 [[_textField rac_textSignal] subscribeNext:^(NSString * _Nullable x) { NSLog(@"%@",x); }]; //可根据自己想要监听的事件选择 [[_textField rac_signalForControlEvents:UIControlEventEditingChanged] subscribeNext:^(_

  • Reactive反应式编程及使用介绍

    目录 前言 反应式编程简介 阻塞可能会浪费资源 使用异步来解决? 回调地狱的例子 与回调代码等效的Reactor代码示例 具有超时和回退的Reactor代码示例 CompletableFuture组合的例子 与未来代码等效的Reactor代码示例 从命令式到反应式编程 可组合性和可读性 类比装配线工作流程 操作符(运算符) 在你订阅之前什么都不会发生 背压 热与冷 前言 前一篇分析了Spring WebFlux的设计及实现原理后,反应式编程又来了,Spring WebFlux其底层还是基于Rea

  • Spring5新特性之Reactive响应式编程

    目录 1什么是响应式编程 2回顾Reactor 2.1什么是Reactor 2.2为什么是Reactor 2.3Reactor模式的经典实现—Netty 3Spring5中多Reactive的支持 3.1SpringWebflux 3.1.1依赖 3.1.2Controller代码 3.1.3测试 3.1.4SpringMVC和SpringWebFlux模式上的不同 3.2SpringDataReactiveRespositories 3.2.1依赖 3.2.2配置 3.3.3测试 4如何理解R

  • 一文带你搞懂Spring响应式编程

    目录 1. 前言 1.1 常用函数式编程 1.2 Stream操作 2. Java响应式编程 带有中间处理器的响应式流 3. Reactor 3.1 Flux & Mono 3.2 Flux Mono创建与使用 4. WebFlux Spring WebFlux示例 基于注解的WebFlux 基于函数式编程的WebFlux Flux与Mono的响应式编程延迟示例 总结 哈喽,大家好,我是指北君. 相信响应式编程经常会在各种地方被提到.本篇就为大家从函数式编程一直到Spring WeFlux做一次

  • Vue3中的ref和reactive响应式原理解析

    目录 1 ref 2 isref判断是不是一个ref对象 3 shallowref创建一个跟踪自身.value变化的 ref,但不会使其值也变成响应式的 4 triggerRef 5 customRef 6 reactive用来绑定复杂的数据类型 7 readonly 8 shallowReactive 9toRef 10toRefs 11toRaw Vue3系列4--ref和reactive响应式 本节主要介绍了响应式变量和对象,以及变量和对象在响应式和非响应式之间的转换. 1 ref 接受一

  • C#管道式编程的介绍与实现

    受 F# 中的管道运算符和 C# 中的 LINQ 语法,管道式编程为 C# 提供了更加灵活性的功能性编程.通过使用 扩展函数 可以将多个功能连接起来构建成一个管道. 前言 在 C# 编程中,管道式编程(Pipeline Style programming)其实存在已久,最明显的就是我们经常使用的 LINQ.在进入 DotNetCore 世界后, 这种编程方式就更加明显,比如各种中间件的使用.通过使用这种编程方式,大大提高了代码的可维护性,优化了的业务的组合方式. 管道式编程具有如下优点: 创建一

  • java9新特性Reactive Stream响应式编程 API

    目录 一.Java9ReactiveStreamAPI 二.Java响应式编程四大接口 2.1.SubscriberInterface(订阅者订阅接口) 2.2.SubscriptionInterface(订阅令牌接口) 2.3.PublisherInterface(发布者接口) 2.4.ProcessorInterface(处理器接口) 二.实战案例 实现订阅者SubscriberInterface SubmissionPublisher消息发布者 我计划在后续的一段时间内,写一系列关于jav

  • java9新特性Reactive Stream响应式编程 API

    目录 一.Java9ReactiveStreamAPI 二.Java响应式编程四大接口 2.1.SubscriberInterface(订阅者订阅接口) 2.2.SubscriptionInterface(订阅令牌接口) 2.3.PublisherInterface(发布者接口) 2.4.ProcessorInterface(处理器接口) 二.实战案例 实现订阅者SubscriberInterface SubmissionPublisher消息发布者 我计划在后续的一段时间内,写一系列关于jav

  • 关于springboot响应式编程整合webFlux的问题

    在servlet3.0标准之前,是每一个请求对应一个线程.如果此时一个线程出现了高延迟,就会产生阻塞问题,从而导致整个服务出现严重的性能情况,因为一旦要调用第三方接口,就有可能出现这样的操作了.早期的处理方式只能是手工控制线程. 在servlet3.0标准之后,为了解决此类问题,所以提供了异步响应的支持.在异步响应处理结构中,可以将耗时操作的部分交由一个专属的异步线程进行响应处理,同时请求的线程资源将被释放,并将该线程返回到线程池中,以供其他用户使用,这样的操作机制将极大的提升程序的并发性能.

  • springboot3+r2dbc响应式编程实践

    目录 r2dbc 工程依赖 配置文件 配置类 bean DAO controller Spring boot3已经M1了,最近群佬们也开始蠢蠢欲动的开始整活Reactive+Spring Boot3,跟着大家的步伐,我也来整一篇工程入门,我们将用java17+Spring Boot3+r2dbc+Reactive栈来讲述,欢迎大家来讨论.(关于响应式,请大家异步到之前的文章里,有详细介绍.) r2dbc Reactor还有基于其之上的Spring WebFlux框架.包括vert.x,rxjav

  • 剖析Spring WebFlux反应式编程设计及工作原理

    目录 前言 接口抽象 WebServer ReactiveWebServerFactory HttpHandler 启动流程分析 ReactiveWebServerApplicationContext 前言 Spring 5发布有两年了,随Spring 5一起发布了一个和Spring WebMvc同级的Spring WebFlux.这是一个支持反应式编程模型的新框架体系.反应式模型区别于传统的MVC最大的不同是异步的.事件驱动的.非阻塞的,这使得应用程序的并发性能会大大提高,单位时间能够处理更多

随机推荐