如何使用Reactor完成类似Flink的操作

一、背景

Flink在处理流式任务的时候有很大的优势,其中windows等操作符可以很方便的完成聚合任务,但是Flink是一套独立的服务,业务流程中如果想使用需要将数据发到kafka,用Flink处理完再发到kafka,然后再做业务处理,流程很繁琐。

比如在业务代码中想要实现类似Flink的window按时间批量聚合功能,如果纯手动写代码比较繁琐,使用Flink又太重,这种场景下使用响应式编程RxJava、Reactor等的window、buffer操作符可以很方便的实现。

响应式编程框架也早已有了背压以及丰富的操作符支持,能不能用响应式编程框架处理类似Flink的操作呢,答案是肯定的。

本文使用Reactor来实现Flink的window功能来举例,其他操作符理论上相同。文中涉及的代码:github

二、实现过程

Flink对流式处理做的很好的封装,使用Flink的时候几乎不用关心线程池、积压、数据丢失等问题,但是使用Reactor实现类似的功能就必须对Reactor运行原理比较了解,并且经过不同场景下测试,否则很容易出问题。

下面列举出实现过程中的核心点:

1、创建Flux和发送数据分离

入门Reactor的时候给的示例都是创建Flux的时候同时就把数据赋值了,比如:Flux.just、Flux.range等,从3.4.0版本后先创建Flux,再发送数据可使用Sinks完成。有两个比较容易混淆的方法:

  • Sinks.many().multicast() 如果没有订阅者,那么接收的消息直接丢弃
  • Sinks.many().unicast() 如果没有订阅者,那么保存接收的消息直到第一个订阅者订阅
  • Sinks.many().replay() 不管有多少订阅者,都保存所有消息

在此示例场景中,选择的是Sinks.many().unicast()

官方文档:https://projectreactor.io/docs/core/release/reference/#processors

2、背压支持

上面方法的对象背压策略支持两种:BackpressureBuffer、BackpressureError,在此场景肯定是选择BackpressureBuffer,需要指定缓存队列,初始化方法如下:Queues.get(queueSize).get()

数据提交有两个方法:

  • emitNext 指定提交失败策略同步提交
  • tryEmitNext 异步提交,返回提交成功、失败状态

在此场景我们不希望丢数据,可自定义失败策略,提交失败无限重试,当然也可以调用异步方法自己重试。

 Sinks.EmitFailureHandler ALWAYS_RETRY_HANDLER = (signalType, emitResult) -> emitResult.isFailure();

在此之后就就可以调用Sinks.asFlux开心的使用各种操作符了。

在此之后就就可以调用Sinks.asFlux开心的使用各种操作符了。

3、窗口函数

Reactor支持两类窗口聚合函数:

  • window类:返回Mono(Flux)
  • buffer类:返回List

在此场景中,使用buffer即可满足需求,bufferTimeout(int maxSize, Duration maxTime)支持最大个数,最大等待时间操作,Flink中的keys操作可以用groupBy、collectMap来实现。

4、消费者处理

Reactor经过buffer后是一个一个的发送数据,如果使用publishOn或subscribeOn处理的话,只等待下游的subscribe处理完成才会重新request新的数据,buffer操作符才会重新发送数据。如果此时subscribe消费者耗时较长,数据流会在buffer流程阻塞,显然并不是我们想要的。

理想的操作是消费者在一个线程池里操作,可多线程并行处理,如果线程池满,再阻塞buffer操作符。解决方案是自定义一个线程池,并且当然线程池如果任务满submit支持阻塞,可以用自定义RejectedExecutionHandler来实现:

 RejectedExecutionHandler executionHandler = (r, executor) -> {
   try {
     executor.getQueue().put(r);
   } catch (InterruptedException e) {
     Thread.currentThread().interrupt();
     throw new RejectedExecutionException("Producer thread interrupted", e);
   }
 };

 new ThreadPoolExecutor(poolSize, poolSize,
     0L, TimeUnit.MILLISECONDS,
     new SynchronousQueue<>(),
     executionHandler);

三、总结

1、总结一下整体的执行流程

提交任务:提交数据支持同步异步两种方式,支持多线程提交,正常情况下响应很快,同步的方法如果队列满则阻塞。
丰富的操作符处理流式数据。
buffer操作符产生的数据多线程处理:同步提交到单独的消费者线程池,线程池任务满则阻塞。
消费者线程池:支持阻塞提交,保证不丢消息,同时队列长度设置成0,因为前面已经有队列了。
背压:消费者线程池阻塞后,会背压到buffer操作符,并背压到缓冲队列,缓存队列满背压到数据提交者。

2、和Flink的对比

实现的Flink的功能:

  • 不输Flink的丰富操作符
  • 支持背压,不丢数据

优势:

  • 轻量级,可直接在业务代码中使用

劣势:

  • 内部执行流程复杂,容易踩坑,不如Flink傻瓜化
  • 没有watermark功能,也就意味着只支持无序数据处理
  • 没有savepoint功能,虽然我们用背压解决了部分问题,但是宕机后开始会丢失缓存队列和消费者线程池里的数据,补救措施是添加Java Hook功能
  • 只支持单机,意味着你的缓存队列不能设置无限大,要考虑线程池的大小,且没有flink globalWindow等功能
  • 需考虑对上游数据源的影响,Flink的上游一般是mq,数据量大时可自动堆积,如果本文的方案上游是http、rpc调用,产生的阻塞影响就不能忽略。补偿方案是每次提交数据都使用异步方法,如果失败则提交到mq中缓冲并消费该mq无限重试。

四、附录

本文源码地址:https://github.com/sofn/reactor-window-like-flink

Reactor官方文档:https://projectreactor.io/docs/core/release/reference/

Flink文档:https://ci.apache.org/projects/flink/flink-docs-stable/

Reactive操作符:http://reactivex.io/documentation/operators.html

以上就是如何使用Reactor完成类似Flink的操作的详细内容,更多关于使用Reactor完成类似Flink的操作的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java lambda表达式实现Flink WordCount过程解析

    这篇文章主要介绍了Java lambda表达式实现Flink WordCount过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 本篇我们将使用Java语言来实现Flink的单词统计. 代码开发 环境准备 导入Flink 1.9 pom依赖 <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>

  • 大数据HelloWorld-Flink实现WordCount

    所有的语言开篇都是Hello Word,数据处理引擎也有Hello Word.那就是Word Count.MR,Spark,Flink以来开篇第一个程序都是Word Count.那么今天Flink开始目标就是在本地调试出Word Count. 单机安装Flink 开始Flink之前先在本机尝试安装一下Flink,当然FLink正常情况下是部署的集群方式.作者比较穷,机器配置太低开不了几个虚拟机.所以只能先演示个单机的安装. Apache Flink需要在Java1.8+以上的环境中运行 . 所以

  • Java Reactor反应器模式使用方法详解

    Reactor反应器模式 到目前为止,高性能网络编程都绕不开反应器模式.很多著名的服务器软件或者中间件都是基于反应器模式实现的,如Nginx.Redis.Netty. 反应器模式是高性能网络编程的必知.必会的模式. Reactor简介 反应器模式由Reactor反应器线程.Handlers处理器两大角色组成: (1)Reactor反应器线程的职责:负责响应IO事件,并且分发到Handlers处理器. (2)Handlers处理器的职责:非阻塞的执行业务处理逻辑. 从上面的反应器模式定义,看不出这

  • IDEA上运行Flink任务的实战教程

    欢迎访问我的GitHub https://github.com/zq2599/blog_demos 内容:所有原创文章分类汇总及配套源码,涉及Java.Docker.Kubernetes.DevOPS等: IDEA是常用的IDE,我们编写的flink任务代码如果能直接在IDEA运行,会给学习和开发带来很大便利,例如改完代码立即运行不用部署.断点.单步调试等: 环境信息 电脑:2019版13寸MacBook Pro,2.3 GHz 四核Intel Core i5,8 GB 2133 MHz LPD

  • Flink开发IDEA环境搭建与测试的方法

    一.IDEA开发环境 1.pom文件设置 <properties> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.11.12</scala.version>

  • ACE反应器(Reactor)模式的深入分析

    反应器(Reactor):用于事件多路分离和分派的体系结构模式通常的,对一个文件描述符指定的文件或设备, 有两种工作方式: 阻塞与非阻塞.所谓阻塞方式的意思是指, 当试图对该文件描述符进行读写时, 如果当时没有东西可读,或者暂时不可写, 程序就进入等待状态, 直到有东西可读或者可写为止.而对于非阻塞状态, 如果没有东西可读, 或者不可写, 读写函数马上返回, 而不会等待. 在前面的章节中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:当接收tcp数据时,如果远端没有数据可以读,则会一直阻塞

  • Reactor反应器的实现方法详解

    大多数应用都会使用ACE_Reactor::instance()提供的默认反应器实例.但是你也可以选择自己的反应器,这是因为ACE使用了Bridge模式(使用两个不同的类:一个是编程接口,另一个是实现,第一个类会把各个操作传给第二个类).例如使用线程池反应器实现:ACE_TP_Reactor* tp_reactor = new ACE_TP_Reactor;ACE_Reactor* my_reactor = new ACE_Reactor(tp_reactor, 1);//1表示my_react

  • Apache Flink 任意 Jar 包上传导致远程代码执行漏洞复现问题(漏洞预警)

    漏洞描述 Apache Flink是一个用于分布式流和批处理数据的开放源码平台.Flink的核心是一个流数据流引擎,它为数据流上的分布式计算提供数据分发.通信和容错功能.Flink在流引擎之上构建批处理,覆盖本地迭代支持.托管内存和程序优化.近日有安全研究人员发现apache flink允许上传任意的jar包从而导致远程代码执行. 漏洞级别 高危 影响范围 Apache Flink <=1.9.1 漏洞复现 首先下载Apache Flink 1.9.1安装包并进行解压,之后进入bin文件夹内运行

  • 详解Python的Twisted框架中reactor事件管理器的用法

    铺垫 在大量的实践中,似乎我们总是通过类似的方式来使用异步编程: 监听事件 事件发生执行对应的回调函数 回调完成(可能产生新的事件添加进监听队列) 回到1,监听事件 因此我们将这样的异步模式称为Reactor模式,例如在iOS开发中的Run Loop概念,实际上非常类似于Reactor loop,主线程的Run Loop监听屏幕UI事件,一旦发生UI事件则执行对应的事件处理代码,还可以通过GCD等方式产生事件至主线程执行. 上图是boost对Reactor模式的描绘,Twisted的设计就是基于

  • 简单了解Java Netty Reactor三种线程模型

    1. Reactor三种线程模型 1.1. 单线程模型 Reactor单线程模型,指的是所有的IO操作都在同一个NIO线程上面完成,NIO线程的职责如下: 1)作为NIO服务端,接收客户端的TCP连接: 2)作为NIO客户端,向服务端发起TCP连接: 3)读取通信对端的请求或者应答消息: 4)向通信对端发送消息请求或者应答消息. Reactor单线程模型示意图如下所示: Reactor单线程模型 由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会导致阻塞,理论上一个线程可以独立处

  • Apache FlinkCEP 实现超时状态监控的步骤详解

    CEP - Complex Event Processing复杂事件处理. 订单下单后超过一定时间还未进行支付确认. 打车订单生成后超过一定时间没有确认上车. 外卖超过预定送达时间一定时限还没有确认送达. Apache FlinkCEP API CEPTimeoutEventJob FlinkCEP源码简析 DataStream和PatternStream DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter.Map等转换为另一个Da

随机推荐