115

Spark 的 Structured Streaming是如何搞定乱序的事件时间的

 6 years ago
source link: http://mp.weixin.qq.com/s/ijI2mbEdccQ7IxobJuqkwg
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Spark 的 Structured Streaming是如何搞定乱序的事件时间的

Original 孙彪彪 张江打工人 2017-10-19 14:43 Posted on

首发个人公众号  spark技术分享 ,  同步个人网站  coolplayer.net ,未经本人同意,禁止一切转载

我们本文主要讲述 Spark’s Structured Streaming  中是如何按照事件时间进行聚合, 以及如何处理乱序到达的事件日志。

我之前阐述过类似的话题,参考这篇之前写 文章

,今天具体聊下 Spark’s Structured Streaming  中是如何按照事件时间进行聚合, 以及 Watermarking 概念。

在讨论解决消息乱序问题之前,需先定义时间和顺序。在流处理中,时间的概念有两个:

  • Event time :Event time是事件发生的时间,经常以时间戳表示,并和数据一起发送。带时间戳的数据流有,Web服务日志、监控agent的日志、移动端日志等;

  • Processing time  :Processing time是处理事件数据的服务器时间,一般是运行流处理应用的服务器时钟。

我们看下下面这个示意图

Image

上图中 time1,time2, time3等是我们 Spark straming 拿到消息将要处理的时间, 图中方块中的数字代表这个event 产生的时间, 有可能因为网络抖动导致部分机器上的日志收集产生了延迟, 在time3的batch中包含event time 为2的日志, 特别说明一下, kafka 中的不同分区的消息也是没有顺序的。

在实时处理过程中也就产生了两个问题:

  • Spark streaming 从Kafka 中拉取到的一批数据,我们可能认为里面包含多个时间区间的数据

  • 同一个时间的数据可能出现在多个 batch 中

持续增量聚合

我们来看下在 Structured Streaming  中我们是如何处理上述棘手的问题的, Structured Streaming  可以在实时数据上进行 sql 查询聚合,就跟离线数据查询聚合类似,  Spark SQL 可以不断地在实时数据上增量聚合, 如果你想看下一个设备的信号量的平均大小, 你可能会写以下代码:

# DataFrame w/ schema [eventTime: timestamp, deviceId: string, signal: bigint]
eventsDF = ... 

avgSignalDF = eventsDF.groupBy("deviceId").avg("signal")

我们本文描述的情况不同的是, 这里得到的结果是不断随着新数据的到来动态更新的。 你可以使用不同的模式把结果落地到外部存储上, 参考这里 http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#starting-streaming-queries

当然这里使用的聚合函数你也可以自定义,参考 https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

在事件时间上做聚合

有时候我们不想在整个数据流上做聚合, 而是想在时间窗口上做聚合(每过去5分钟或者每过去一小时),如果是上个例子,那么现在我们想看下每过去5分钟的一个平均信号量,是否有异常,注意我们这里的时间是日志产生的时间,而不是日志达到处理系统的时间, 这个需求是不是很变态, 但是  Spark’s Structured Streaming 依然可以满足你,这就是我那么喜欢 spark 的原因。

之前 spark streaming 里面很难做到, 可能是设计 spark streaming 的时候,还没有出现这么变态的需求,不过, 在 Structured Streaming 里面,你只要使用 window()函数就解决这个问题了,我们要每5分钟统计过去5分钟内的所有设备产生日志的条数,要按照事件产生的时间聚合哟,代码这样写:

from pyspark.sql.functions import *

windowedAvgSignalDF = \
  eventsDF \
    .groupBy(window("eventTime", "5 minute")) \
    .count()

是不是很简单,我贴个图,让你更清楚地看到过程

Image

是不是很清晰明白。

当然如果我们可以给 spark 提一些更加变态的需求,我想要 每5分钟统计过去10分钟内的所有设备产生日志的条数,也是要按照事件产生的时间聚合哟, 这个时间就重叠了,代码这样写:

from pyspark.sql.functions import *

windowedAvgSignalDF = \
  eventsDF \
    .groupBy(window("eventTime", "10 minutes", "5 minutes")) \
    .count()

轻松搞定,处理过程如下:

Image

这里有人就有疑问了, 如果一条日志因为网络原因迟到了怎么办, Structured Streaming  还是会很耿直地把这条日志统计到属于它的分组里面。

Image

每5分钟统计过去10分钟内每个设备(注意这样按照 设备 和 重叠时间窗口同时分组)产生日志的条数, 如果一条日志因为网络原因迟到了,还是会把这条日志统计到属于它的分组里面。

有状态的增量查询

如果你想知道上述这么强大的功能是怎么实现的,我这里就掰扯掰扯,其实就是  Spark SQL 在内部维护了一个高容错的中间状态存储, 存储结构就是个 key-value 对, key 就对应分组, value 就对应每次增量统计后的一个聚合结果, 当然每次增量统计,就对应  key-value 的一个新的版本, 状态就从旧的状态到了新的状态, 所以才认为是有状态的。

我们都知道,有状态的数据只存在内存中就是靠不住的,老办法, 还是要使用 WAL(write ahead logs, 就是记录下每次增量更新操作的日志) 的老办法,  然后间断的进行 checkpoint , 如果某个时间点系统 挂了, 就从 checkpoint 的点开始使用 WAL 进行重放(replay)

这套东西在数据库中都被玩了几千几万遍了,Structured Streaming 在这里又玩了一遍,就是为了满足你们要的 exactly-once 语义保证。

原理图参考下面:

Image

当然你会发现, 因为我们的数据源是一个 流, 也就是这个数据是无限的, 如果我们要保存无限的数据的状态,肯定是不行的,为了资源, 也为了性能, 你必须要做权衡了, 这里的权衡就是你要做出决定,落后多久以后的数据即使来了,我们也不更新老 key-value 的状态了, 比如每个 设备在 2018年1月1号1点1分1秒  的时间点发来了一条  2017年1月1号1点1分1秒 的日志,我们选择忽略。

watermarking 是个什么玩意

我们上文提到我们会为做成决定,落后多久以后的数据即使来了, 我们也不要了, 这个 watermarking 概念就是来定义这个等待的时间, 举个例子, 我们如果定义最大的延迟时间是 10 分钟, 这就意味着 事件时间 落后当前时间 10分钟内的 日志会被拿来统计指标, 如果再迟了就不管了。 假如 现在 12:33, 如果某条日志的时间时间是 12:23, 我们就直接drop 掉,那么 12:23 之前的key-value对的状态就静止不动了,也就可以不用维护状态了, 别看这个 watermarking 看起来很高大上,其实就是这么回事。

代码也很简单,可以这样写:

windowedCountsDF = \
  eventsDF \
    .withWatermark("eventTime", "10 minutes") \
    .groupBy(
      "deviceId",
      window("eventTime", "10 minutes", "5 minutes")) \
    .count()

Spark SQL  内部会自动跟踪这个最大 可见的 事件时间,   用来drop 掉过时的日志 和 静止不动的状态。

原理见下图

Image

我们可以看到, x 轴是处理集群处理的时间,  y轴 代码 事件的时间,然后有一条动态的水位线, 如果在水位下面的日志,我们就不要了,是不是很简单。

欢迎关注 spark技术分享     

                           

Image

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK