

使用Reactor完成类似Flink的操作
source link: https://lesofn.com/archives/shi-yong-reactor-wan-cheng-lei-shi-de-flink-de-cao-zuo
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

使用Reactor完成类似Flink的操作
木小丰 2021年02月26日 1,255次浏览Flink在处理流式任务的时候有很大的优势,其中windows等操作符可以很方便的完成聚合任务,但是Flink是一套独立的服务,业务流程中如果想使用需要将数据发到kafka,用Flink处理完再发到kafka,然后再做业务处理,流程很繁琐。
比如在业务代码中想要实现类似Flink的window按时间批量聚合功能,如果纯手动写代码比较繁琐,使用Flink又太重,这种场景下使用响应式编程RxJava、Reactor等的window、buffer操作符可以很方便的实现。
响应式编程框架也早已有了背压以及丰富的操作符支持,能不能用响应式编程框架处理类似Flink的操作呢,答案是肯定的。
本文使用Reactor来实现Flink的window功能来举例,其他操作符理论上相同。文中涉及的代码:github
二、实现过程
Flink对流式处理做的很好的封装,使用Flink的时候几乎不用关心线程池、积压、数据丢失等问题,但是使用Reactor实现类似的功能就必须对Reactor运行原理比较了解,并且经过不同场景下测试,否则很容易出问题。
下面列举出实现过程中的核心点:
1、创建Flux和发送数据分离
入门Reactor的时候给的示例都是创建Flux的时候同时就把数据赋值了,比如:Flux.just、Flux.range等,从3.4.0版本后先创建Flux,再发送数据可使用Sinks完成。有两个比较容易混淆的方法:
- Sinks.many().multicast() 支持多订阅者,如果没有订阅者,那么接收的消息直接丢弃
- Sinks.many().unicast() 只支持一个订阅者,如果没有订阅者,那么保存接收的消息直到第一个订阅者订阅
- Sinks.many().replay() 不管有多少订阅者,都保存所有消息
在此示例场景中,选择的是Sinks.many().unicast()
官方文档:https://projectreactor.io/docs/core/release/reference/#processors
2、背压支持
上面方法的对象背压策略支持两种:BackpressureBuffer、BackpressureError,在此场景肯定是选择BackpressureBuffer,需要指定缓存队列,初始化方法如下:Queues.get(queueSize).get()
数据提交有两个方法:
- emitNext 指定提交失败策略同步提交
- tryEmitNext 异步提交,返回提交成功、失败状态
在此场景我们不希望丢数据,可自定义失败策略,提交失败无限重试,当然也可以调用异步方法自己重试。
Sinks.EmitFailureHandler ALWAYS_RETRY_HANDLER = (signalType, emitResult) -> emitResult.isFailure();
在此之后就就可以调用Sinks.asFlux开心的使用各种操作符了。
3、窗口函数
Reactor支持两类窗口聚合函数:
- window类:返回Mono(Flux)
- buffer类:返回List
在此场景中,使用buffer即可满足需求,bufferTimeout(int maxSize, Duration maxTime)支持最大个数,最大等待时间操作,Flink中的keys操作可以用groupBy、collectMap来实现。
4、消费者处理
Reactor经过buffer后是一个一个的发送数据,如果使用publishOn或subscribeOn处理的话,只等待下游的subscribe处理完成才会重新request新的数据,buffer操作符才会重新发送数据。如果此时subscribe消费者耗时较长,数据流会在buffer流程阻塞,显然并不是我们想要的。
理想的操作是消费者在一个线程池里操作,可多线程并行处理,如果线程池满,再阻塞buffer操作符。解决方案是自定义一个线程池,并且当然线程池如果任务满submit支持阻塞,可以用自定义RejectedExecutionHandler来实现:
RejectedExecutionHandler executionHandler = (r, executor) -> { executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Producer thread interrupted", e); new ThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), executionHandler);
1、总结一下整体的执行流程
- 提交任务:提交数据支持同步异步两种方式,支持多线程提交,正常情况下响应很快,同步的方法如果队列满则阻塞。
- 丰富的操作符处理流式数据。
- buffer操作符产生的数据多线程处理:同步提交到单独的消费者线程池,线程池任务满则阻塞。
- 消费者线程池:支持阻塞提交,保证不丢消息,同时队列长度设置成0,因为前面已经有队列了。
- 背压:消费者线程池阻塞后,会背压到buffer操作符,并背压到缓冲队列,缓存队列满背压到数据提交者。
2、和Flink的对比
实现的Flink的功能:
- 不输Flink的丰富操作符
- 支持背压,不丢数据
优势:轻量级,可直接在业务代码中使用
- 内部执行流程复杂,容易踩坑,不如Flink傻瓜化
- 没有watermark功能,也就意味着只支持无序数据处理
- 没有savepoint功能,虽然我们用背压解决了部分问题,但是宕机后开始会丢失缓存队列和消费者线程池里的数据,补救措施是添加Java Hook功能
- 只支持单机,意味着你的缓存队列不能设置无限大,要考虑线程池的大小,且没有flink globalWindow等功能
- 需考虑对上游数据源的影响,Flink的上游一般是mq,数据量大时可自动堆积,如果本文的方案上游是http、rpc调用,产生的阻塞影响就不能忽略。补偿方案是每次提交数据都使用异步方法,如果失败则提交到mq中缓冲并消费该mq无限重试。
本文源码地址:https://github.com/sofn/reactor-window-like-flink
Reactor官方文档:https://projectreactor.io/docs/core/release/reference/
Flink文档:https://ci.apache.org/projects/flink/flink-docs-stable/
Reactive操作符:http://reactivex.io/documentation/operators.html
本文作者:木小丰,美团Java高级工程师,关注架构、软件工程、全栈等,不定期分享软件研发过程中的实践、思考。
作者博客:https://lesofn.com
公共号:Java研发
Recommend
-
31
介绍 响应式编程 响应式编程不同于我们熟悉的命令式编程,我们熟悉的命令式编程即 代码就是一行接一行的指令,按照它们的顺序一次一条地出现。一个任务被执行,程序就需要等到它执行完了,才能执行下一个任...
-
36
[译]2020年Spring状态报告 近日VMware发布了2020年Spring状态报告,该报告调查了1000多位不同行业的springboot开发者、架构师、技术经理等角色,以了...
-
6
0x00简介 Apache Flink是近几年大火的数据处理引擎。受到各大厂商的推崇并且已经应用与实际的业务场景中。很多公司在进行选型的时候都会选择Apache Flink作为选型的对象。Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式...
-
7
flink 在算子操作中请求web接口获取数据? 开源问答 ...
-
7
在 Reactor 应用程序中使用 R2DBC由于Reactor已经接管了 Java 世界,因此不可避免地会出现一个反应式 sql 库。在这篇博客中,我们将使用 r2dbc 和 h2 和 reactor。可以在
-
7
V2EX › Go 编程语言 go 怎么实现 方法前置操作 类似 PHP 的__call dzdh · 1 天前 · 1285 次点击
-
8
Flink SQL 知其所以然:Over 聚合操作-51CTO.COM
-
9
Flink Joins大家好,我是老羊,今天我们来学习 Flink SQL 中的· Join 操作...
-
8
Flink与Iceberg整合DataStream API操作目前Flink支持使用DataStream API 和SQL API 方式实时读取和写入Iceberg表,建议大家使用SQL API 方式实时读取和写入Iceberg表。Iceberg 支持的Flink版本为1.11.x版本以上,目前经过测试Iceberg版本与F...
-
6
Flink与Iceberg整合SQL API操作Flink SQL 在操作Iceberg时,对应的版本为Flink 1.11.x 与Iceberg0.11.1版本,目前,Flink1.14.2版本与Iceberg0.12.1版本对于SQL API 来说兼容有问题,所以这里使用Flink1.11.6版本与Iceberg0.11.1版本来演示Flink SQ...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK