4

PySparkStreaming At Least Once与不停机更新实现

 3 years ago
source link: https://studygolang.com/articles/33225
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

1.实现在流式数据处理时,做到At-Least-Once的能力,即保证数据不丢失,保证每条数据“ 至少被计算1次 ”。

2.实现Streaming任务在需要更新逻辑代码时,做到不停机滚动更新。

实现关键点

1.从上一次消费的Offset继续消费

2.Spark Streaming Gracefully Restart(优雅退出)

3.数据处理时增加重试,每个步骤打点log便于对账

第三点属于业务自己维护,下面不展开赘述。

从上次消费的地方继续消费

我们在消费Kafka消息通道时,实质上是将我们“消费的进度”(offset)上报到消息通道。

IjmYJbV.png!mobile

可见生产者会有一个offset,不同的consumer group也会有不同的offset,那么我们在消费的过程中,通过上报消费进度,并且下一次继续从这个offset消费,就实现了数据源的at-least-once。

这里为什么是at-least-once而不是exactly-once呢?

因为例如kafka默认的offset上报模式是自动上报,即隔几秒钟上报一次,那么我们可能在这几秒钟内,程序异常退出,offset没有得到上报,下次可能会消费一些重复数据。

Kafka支持程序侧自己维护offset上报时机,可以定制化的向exactly-once靠拢。

Gracefully Restart

优雅退出意为收到退出程序信号时,例如SIGTERM、SIGINT,程序处理完还没有处理完的数据,回收释放相应资源,再进行退出,而不是立刻退出(会丢失部分正在处理的数据)。

spark本身提供了

spark.streaming.stopGracefullyOnShutdown=true

参数配置,顾名思义,配置为true时,似乎就可以实现优雅退出了。

但是在google这条配置的过程中,逐渐发现了一些问题。

大致就是在Hadoop 2.8 + YARN的环境下(我们的集群Hadoop版本也是2.8,据说Hadoop3.x已经修复了这个问题),YARN对Spark实例发起SIGTERM之后,会有一个固定的,比较短的“等待时间”,时间过后立刻Kill实例,并不等待执行完成。

相关链接:

https://issues.apache.org/jira/browse/SPARK-25020

https://blog.csdn.net/zwgdft/article/details/85849153

这样一来,单纯配置参数并不能保证优雅退出。

我们的做法

既然无法通过参数保证,我们可以让Spark实例在合适的时机自己退出,而不是通过YARN去Kill。

StreamingContext有一个方法:

6RBbQzQ.png!mobile

streamingContext stop 方法

方法注释很明确,可以等待收到的数据全部处理完毕后在退出~,这样就符合我们的意愿了。

做法也比较简单,在PySpark实例启动时,单独启动一个thread,无限循环去监听某个标记位(可以是redisKey,HDFS Directory,或者甚至是在线配置API等等...),当标记位表明程序要退出了,我们调用stop方法,即可完成优雅退出。

jeaIN32.png!mobile

开启线程监听

UFzeiiJ.png!mobile

监听并且当标记位消失,主动停止application

streaming任务不停机更新

有时我们的实时流处理任务因需求、bug等原因,不得不进行代码逻辑的修改。

但spark任务需要提交到YARN集群上执行,并且默认不提供代码热更新,热重启的能力。

此时如果我们只维护一个Spark实例,更新代码时一定需要经历停止实例,更新代码,启动新实例这几个步骤;

这样一来,停机更新代码这段时间的用户数据,肯定无法被及时消费处理,并且 从spark程序提交,到YARN分配资源开始执行,并不是瞬间完成的 ,这段间隔时间,线上用户会感到困惑,认为业务有问题,进而产生投诉的情况。

在我们看过的Golang程序中,都有热重启的功能支持,举例:

https://cloud.tencent.com/developer/article/1681685

其核心思想就是 Fork-And-Execute ,也就是在收到更新信号时,fork一个相同子进程,替换新的二进制文件,然后等待子进程处理结束,然后启动新的二进制文件,退出旧的程序。

我们的做法

使用两个application实例,一个作为Master主要使用,一个作为Replica备用

FRjiYzY.png!mobile

发布过程

1.Replica实例更新代码

2.启动Replica实例,与Master并行跑

3.优雅停止Master实例

4.Master实例更新代码,启动Master实例,与Replica并行跑

5.优雅停止Replica实例

有疑问加站长微信联系(非本文作者)

eUjI7rn.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK