13

居然能通过大保健来理解Flink中的时间语义

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzI0NTIxNzE1Ng%3D%3D&%3Bmid=2651220884&%3Bidx=1&%3Bsn=ddbd555024c57c1fdfc4b3e84501d5bf
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

bmaINb7.jpg!mobile

#  本篇要解决的问题

Processing Time、Event time、Ingestion Time的不同点?

请解释下Watermark。

如何解决延迟数据问题?

在Flink中如果Source并行度大于1,Watermark是一致的吗?

如果一个Task会从多个输入消费数据,因为涉及到多个数据流入,如何确定它的时间呢?

如果某个Source的Task,一直闲置,而另外一个Task是运行状态,会出现什么样的问题?如何解决?

近两年,Flink状态流处理变得越来越流行起来。其实最开始的时候,流处理都是无状态的。例如:Storm。

EJfeii.png!mobile

流处理在当时的定义就是更快地批处理。所以当时流处理基本是有很多是作为ETL的管道。而随着流处理变得越来越重要,流处理开始处理一些复杂业务,必须要进行有状态地处理。现在的框架也都开始支持有状态的流处理。例如:Flink、Structured Streaming等。

在我们学习更多关于状态的知识之前,我们要先来理解Flink中的时间语义。这在Flink中可以说非常核心的内容。

1

Flink中的时间语义

在流处理应用中,时间是很关键的。我们可以利用时间来对流中的事件分组、关联。而且,Flink中的window也与时间关联密切。因为,流处理应用是一个一直不停运行的应用程序。我们很多时候,会每隔一段时间了解下数据的情况,所以,需要对流进行定期地快照,也就是按照时间来进行快照。例如:当电商中发布一个秒杀活动时,想要每隔10分钟了解流量数据,这就需要采集10分钟内到达流处理应用中的所有事件,并进行各种计算。

而这个10分钟,可以有不同的解读方式。就像在按摩店上钟的时候,我们总是觉得按摩店的钟走得就是快,一会90分钟就到了。

1

Processing Time

Processing Time也就是处理时间,它是最容易理解的。例如:Flink引擎运行在的Linux系统上,这个系统上有一个时钟。我们可以基于这个始终来记录时间。我们根据该系统时间为准,来确定时间范围。例如:当前系统时间是12:00,那么12:00 - 12:10之间到达的数据认为10分钟内有效的数据。

jQjQv2b.png!mobile

听起来感觉挺有道理的,没啥问题啊。但大家考虑以下场景,假如说,某个传感器采集到数据之后,通过网络的方式发送数据。但偏偏网络抖动,导致要发送过来的数据产生了延迟,如果严格按照系统时间来计算,延迟的数据就会导致数据计算问题。而系统时间根本不关心,只需要看它自己的时钟,到点就开始计算。

这种体验是不好的,大家想想,如果你去按摩,预约是凌晨2点,但是因为在公司加班,晚了一会。但技师正常从2点开始上钟,管你来了没来,反正到底了下钟走人。这也太不靠谱了,哪里受得了这种!

B7v6BrU.png!mobile

2

Event Time

上面我说了说Processing Time的糟糕体验,只要稍有延迟,就可能会导致数据无法正确计算。而事件时间为围绕事件来的,就是根据数据实际发生的时间作为时钟的标准。针对事件时间的10分钟,是指的传感器那一端:12:00 - 12:10 之间产生的所有数据。此时的10分钟不再和Flink计算引擎所在的操作系统时间有关。而且,Eventime对事件延迟到达非常有用。

这种体验是很好的,如果你去按摩,预约是凌晨2点,但是因为在公司加班,晚到了5分钟。体贴的技师能够等你一会,等到开始给你按摩了,正式上钟打开,然后按照标准的时间给你服务,最后下钟。请问你是选Process Time还是Eventtime?

FjUnMvF.jpg!mobile

3

Ingestion Time

摄入时间是事件进入Flink流处理系统的时间。它介于Event time和Processing Time中间。进入到Flink中的事件都以进入到source operator系统给的时间为准,后续的时间操作也会基于这个时间。与Processing Time对比,它可以确保更稳定的计算结果。与Event time相比,它没法处理延迟数据。在内部看来,它和Event time非常类似,只不过事件的时间戳不是从事件自身提取的,而是根据source operator上自动分配的。

jqIv2m7.png!mobile

还是之前的例子,你还是去按摩,预约时间还是从凌晨2点开始,还是因为公司加班,晚到了5分钟。但现在的方式是你进到按摩师店,技师就开始打卡上钟。但你突然觉得肚子不舒服,要去厕所拉个屎,等你回来的时候,发现已经10分钟过去了。其实,技师的服务时间根本不是你真正开始享受服务的时间!差评!

yyINRfu.jpg!mobile

2

Watermark(水印)

1

Watermark与时钟

通过前面的内容,我们知道了Flink中有三种时间语义。Processing Time使用的是系统时间,系统时间是不断变化的。如果使用Processing Time,流处理应用是有一个不断流逝的时钟。但如果是Event time和Ingestion Time呢?使用了这两种时间,就表示不再使用Linux系统上的时钟了。一下没有了时钟,这事大了,难道时光不再流逝?这可不行,时光不流逝就表示我们后续无法针对时间做任何计算。所以,流式系统中时光必须流逝,就必须要有时钟。

我们所要开始聊的Watermark,就是要让流式处理应用能够拥有时间流逝的机制,就是要为Flink应用构建一个时钟。Watermark是流的一部分,是一种特殊的控制事件。它其实是Flink在使用Eventtime(Ingestion Time一样)时,提供的一个逻辑时钟(之前的Processing Time是物理时钟)。

明确两点:

  • 水印是要构建一个逻辑时钟,既然是时钟是一定不会回退的,一定是前进的

  • 水印和时间戳息息相关,基于它Operator才会有流逝、不断推进的时钟。

IbaEve7.png!mobile

大家注意我画的图。可以清晰地看到水印是流中的一部分。而且水印一定是随着时间流逝的。水印中有一个数字,这就是时间戳。通过水印,就构建起了Flink的逻辑时钟。让我们后续做Window要取指定事件时间范围的数据成为可能。

2

在流中分配Watermark

Watermark由我们手动分配到流中,一般是在source之后就生成。

val text = senv.socketTextStream("localhost", 9999)
.assignTimestampsAndWatermarks(new TimestampExtractor)

上面的代码,就是给流中分配水印。

跟一下源码,我们可以看到是一个名为TimeStampsAndWatermarksOperator在处理,它就是根据传入的WatermarkStrategy类中,提取事件时间,然后生成Watermarker,并发送到流中。

aeEFZrb.png!mobile

3

并行执行下的Watermark

了解了Watermark,接下来我们需要进入到分布式环境中。我们知道每一个Task都有可能运行在不同的服务器中,所以,每个Task都必须有一个属于自己的时钟(忘记Linux系统时钟吧)。大家看一下Flink下面这张经典的图。

Uz2i6fR.png!mobile

我们来解读一下,这个部分非常关键,是理解Watermark的核心。

  • 图中,并行的任务是独立生成Watermark的。

    所以,不存在Watermark同步一说。

    不同的Task Watermark是不同的。

  • 假设在Source Task上定义了Watermark,那么该Task就是以Watermark作为它的Event time。

  • 水印是不断在整个任务图中流动的。

    每当Task接收到一个Watermark,都会推进它的时钟(Event time),并且这个Task会继续向下游生成一个新的水印。

  • 当一个Task消费多个并行输入的数据时,每个输入中都有Watermark,那Task到底以谁为准呢?

    Flink中定义当使用join或者window、keyBy这一类操作,取输入中事件的Watermark时间最小值。

  • 所有的Task会随着Source Task不断发出的Watermark以Event time方式来更新自己的时钟

4

数据延迟问题

如果某个Task的时间被一个Watermark更新为W(12),但后续又一些Event time为8、9、10的事件到达Task,这种情况是有可能出现的。而且,在一些业务中,还有可能事件会无限延迟。我们当然是可以让Watermark延迟更长时间,但这肯定会导致计算延迟,这不是我们希望看到的。

所以,我们需要明确最大能够延迟的时间,超过这个时间,一概不接受。没有人愿意无限制地等待。

IDLE Source(空闲的Source)

Uz2i6fR.png!mobile

还拿这张图来说,假设第二个Source Task一直没有接收到事件,如果我们是基于事件时间来生成水印的话,就会导致流中空空如也,这个Source Task一些下游Task好像也静止了。这将会导致Window无法触发计算,因为Window Task需要在不同的并行输入Watermark中取最小值。这是我们不希望看到的,这也是有一些程序员发现自己写好的Flink程序明明发出去数据,但始终得不到计算结果的原因。

Flink也提供了一种非常方便的方式来解决该问题:

WatermarkStrategy.withIdleness(Duration.ofMinutes(1));

看一下源码:

VryaMnA.png!mobile

核心就是这个WatermarksWithIdleness类,它创建一个定时器。

nQZNB3e.png!mobile

如果检测到超过指定的空闲时间,就会将输出流标记为IDLE

qeQ7B3B.png!mobile

当输出流标记为IDLE后,下游Task将不再等待输出的watermark了。

Qnuqquu.png!mobile

#  总结:

Processing Time、Event time、Ingestion Time的不同点?

Flink的一些Operator例如:window是需要按照时间来触发计算和计算大小的。而在Flink流处理应用中,不同的时间有不同的时间语义。Processing Time表示使用Task运行的Linux系统时间、Event time表示使用事件时间、Ingestion time表示摄入时间。绝大多数场景都是使用Event time。

请解释下Watermark。

Watermark是数据流中的一部分。我们必须要让每个Operator有自己的时钟,而因为当前使用的语义是Event time,无法使用系统时钟,所以需要我们需要一种机制,来构建每个Operator自己的时钟。而Watermark就是这样一种机制,通过Watermark可以构建每个Operator的逻辑时钟。

如何解决延迟数据问题?

将Watermark设置为当前Event time – 允许延迟的最大时间,但注意Watermark的时间是不能往前的,必须前进。但不是无限的延迟,需要根据业务控制允许延迟最大的时间,因为它同时也会延迟计算时间。

在Flink中如果Source并行度大于1,Watermark是一致的吗?

不一致,每个Source Operator的Task都会独立生成Watermark,每一个Operator的Task都有自己的时钟。而时钟的推进就是基于Watermark来实现的。

如果一个Task会从多个输入消费数据,因为涉及到多个数据流入,如何确定它的时间呢?

多个流取最小值。

如果某个Source的Task,一直闲置,而另外一个Task是运行状态,会出现什么样的问题?如何解决?

会出现Windows窗口无法实现计算,因为Windows所在的Operator Task会一致等待空间的Source Task,然后取最小值。只需要在Watermark策略中指定IDLE的超时时间即可。一旦Source Task达到超时时间,会标记为IDLE,下游就不再等待上游的Watermark了。

AbuMfeA.jpg!mobile

RnMbmia.jpg!mobile


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK