详解Java中的reactive stream协议

背景

每个数据流都有一个生产者一个消费者。生产者负责产生数据,而消费者负责消费数据。如果是同步系统,生产一个消费一个没什么问题。但是如果在异步系统中,就会产生问题。

因为生产者无法感知消费者的状态,不知道消费者到底是繁忙状态还是空闲状态,是否有能力去消费更多的数据。

一般来说数据队列的长度都是有限的,即使没有做限制,但是系统的内存也是有限的。当太多的数据没有被消费的话,会导致内存溢出或者数据得不到即使处理的问题。

这时候就需要back-pressure了。

如果消息接收方消息处理不过来,则可以通知消息发送方,告知其正在承受压力,需要降低负载。back-pressure是一种消息反馈机制,从而使系统得以优雅地响应负载, 而不是在负载下崩溃。

而reactive stream的目的就是用来管理异步服务的流数据交换,并能够让接收方自主决定接受数据的频率。back-pressure就是reactive stream中不可或缺的一部分。

什么是reactive stream

上面我们讲到了reactive stream的作用,大家应该对reactive stream有了一个基本的了解。这里我们再给reactive stream做一个定义:

reactive stream就是一个异步stream处理的标准,它的特点就是非阻塞的back pressure。

reactive stream只是一个标准,它定义了实现非阻塞的back pressure的最小区间的接口,方法和协议。

所以reactive stream其实有很多种实现的,不仅仅是java可以使用reactive stream,其他的编程语言也可以。

reactive stream只是定义了最基本的功能,各大实现在实现了基本功能的同时可以自由扩展。

目前reactive stream最新的java版本是1.0.3,是在2019年8月23发布的。它包含了java API,协议定义文件,测试工具集合和具体的实现例子。

深入了解java版本的reactive stream

在介绍java版本的reactive stream之前,我们先回顾一下reactive stream需要做哪些事情:

1.能够处理无效数量的消息

2.消息处理是有顺序的

3.可以异步的在组件之间传递消息

4.一定是非阻塞和backpressure的

为了实现这4个功能,reactive stream定义了4个接口,Publisher,Subscriber,Subscription,Processor。这四个接口实际上是一个观察者模式的实现。接下来我们详细来分析一下各个接口的作用和约定。

Publisher

先看下Publisher的定义:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Publisher就是用来生成消息的。它定义了一个subscribe方法,传入一个Subscriber。这个方法用来将Publisher和Subscriber进行连接。

一个Publisher可以连接多个Subscriber。

每次调用subscribe建立连接,都会创建一个新的Subscription,Subscription和subscriber是一一对应的。

一个Subscriber只能够subscribe一次Publisher。

如果subscribe失败或者被拒绝,则会出发Subscriber.onError(Throwable)方法。

Subscriber

先看下Subscriber的定义:

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscriber就是消息的接收者。

在Publisher和Subscriber建立连接的时候会触发onSubscribe(Subscription s)方法。

当调用Subscription.request(long)方法时,onNext(T t)会被触发,根据request请求参数的大小,onNext会被触发一次或者多次。

在发生异常或者结束时会触发onError(Throwable t)或者onComplete()方法。

Subscription

先看下Subscription的定义:

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Subscription代表着一对一的Subscriber和Publisher之间的Subscribe关系。

request(long n)意思是向publisher请求多少个events,这会触发Subscriber.onNext方法。

cancel()则是请求Publisher停止发送信息,并清除资源。

Processor

先看下Processor的定义:

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Processor即是Subscriber又是Publisher,它代表着一种处理状态。

JDK中reactive stream的实现

在JDK中java.util.concurrent.Flow就是reactive stream语义的一种实现。

Flow从JDK9就开始有了。我们看下它的结构:

从上图我们可以看到在JDK中Flow是一个final class,而Subscriber,Publisher,Subscription,Processor都是它的内部类。

总结

reactive stream的出现有效的解决了异步系统中的背压问题。只不过reactive stream只是一个接口标准或者说是一种协议,具体的实现还需要自己去实现。

以上就是详解Java中的reactive stream协议的详细内容,更多关于Java中的reactive stream协议的资料请关注我们其它相关文章!

(0)

相关推荐

  • 利用JavaFX工具构建Reactive系统

    JavaFX 是Java中用来构建图形应用程序的新的标准库, 但许多程序员仍然坚持在使用Swing甚至AWT.关于如何利用JavaFX工具集中的新的超棒特性来构建响应式的快速应用程序,这里有一些建议! 1. 属性值 如果你对JavaFX组件做过完整的了解,移动遇到过属性(Property)这个东西.FX库中几乎每个值都可以被观察,分区(divider)的宽度,图片的尺寸,文本标识(label)中的文字,一个列表中的子项以及复选框(checkbox)的状态.属性分成另类:可写属性和可读属性.可写值

  • java8新特性之stream流中reduce()求和知识总结

    1.stream().reduce()单字段求和 (1)普通数字求和 public static void test2(){ List<Integer> list= Arrays.asList(new Integer[]{1,2,3,4,5,6,7,8,9}); Integer sum=list.stream().reduce((x,y)->x+y).get(); System.out.println(sum); } 2.BigDecimal求和 public static void m

  • Java8新特性之Stream API详解

    一.前言 StreamAPI在Java8版本中使用,关注的是对数据的筛选.查找.存储等 它可以做的事情有:过滤.排序.映射.归约 二.使用流程 Stream实例化中间操作(过滤.排序.映射.规约)终止操作(匹配查找.归约.收集) 三.案例演示 public class EmployeeData { public static List<Employee> getEmployees(){ List<Employee> list = new ArrayList<>(); l

  • java8 stream多字段排序的实现

    很多情况下sql不好解决的多表查询,临时表分组,排序,尽量用java8新特性stream进行处理 使用java8新特性,下面先来点基础的 List<类> list; 代表某集合 //返回 对象集合以类属性一升序排序 list.stream().sorted(Comparator.comparing(类::属性一)); //返回 对象集合以类属性一降序排序 注意两种写法 list.stream().sorted(Comparator.comparing(类::属性一).reversed());/

  • Java基础之FileInputStream和FileOutputStream流详解

    一.前言 FileInputStream 用于读取本地文件中的字节数据,继承InputStream类 FileOutputStream 将字节数据写到文件,继承OutputStream类 二.创建流对象 FileInputStream fis= new FileInputStream("绝对路径"); FileOutputStream fos= new FileOutputStream("绝对路径"); 三.FileInputStream常用方法 1.构造函数,打开

  • java8 stream的多字段排序实现(踩坑)

    关于java8 的stream排序用法这里不做多说,这里介绍下曾经在多字段排序时遇到过的一个坑. 需求:需要根据id去分组,然后取出每组中行号最大的一个对象值. 想到可以利用stream的多字段排序,先按id去排,再看行号去排,demo代码如下: class Tt{ private int id; private int line; public Tt(int id, int line) { this.id = id; this.line = line; } public int getId()

  • Java GZIPOutputStream流压缩文件的操作

    我就废话不多说了,大家还是直接看代码吧~ 不多说,直接上代码 public static void main(String[] args) throws Exception{ //压缩文件 File src = new File("e:/xx/aa.txt"); File zipFile = new File("e:/xx/a.zip"); FileOutputStream fos = new FileOutputStream(zipFile); ZipOutput

  • Java进阶核心之InputStream流深入讲解

    Java核心包 java.io包介绍 IO: Input / Ouput 即输入输出 输出流:程序(内存) ->外界设备 输入流:外界设备->程序(内存) 处理理数据类型分类 字符流:处理字符相关,如处理文本数据(如txt文件), Reader/Writer 字节流: 处理字节相关,如声音或者图片等二进制,InputStream/OutputStream 两者区别: 字节流以字节(8bit)为单位,字符流以字符为单位,根据码表映射字符,一次可能读多个字节 字节流可以处理几乎所有文件,字符流只能

  • Java8 实现stream将对象集合list中抽取属性集合转化为map或list

    首先新建一个实体类Person @Data public class Person { /** 编码 */ private String code; /** 名字 */ private String name; public Person(String code, String name) { this.code = code; this.name = name; } } 实例化三个对象放入list集合中 public static void main(String[] args) { Pers

  • 详解Java中的reactive stream协议

    背景 每个数据流都有一个生产者一个消费者.生产者负责产生数据,而消费者负责消费数据.如果是同步系统,生产一个消费一个没什么问题.但是如果在异步系统中,就会产生问题. 因为生产者无法感知消费者的状态,不知道消费者到底是繁忙状态还是空闲状态,是否有能力去消费更多的数据. 一般来说数据队列的长度都是有限的,即使没有做限制,但是系统的内存也是有限的.当太多的数据没有被消费的话,会导致内存溢出或者数据得不到即使处理的问题. 这时候就需要back-pressure了. 如果消息接收方消息处理不过来,则可以通

  • 一文详解Java中Stream流的使用

    目录 简介 操作1:创建流 操作2:中间操作 筛选(过滤).去重 映射 排序 消费 操作3:终止操作 匹配.最值.个数 收集 规约 简介 说明 本文用实例介绍stream的使用. JDK8新增了Stream(流操作) 处理集合的数据,可执行查找.过滤和映射数据等操作. 使用Stream API 对集合数据进行操作,就类似于使用 SQL 执行的数据库查询.可以使用 Stream API 来并行执行操作. 简而言之,Stream API 提供了一种高效且易于使用的处理数据的方式. 特点 不是数据结构

  • 一文详解Java中的Stream的汇总和分组操作

    目录 前言 一.查找流中的最大值和最小值 二.汇总 三.连接字符串 四.分组 1.分组 2.多级分组 3.按子组数据进行划分 后记 前言 在前面的文章中其实大家也已经看到我使用过collect(Collectors.toList()) 将数据最后汇总成一个 List 集合. 但其实还可以转换成Integer.Map.Set 集合等. 一.查找流中的最大值和最小值 static List<Student> students = new ArrayList<>(); ​ static

  • 详解Java 中的 AutoCloseable 接口

    一.前言 最近用到了 JDK 7 中的新特性 try-with-resources 语法,感觉到代码相对简洁了很多,于是花了点时间详细学习了下,下面分享给大家我的学习成果. 二.简单了解并使用 try-with-resources语法比较容易使用,一般随便搜索看下示例代码就能用起来了.JDK 对这个语法的支持是为了更好的管理资源,准确说是资源的释放. 当一个资源类实现了该接口close方法,在使用try-with-resources语法创建的资源抛出异常后,JVM会自动调用close 方法进行资

  • 详解Java 中 RMI 的使用

    RMI 介绍 RMI (Remote Method Invocation) 模型是一种分布式对象应用,使用 RMI 技术可以使一个 JVM 中的对象,调用另一个 JVM 中的对象方法并获取调用结果.这里的另一个 JVM 可以在同一台计算机也可以是远程计算机.因此,RMI 意味着需要一个 Server 端和一个 Client 端. Server 端通常会创建一个对象,并使之可以被远程访问. 这个对象被称为远程对象.Server 端需要注册这个对象可以被 Client 远程访问. Client 端调

  • 详解JAVA中的OPTIONAL

    一.概述 本质上,这是一个包含有可选值的包装类,这意味着 Optional 类既可以含有对象也可以为空. Optional 是 Java 实现函数式编程的强劲一步,并且帮助在范式中实现.但是 Optional 的意义显然不止于此. 我们从一个简单的用例开始.在 Java 8 之前,任何访问对象方法或属性的调用都可能导致NullPointerException: String isocode = user.getAddress().getCountry().getIsocode().toUpper

  • 详解Java中Collector接口的组成

    一.Collector常常出现的地方 java8引入了stream,Collector是与stream一起出现的,配合stream使用的好帮手,如果用过stream,我们应该都有写过这样的代码 例子1: lists.stream()....collect(Collectors.toList()); 例子2: lists.stream().collect(groupingBy(String::length)); 这两个例子中,toList()和groupingBy()返回的都是一个Collecto

  • 详解Java中List的正确的删除方法

    目录 简介 实例 正确方法 法1:for的下标倒序遍历 法2: list.stream().filter().collect() 法3: iterator迭代器 错误方法 法1:for(xxx : yyy)遍历 法2:for的下标正序遍历 原因分析 简介 本文介绍Java的List的正确的删除方法. 实例 需求:有如下初始数据,将list中的所有数据为"b"的元素删除掉.即:填充removeB()方法 package com.example.a; import java.util.Ar

  • 详解Java中的OkHttp JSONP爬虫

    目录 什么是JSOUP 什么是OkHttp 爬虫需要掌握的技术 需要的依赖 JSON入门Demo JSOUP常用方法 使用JSOUP 方式连接 User-Agent(随机) 后台爬虫的三大问题 selenium+phantomjs(维护中…内容重新整理) 什么是JSOUP JSOUP 是一款Java 的HTML解析器,可直接解析某个URL地址.HTML文本内容.它提供了一套非常省力的API,可通过DOM,CSS以及类似于jQuery的操作方法来取出和操作数据. 官网 jsoup实现了WHATWG

  • 详解Java中的字节码增强技术

    目录 1.字节码增强技术 2.常见技术 3.ASM 3.1 测试 Main 3.2 测试 CustomerClassVisitor 3.3 测试 Test 1.字节码增强技术 字节码增强技术就是一类对现有字节码进行修改或者动态生成全新字节码文件的技术. 参考地址 2.常见技术 技术分类 类型 静态增强 AspectJ 动态增强 ASM.Javassist.Cglib.Java Proxy 3.ASM <dependency> <groupId>org.ow2.asm</gro

随机推荐