119

谈谈 spark 的黑名单设计

 6 years ago
source link: http://mp.weixin.qq.com/s/RTfsdNbB3oNtKdM35HQQ3Q
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

谈谈 spark 的黑名单设计

Original 孙彪彪 一生数据人 2017-12-18 03:10 Posted on

首发个人公众号  spark技术分享 ,  同步个人网站  coolplayer.net ,未经本人同意,禁止一切转载

最近碰到了几次 spark job 因为磁盘损坏挂掉的情况, 想象一下,你的 spark streaming job是一个长时间运行的job, 磁盘坏掉也是常有的事情,怎么能应对这种情况也就成了很重要的事情,spark 官方倒是给出了一种解决方案,我们来看下一些实现细节

spark 2.2 中提供的解决方案,主要的使用场景就是:

假如有个节点的磁盘由于某种原因间歇式故障导致没法写入内容,也没法从扇区上读取内容,如果我们正在运行的task 的数据需要从这个磁盘上load, 那么这个 task 就会失败, driver 上的 scheduler 再次调度这个 task 的时候,会因为数据本地性的原因,再次调度到 这个坏盘的node上,这样task就会不断的失败,最后导致整个spark应用挂掉。下面我们整体介绍下spark 黑名单机制的设计细节

官方的几个设计目标

  • 支持 task,stage或者application 级别在一个 executor 或者 整个 nodes失败多少次就屏蔽掉这个 executor 或者nodes

  • 避免出错引起的调度延迟 随着 numPendingTasks 线性增大

  • 被 Blacklisted 的资源可以在一定时间后恢复使用

  • 简单配置,简单实用

  • 维护黑名单使用的内存不能无限增长

  • 用户可以定义黑名单限制的严格程度

整个设计方案是这样,

首先需要维护一些数据结构,

  • (executor, task)

  • (node, task)

  • (executor, stage)

  • (node, stage)

  • Executor

这些数据结构就是为了存储失败次数的,比如可以使用 (executor, task) 记录每个 task 在 每个 executor 当前fail 几次了。

黑名单限制可以分为 task 级别,stage 级别 和 整个应用级别,

  • task 级别说来简单,就是一个 task 在同一个 executor 只能失败 spark.blacklist.task.maxFailuresPerExecutor (默认为1)次, 然后这个 executor 对于这个 task 就被加入 黑名单了,  同理最多可以在一个node上失败  spark.blacklist.task.maxFailuresPerNode (默认为2) 次

  • stage 级别, stage 里面所有的 task 在同一个  execuotr 最多可以失败 spark.blacklist.stage.maxFailedTasksPerExecutor (默认为2)次, 这个 executor 对整个 stage 来讲就加进黑名单了,  spark.blacklist.stage.maxFailedExecutorsPerNode 依次类推

  • application 级别, spark.blacklist.application.maxFailedTasksPerExecutor (默认为2) ,  spark.blacklist.application.maxFailedExecutorsPerNode  (默认为2)这两个参数就不用解释了吧,当然如果你是个长时运行的应用,  被加入黑名单的资源过一个小时是要再被从头使用的,不然所有资源都会慢慢的全部被屏蔽掉。

  • 注意  shuffle fetch failure 不会被算进 fail次数

  • 给 executor 和 node 重新做人的时间可以使用 spark.blacklist.timeout 定义, 过了这个时间,被屏蔽的 executor 和node就可以重新使用了。

黑名单机制可以发挥作用的场景

  • 大的集群(代表 executor 数目比较多), 大的 stage(代表 task 也比较多), 这个时候有一个坏盘, 如果你的集群很大,机器很多,经常性的坏几个盘是最正常不过的事情了,这也是设计黑名单机制主要来解决的问题。当一个 task 失败了之后,因为数据本地性的原因, 有很大概率下次运行还会调度到同一个 executor 或者 node 上, 这样下次还是失败,所以在 整个job失败之前,赶紧要屏蔽这个坏盘的 executor 和 node。但是存在几个问题,就是你的 task 在调度执行的时候会因为出错重试有导致  O(numPendingTasks * numBadExecutors) 时间的失败后再次调度运行延迟。

  • 之前说的场景是  task 个数远大于 cores 个数, 可以让黑名单机制很快触发,但是还有一种场景是我们的 task 个数小于  cores 个数, 这样的话, 在bad executor 上运行同一个 task 或者同一个 stage 的几率就比较小,难以触发黑名单进行屏蔽,甚至一直屏蔽不了,然后慢慢的出错,浪费时间,这种情况下,就要考虑把所有的 fail 一直积累下来进行屏蔽,但是这样会使用很多内存,我们要想办法解决这个问题

  • 还有一直场景就是就是长时任务, 占用了很大的集群资源,还在不断的接受用户提交的任务(比如通过akka 提交到 Job Server),上面跑的一些 job 也是各不相同,有的时间长,有的时间短,有的比较稳定,还有的是demo级别的测试job, 这种job会因为代码问题抛出异常也会造成 fail, 当然还有就是因为坏盘问题,导致正常执行的job 也会fail, 还有就是有可能有短暂的问题,比如因为gc引起的超时问题, 这种就很难把整个 应用生命周期的 所有 task 的所有错误都记录下来, 因为我们没法在 driver上消耗那么多内存。

  • 有一种情况是有些  bad nodes 导致 Shuffle-Fetch 的时候拉取不到数据, 注意,这里拉取不到数据导致的失败有可能是因为 上一个 stage 的 task shuffle write 写数据有问题导致的, 这种情形 spark 现有的机制会标注 shuffle read 的数据 missing, 然后根据依赖链, 重新计算 miss data,所以问题的源头是上个 stage 的 task 运行的 executor 或者代码有问题, 而不是 进行 shuffle read 的这个 task 和 executor 有问题, 这种情况不应该把 shuffle fetch fail作为黑名单屏蔽的依据。

  • 还有一个问题是如果你开启了 spark 动态资源分配机制,就会出现这样一种情况,就是因为坏盘导致 一个executor 被屏蔽了,动态资源分配机制就会 根据这个 executor idle 的时间(默认 60s),把这个executor 给remove掉, 然后资源不够用的时候,再分配新的executor, 又在这台坏盘的node 上启动了新的 executor, 然后调度的 task 还是会失败, 对于这种情况,就应该把坏盘的 executor 所在的node 一直屏蔽着, 直到屏蔽时间超时 node 重新利用。

  • 还有一种情况是这样, 有可能是因为你的 应用代码的原因 导致 task 在 executor 上 产生oom, 这种情况不是 executor 本身的问题,把 execuotr 放到黑名单里面到还好, 因为 executor 总会死掉的,但是不能因为这种原因把整个 node 都屏蔽掉, 还有就是有可能 task 会因为gc 到原因导致 超时产生失败,有时候甚至没有 gc, 而是因为executor 上跑的别的task 导致的超时问题, 这些原因统统不应该归结为 executor 的原因, 不应该把 executor 给屏蔽掉。

黑名单最佳实践

当你选择配置spark黑名单机制的时候,你就要面临一个选择,
a 要么就是在最短时间内把坏老鼠加到黑名单中,减少task 失败的次数
b 要么是追求对资源的使用率(假设fail都是短暂因素引起的而且很快就恢复)。

当你追求a的时候,    一个 task 在一个 executor 上失败的时候,就可能调度到同一个node上的其他 executor 上。所以在同一个 nodes上,一个 task 最大的失败次数 maxNodeAttempts 的计算公式是  min(
spark.blacklist.task.maxTaskAttemptsPerNode , num executors per node)

如果一个node的磁盘坏掉了,你想 你的 spark job 不至于挂掉,需要配置
spark.task.maxFailures > maxNodeAttempts

如果N个node 的磁盘坏掉了

spark.task.maxFailures > N * maxNodeAttempts

推荐的配置

spark.blacklist.recoveryTime = 1 hour (or more)
spark.blacklist.task.maxTaskAttemptsPerExecutor = 1
spark.blacklist.task.maxTaskAttemptsPerNode = 2
spark.blacklist.stage.maxFailedTasksPerExecutor = 2
spark.blacklist.stage.maxFailedExecutorsPerNode = 2
spark.blacklist.application.maxFailedTasksPerExecutor = 2
spark.blacklist.application.maxFailedExecutorsPerNode = 2

这种配置的坏处就是,你的task失败后,为了降低再次失败的概率,资源会可能长时间的被屏蔽掉,如果这个fail是短暂因素引起的,你就白白浪费了资源

如果你追求的是b

那么推荐的配置是

spark.blacklist.recoveryTime = 5 min
spark.blacklist.task.maxTaskAttemptsPerExecutor = 2
spark.blacklist.task.maxTaskAttemptsPerNode = 2
spark.blacklist.stage.maxFailedTasksPerExecutor = 5
spark.blacklist.stage.maxFailedExecutorsPerNode = 4
spark.blacklist.application.maxFailedTasksPerExecutor = 5
spark.blacklist.application.maxFailedExecutorsPerNode = 4

可以看到,上面配置的资源损坏后恢复再次利用的时间比较小,这种配置追求的就是尽可能短时间地把资源放在黑名单中, 而且会尽可能快的把资源从黑名单中recovery出来。坏处就是最增大task 失败的次数,有可能导致你的job挂掉,而且会带来  O(numPendingTasks) 调度时间延迟。

所有的配置

  • spark.task.maxFailures 整个 job 最大的task 失败次数

  • spark.blacklist.task.maxTaskAttemptsPerExecutor — for a given task, how many times the
    task can fail on a given executor, before the executor is blacklisted for that task.

  • spark.blacklist.task.maxTaskAttemptsPerNode — for a given task, how many times the task
    can fail on a given node, before the node is blacklisted for that task.

  • spark.blacklist.stage.maxFailedTasksPerExecutor — how many different tasks must fail on
    one executor, within one stage, before the executor is blacklisted for that stage

  • spark.blacklist.stage.maxFailedExecutorsPerNode — how many different executors are
    marked as failed for a given stage, before the entire node is marked as failed for the stage.

  • spark.blacklist.application.maxFailedTasksPerExecutor — how many different tasks must fail
    on one executor, in successful task sets, before the executor is blacklisted for the entire
    application.

  • spark.blacklist.application.maxFailedExecutorsPerNode — how many different executors
    must be blacklisted for the entire application, before the node is blacklisted for the entire
    application.

  • spark.blacklist.timeout — how long a node or executor is blacklisted for the entire application,
    before it is unconditionally removed from the blacklist to attempt running new tasks. This is
    also used for how long to remember individual task failures, to avoid blacklisting resources in
    an application due to failures that are very far apart

欢迎关注 spark技术分享 

                                

Image

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK