[RemoteShuffleService]: RSS实现情况
该文档写于2021年4月, 当时按照设计文档实现了RSS的原型(基于Uber的开源RSS), 但是后续应该人事调整, 我开始从事OLAP工作后,就不在开发RSS了.
Spark Shuffle任务的现状阐述
目前公司内部有最大集群规模已经达到8000多台机器, 总共有近45万核, 1.23PB的内存, 其中每天有10万+个Spark的离线任务在运行.
目前团队正在将HiveSQL的任务都迁移到Spark引擎之上, 提升整体性能和资源利用率, Spark将承接越来越多的业务压力.
但随着Spark应用越来越多, 任务的稳定性受到了越来越多的挑战, 尤其是Spark Shuffle这块的问题, 一旦数据量超过一定阈值, 就会出现大量的Shuffle Fetch失败的情况, 由于Spark做了很好的重试机制, 因此有时候依然能跑出来结果, 但大量计算资源浪费在重试步骤之中; 而有时候因为失败太多, 导致整个App失败退出.
举个真实的案例, 下图是某业务运行的一个UI监控图, 从图中可以看到, 在Failed Stages里面出现了大量的FetchFailedException
, 整个程序在不停的Stage重试, 而最后这个case由于失败过多整个APP失败了
在上图中的19个Failed Stages都是由于
FetchFailedException
造成的
Spark Shuffle任务的瓶颈分析
监控发现
在分析这类问题的时候, 我们发现了经典的”三高”现象, 即网络,磁盘, CPU都很高.
如上图所示, 一般出现FetchFailedException
时, 此时机器上的监控一般会出现这三者的峰值.
磁盘和网络的峰值一般一起出现, 因为Shuffle时会有大量的数据交换
CPU出现峰值时有两种情况, 一种Wait
比较高(图中黄色部分), 说明CPU忙于处理IO操作,一种Wait
不高, 但总体CPU比较高, 这种情况往往是该节点有其他任务运行导致的, 进程之间没做到CPU隔离 或者一些热点计算问题.
Shuffle原理
从监控看到了, Shuffle过程是个”三高”过程, 同时是IO和CPU密集型的过程, 这一节从Shuffle的原理上, 解释一下为什么Shuffle过程为什么会出现瓶颈.
Shuffle本质是一种数据交换机制, 在MR模型中数据是按照(key, value)的元组方式组织的, 经过MapTask之后, Key的数值已经变化了, Shuffle的目标将这些数据重新组合, 相同新Key的数据放到一起处理.
因此, 如果所示, MapTask按照新Key产生了3个不同的数据块, 每个数据块与Reduce的个数对应, ReduceTask则直接去获取对应的数据块(图中用颜色表示)
在Spark的实现之中, MapTask将这些数据重排序后, 写入到同一个文件之中, 通过元数据标识数据所在offset和length. 例如在文件的(offset=0, length=512)的数据为P0, (offset=512, length=1024)的数据为P1. Map阶段要写完所有数据, 磁盘吞吐非常大.
ReduceTask拿数据的时候, 要获取所有MapTask的输出, 所以必须跟所有MapTask文件所在Server产生网络连接, 网络连接会非常多. 需要传输所有的MapTask输出到其他节点, 那么网络吞吐也会非常大. 其次, 我们拿P1数据的时候, 是直接读取文件的中间位置的, 因此随机IO会非常多, 磁盘繁忙. 最后为了减少网络和磁盘的压力, Spark加入了Shuffle过程的Merge和Shuffle数据压缩能力, 这就加重CPU的压力, 但一般情况下CPU压力没那么大, 除非被别的任务影响
如果有兴趣了解更加详细的Shuffle原理的同学, 可以参考这篇文章
原因归纳
综上所述, 我们简单归纳原因:
- 大量网络连接, 且非常多的网络小包
- 磁盘大吞吐, 且随机读取, 触发IO瓶颈
- 计算存储耦合, CPU密集型任务影响Shuffle
目前的解决方式
针对以上的原因, 目前团队也有一些处理措施:
- 增加超时配置项, 缓解网络问题
- 增加Yarn Shuffle Service处理IO的线程个数,缓解IO问题
- 隔离集群, 给队列打上独立NodeLable, 去除其他进程影响
以上措施在一定程度上缓解了FetchFailedException
问题, 但这些措施都治标不治本:
- 增加网络超时, 会加大重试的时间, 导致异常情况下时间更长
- Yarn节点的IO已经达到硬件上线, 无法再通过多线程加速
- 隔离集群, 使得集群资源的使用率低, 平均CPU使用率在30%以下
但随着业务数据量越来越多, 该问题又开始出现了, 是时候提出一种根治的方案了
Spark Shuffle任务的改进方案
方案设计
这个章节, 主要针对上文提到的”三高”问题, 看看从设计上如何解决这些问题
网络问题
在原有的网络模型之中, 网络连接个数为M*R, 而M和R都在Executor之中, 可以复用网络连接, 实际的连接为E^2. 如果executor个数为2000个, 那么会有4百万的连接, 一些排在后面网络连接经常出现超时.
为了性能, Spark在Reduce启动的时候, 会尽可能的建立所有的链接
另外一个问题是网络传输的时候小包太多了, 一个分区的平均大小<1MB, 大量的时间消耗在网络连接的创建过程之中, 能不能提前聚合数据, Reduce启动的时候, 直接读取数据就行了.
对于这两个问题, Push-Shuffle可以很好的解决这两个问题
该方案数据的流程, 并不是由ReduceTask主动Pull, 而是由MapTask主动Push到ShuffleService里.
如图所示, 2个MapTask计算完分区数据P0后, 将两者的P0都push到第一个Shuffle Service.
ReduceTask启动后, 直接获取第一个Shuffle Service之中P0数据即可.
回看原来的问题, 该方案的连接个数为(E*S + E), S为RSS的个数, 一般在10-50之间, 远远小于E的个数, 有效减少连接;ShuffleService会合并数据(只要读取的时候, 多读几个块就好), 可以方便的处理小包问题.
发送时的小包, 可以通过同步发送P0和P1到同一个节点来解决
此外Push-Shuffle将随机IO变成了顺序IO, 解决了随机IO问题.
磁盘问题
磁盘问题一个在于随机读写, Push-Shuffle将Reduce阶段的随机读写变成顺序读写
另外一个问题就是读写速度, 我们的解决思路就是使用SSD替换HDD, 同时SSD对于随机读写也非常友好, 因此硬件加速完全好于预期.
那么用Spark原来的External Shuffle Service是否也能上SSD解决这个问题呢?
答案是否定的, 原因在于ESS是和Yarn强制绑定的, 代码逻辑在NodeManager的进程之中, 替换全部Yarn集群的磁盘, 成本上根本无法接受. 因此RSS必须是存储计算分离的, 也就是需要是个完全独立服务.
总结来说, 解决磁盘问题方法是: Push-Shuffle, SSD加速 和 独立服务
CPU问题
如<瓶颈分析>中提到的, CPU问题并不是Shuffle的核心问题, 主要问题在于热点和隔离.
为了解决热点问题, 我们引入了Shuffle 调度器, 通过监控App里面磁盘网络内存压力, 防止出现单服务器热点, 拖慢计算或者导致失败.
对于隔离问题, 我们采用独立部署, 解除对Yarn集群的依赖
Remote Shuffle Service方案
由上文的铺垫, 整体架构也呼之欲出了, 整体采用典型的Master-Slave模式, 其中Client为Spark App, Master为Remote Shuffle Service Manager(RssManager), Slave为Remote Shuffle Service Server(RssServer), 还有DFS作为数据备份, Zookeeper作为元数据存储.
Remote Shuffle Service Manager
作为整个集群的控制节点, 负责整个集群与客户端的任务调度, 元数据管理等主要工作.
- Manager会采用主备方式完成服务高可用, 主备实例会通过Zookeeper进行选主工作
- Manager会采取无状态应用模式, 整体状态都会同步到Zookeeper之中, Slave可以在重启时重建全部的状态
- 为了加速备实例的启动速度, 主备实例会保持心跳, 并定时同步全量或增量状态给备节点
- Manager与Server会保持心跳状态, Server通过心跳上报资源容量信息.
- Manager会主动想Server下发RPC通信
- 客户端会与Manager建立连接发送RPC请求, 但是Manager不会主动给客户端发送RPC请求
- Manager与资源调取器(Yarn/K8s)保持连接, 查询任务状态, 用以退出清理
- Manager内部设有调度器组件, 可以插件式的设置调度策略
- Manager内部有元数据管理, 主要是App级别的信息, Task级别的信息由Driver持有, 通过RPC请求送达Manager
- Manager服务有WebUI接口, 统计App级别监控信息
- Manager服务有jmx接口, 指标方便指标统计
Remote Shuffle Service Server
作为整个集群的数据节点, 负责与客户端Spark App进行数据交互
- Server是数据节点, 只会跟客户端进行数据通讯, 不参与控制通信
- Server与Manager会保持心跳, 同时会与Manager有控制流的RPC请求
- Server会将数据首先写入到本地的SSD, 如果开启了备份的话, 也会以pipeline的方式, 同步给其他备份节点, 最后开启了慢写入的话, 才会备份给DFS. 由于DFS一般有HDD磁盘组成, 因此性能会受到巨大的影响
- 默认情况下, 一个Reduce任务会开一个文件句柄, 用于写入Map的数据, 同时还有一个index文件记录数据块位置与TaskId的对应关系
- 读取数据时候, 会根据TaskId过滤无效的数据块
Spark App
作为整个集群的客户端, 负责实现Spark对外的接口, 并与RSS交互, 该模块除了实现SparkShuffleManager的接口之外, 还要实现Spark App的事件处理, 还需要在调度上处理任务失败的情况
- Shuffle接口: 实现Shuffle注册, 数据读取, 数据写入等工作
- Event接口: 实现整体控制流的处理, 例如App启停, Stage启停
- Schedule接口: 当任务失败时的, 需要重试, push Shuffle的重试方式不同, 目前Spark并没有有接口暴露, 可能会侵入式的修改源码
Zookeeper
作为整个集群的元数据中心, ZK承担着元数据备份, 服务发现, 主备切换等功能
- Manager会将App信息写入到ZK, 方便备实例恢复
- Manager通过ZK进行主备切换和监控
- Client会通过ZK的地址获取到Manager的地址
DFS
作为整个集群的数据备份中心, 目前公司内部只有HDFS作为DFS实现, 但是性能堪忧.
但当RSS集群也是以HDD磁盘为主时, Reduce任务直接获取DFS上的数据, 是个好的选择.
另外DFS方式的稳定性会更加好
方案效果
我们选了两个典型的案例:
业务1
使用默认的Shuffle方式, 总耗时7.3h, 有23个Failed Stage
,错误类型全是FetchFailed
使用RemoteShuffleService模式, 总耗时2.9小时, 无任何失败的Stage, 总Shuffle Write数据量在15TB左右, 最大ShuffleStage为5.6TB
业务2
使用默认的Shuffle方式, 总耗时1.6h, 有21个Failed Stage
使用RemoteShuffleService模式, 总耗时38分钟, 无任何失败的Stage, Shuffle Write数据量在8TB左右, 最大ShuffleStage为0.3TB
比较
业务 | 地图 | 服务分 |
---|---|---|
总Shuffle量 | 15TB | 8TB |
最大ShuffleStage | 5.6TB | 0.3TB |
默认Shuffle耗时 | 7.3小时 | 1.6小时 |
默认Shuffle失败Stage个数 | 23个 | 21个 |
RSS耗时 | 2.9小时 | 0.75小时 |
Spark Shuffle任务的未来展望
目前RSS属于刚起步阶段, 一期上线时(21年Q2)只会覆盖部分业务场景, 未来还需要在以下方向做深度的优化:
- 持续优化性能和功能, 减少小文件传输的等问题
- 完善服务的可运维性, 增加运维的页面, 提供各种监控指标
- 提高服务的鲁棒性, 能够处理各种服务异常, 在绝大多数场景下能够正常完成计算
- 提高服务可测试性, 完备各种测试用例, 防止异常bug对数据正确性的影响
- 智能决策Shuffle类型, Spark引擎能够自动决策Shuffle类型, 一旦RSS失败后能够回退到默认的Shuffle
- 优化Shuffle数据的调度策略, 解决Shuffle热点瓶颈问题, 优化集群整体使用率
- Cloud Native支持, 提供较好的弹性伸缩能力, 提高集群利用率
- 支持与大数据其他组件混部, 提高集群利用率