[RemoteShuffleService]: RSS设计文档
Shuffle Service设计文档
该文档写于2021年1月
项目背景
业务场景
目前公司内部有3个集群, 最大集群规模已经达到8000多台机器, 总共有近45万核, 1.23PB的内存.
待补完…
Spark业务负载
目前集群上, 每天有60万个离线任务在运行, 其中Spark任务有10万个. 公司未来整体战略会将所有HiveMR的任务替换Spark任务, 今年将整个数据平台上的SQL任务基本上都迁移到Spark上.
整体的资源利用率得到了巨大的提升, 但随着Spark应用越来越多, 任务的稳定性受到了越来越多的挑战, 尤其是Spark Shuffle这块的问题, 总是让运维人员头疼不已, 一旦当天数据量超过历史, 会导致任务失败, 就需要隔离机器重新运行, 等业务量降低时候, 再释放空闲资源, 给整个团队巨大的运维成本.
当前面临的问题
我们分析Spark Shuffle的问题, 发现目前Shuffle机制存在如下几个问题:
- 计算存储不分离, Spark计算和Shuffle的IO操作混在一起, 极易因为CPU过高导致Shuffle超时失败
- 机器超卖, 虽然提高了整体使用率, 但是会因为调度不均匀, 导致失败CPU飙升
- Yarn Shuffle Service内嵌于NodeManager之中, 无法给全部集群替换SSD加速IO效率
- Pull模式的Shuffle, Reduce阶段有大量的随机读取过程, 导致磁盘IO飙升
- Reduce阶段数据未提前聚会, 导致导致大量网络小包产生, 造成网络IO飙升
- 调度器无法识别CPU/IO繁忙状态, 导致任务依然下发到近乎满载的机器
业界的解决方式
随着Spark成为业界的批处理的标准, 在各大互联网公司之中, Spark Shuffle以上的问题都慢慢出现, 无法通过简单的调优解决这个问题也慢慢成为这个大家的共识. 因此各个公司都推出了自己的解决方案:
- Spark社区和LinkedIn: 领英在VLDB2020发了一篇Paper, 提出
Push based Shuffle
来解决4和5点, 该方案依然嵌入在原有的Spark框架之中, 因此该方案最容易被社区接受, 现在也慢慢的在合入代码 - Uber: 在今年的Spark Meetup里提出了他们的独立的Shuffle Service的解决方案, 也是目前唯一开源的独立Shuffle Service服务, 意图解决1-5点问题, 并引入SSD解决IO问题. 但目前开源出来的代码健壮性不强, Task重试的时候, 数据会出现冗余,还需要很长的优化之路
- Facebook: 也有名为cosico的独立Shuffle Service, 从架构图上看他们解决了4-6的问题, 但容错使用了DFS系统, Reduce数据也是从DFS直接获取的, 因此性能是打问号的, 且未公开源码, 并不确定具体如何实现
- 阿里云: 20年12月份, 阿里云EMR团队公布了他们的Shuffle Service的方案, 同样未开源, 从公开文章上看, 他们除了解决Shuffle稳定性问题(1-5点), 更重要的点是为了解决Spark on k8s在磁盘上性能解决方案. 同样阿里云的方案也未开源
- Intel: 作为一家硬件厂商, 推出了自己的Shuffle Service, 目的是为来卖RDMA产品, 暂时应该不会去采购硬件, 因此略过
设计目标及范围
简单调研几家产品之后, 我们打算博采众长, 并找到符合自己的场景的方案, 主要参考Uber和阿里云:
- 独立的Shuffle服务, 存储计算分离(解决1-2)
- SSD存储, 加速IO效率, 减少计算资源, 提高整体资源利用率(解决3)
- Push based Shuffle, Server端聚合后再Reduce(解决4-5)
相对于阿里云, 我们不需要做的:
- 不需要全部替换原有Shuffle Service, 能用Adaptive的方式, 决策使用哪种Shuffle模式
- 最好是CloudNative的方案, 但是不强制
而相对于他们两个的方案, 我需要:
- IO敏感型调度, 解决热点问题
Shuffle Service顶层设计
服务架构图
整体架构会采用典型的Master-Slave模式, 其中Client为Spark App, master为Remote Shuffle Service Manager(RssManager), Slave为Remote Shuffle Service Server(RssServer), 还有DFS作为数据备份, Zookeeper作为元数据存储.
Remote Shuffle Service Manager
作为整个集群的控制节点, 负责整个集群与客户端的任务调度, 元数据管理等主要工作.
- Manager会采用主备方式完成服务高可用, 主备实例会通过Zookeeper进行选主工作
- Manager会采取无状态应用模式, 整体状态都会同步到Zookeeper之中, Slave可以在重启时重建全部的状态
- 为了加速备实例的启动速度, 主备实例会保持心跳, 并定时同步全量或增量状态给备节点
- Manager与Server会保持心跳状态, Server通过心跳上报资源容量信息.
- Manager会主动想Server下发RPC通信
- 客户端会与Manager建立连接发送RPC请求, 但是Manager不会主动给客户端发送RPC请求
- Manager与资源调取器(Yarn/K8s)保持连接, 查询任务状态, 用以退出清理
- Manager内部设有调度器组件, 可以插件式的设置调度策略
- Manager内部有元数据管理, 主要是App级别的信息, Task级别的信息由Driver持有, 通过RPC请求送达Manager
- Manager服务有WebUI接口, 统计App级别监控信息
- Manager服务有jmx接口, 指标方便指标统计
Remote Shuffle Service Server
作为整个集群的数据节点, 负责与客户端Spark App进行数据交互
- Server是数据节点, 只会跟客户端进行数据通讯, 不参与控制通信
- Server与Manager会保持心跳, 同时会与Manager有控制流的RPC请求
- Server会将数据首先写入到本地的SSD, 如果开启了备份的话, 也会以pipeline的方式, 同步给其他备份节点, 最后开启了慢写入的话, 才会备份给DFS. 由于DFS一般有HDD磁盘组成, 因此性能会受到巨大的影响
- 默认情况下, 一个Reduce任务会开一个文件句柄, 用于写入Map的数据, 同时还有一个index文件记录数据块位置与TaskId的对应关系
- 读取数据时候, 会根据TaskId过滤无效的数据块
Spark App
作为整个集群的客户端, 负责实现Spark对外的接口, 并与RSS交互, 该模块除了实现SparkShuffleManager的接口之外, 还要实现Spark App的事件处理, 还需要在调度上处理任务失败的情况
- Shuffle接口: 实现Shuffle注册, 数据读取, 数据写入等工作
- Event接口: 实现整体控制流的处理, 例如App启停, Stage启停
- Schedule接口: 当任务失败时的, 需要重试, push Shuffle的重试方式不同, 目前Spark并没有有接口暴露, 可能会侵入式的修改源码
Zookeeper
作为整个集群的元数据中心, ZK承担着元数据备份, 服务发现, 主备切换等功能
- Manager会将App信息写入到ZK, 方便备实例恢复
- Manager通过ZK进行主备切换和监控
- Client会通过ZK的地址获取到Manager的地址
DFS
作为整个集群的数据备份中心, 目前公司内部只有HDFS作为DFS实现, 但是性能堪忧.
但当RSS集群也是以HDD磁盘为主时, Reduce任务直接获取DFS上的数据, 是个好的选择.
另外DFS方式的稳定性会更加好
交互图架构图
下面考虑一下RSS是如何和Spark的各个实例交互的, 先一个Map和Reduce任务是如何进行控制流通信的, 然后再看数据是如何写入的
控制流图
- Spark Driver向Manager发送申请Shuffle资源的请求
- Manager返回结果, 并指明ReduceTaskId对应的Server地址
- Driver根据位置分配Spark任务
- SparkTask计算各自的数据, 发送到对应Server
- Task写完所有的Map数据之后, Executor向DriverCommit任务
- 完成所有任务的时候, Driver向Manager发送CommitStage请求, 目的是传递最后成功的TaskId给到ShuffleManager
- Manager将TaskId等消息,下发到各个Server
- Driver下发Reduce任务(如果也是ShuffleMapTask的话, 依然需要先申请资源)
- ReduceTask向对应的Server拉去数据, Server需要根据TaskId过滤重复数据
数据流图
- MapTask先会将数据完成预聚合, 按照Partition分区
- 然后两个MapTask都将将P0,P1的数据推送到第一个Server
- Server会将两个MapTask的数据都写入到一个文件之中, 因此第一个Server有P0和P1两个文件, 第二个Server有P2,P3,P4三个文件
- 启动ReduceTask时候, 直接会去对应Server流式拉取数据
Remote Shuffle Service设计要点
可靠性
可靠性从三个方面来阐述, 一个是任务可靠性, 指异常发生的整体重试机制, 这里的重试指流程的重试, 而不是消息的重试, 因为RPC消息大多数可以做到幂等, 做不到幂等的就会出现不一致场景, 就是数据一致性问题. 另外一点数据可靠性, 只Shuffle数据文件的备份问题.
任务可靠性
首先我们还是回到上面的数据流图, 并标识其中可能失败的场景
下面分别解释一下错误的具体含义:
- MapDataPush失败值, MapTask之中单个Partition数据无法推送到对应的Server, 造成的原因一般为网络问题等
- 慢节点/坏节点, 当出现这种场景的时候, 是多个MapTask推送到同一个Server时候的出现
- MapTask失败, 这个场景为部分Partition数据写入成功, 部分写入失败场景, 造成错误的原因很多, 比如Executor OOM
- Map推测执行, 指启动两个Task计算同一份数据, 会导致所有数据写了2份
- Reduce拉取失败, 指单次失败, 可能网络问题导致
- Reduce持续失败, 拉取多次失败, 可能是慢节点等
- Server失败重启, 进程级别的重启, 服务多一会时间会恢复
- Server丢失数据, 一般是磁盘问题, 导致数据丢失了
PushData失败/多次失败
- 启动MapTask, Task之中携带Reduce对应Server地址, 一个Reduce对应2个地址, 正常情况只会使用前面一个
- MapTask计算Partition任务会将数据写入到临时
- 向Server写入数据
- 写入失败,返回错误
- 从临时文件之中获取Partition数据
- 重试写入(第一次失败的时候, 依然写入老地址, 第二次失败时, 写入备用地址)
- 重试, 直到达到达到最大次数
- 如果重试未成功, 将失败的节点加入黑名单, 然后重新运行整个Stage
- 如果重试成功, 启动Reduce任务, 此时需要指明备节点是否存有数据.
- 此时对应整个Reduce可能两个节点都存在数据, 因此需要向两个节点拉取数据
PushData的Block粒度为单个Partition, 如果再小, 那么数据一致性就无法保证了, 这里编程时候需要注意
关于Block重试的详细说明
- 网络写失败, 无妨, 重试即可
- 写本地失败, 写了一些, 需要Stage失败
- 写commit文件失败, 需要Stage失败
- 网络返回失败, Server做到幂等
2和3的问题, 大致是因为服务节点问题导致的, 这个目前比较难以处理, 先不管了[TODO]
如果做到幂等, 需要在元数据信息里面记载对应的taskAttemptId
数据重复
- Driver启动Map任务
- 开始写Partition数据, 成功写入P0
- 但P1数据还没有完成写入
- 此时Task任务遇到异常退出
- Driver启动Task任务重试
- Task又重复写入一遍P0数据
- 同时也完成P1数据的写入, 此时系统之中有2份P0, 1份P1. 由于写入数据时, 含有Task信息, 因此Server知道2份P0数据, 分别由哪个Task写入
- Executor向Driver 确认成功的Task
- Driver向Manager发送Map任务也完成了, 并将正常完成任务的TaskId信息发给Manager
- Manager直接将这些信息下发到Server
- Driver启动Reduce任务
- Reduce任务拉取数据的时候, 如果发现数据块对应的TaskId不在commitTask列表之中, 就会自动跳过这个数据块
commitTask下发给RSS的话, 容易造成元数据膨胀问题, 但如果将这些元数据放在Spark里面的话, 就会造成代码耦合程度太大了. 因此先看看编程时候能不能将代码耦合去除, 如果去除的话, 直接在ReduceTask测过滤
推测执行的难点在于, 两个独立进程写入同一个数据块, 那么在Server端就必须加锁来防止竞争, 但是一旦加锁, 性能就会收到影响
拉取失败
- Driver启动Reduce任务
- ReduceTask开始拉取数据, 但是返回异常
- 此时跟MapTask一样, 先开始Partition级别拉取重试, 如果第二次失败的时候, 开始拉取备份节点数据. 为了防止重复, 拉取成功的数据块, 需要记录对应的Task号, 拉取备份节点的时候, 需要重新过滤
- 如果Partition级别的重试未成功
- 上报给Driver, 准备重试Task
- 下发重试任务到新的Executor
- 此时如果一直失败的话, 有两个选择, 一个是直接宣布App失败了, 另外就是重选上一个Stage, 先选第一种
Server失败
- 以MapTask为例, ReduceTask其实也类似
- MapTask开始写入数据
- 此时对应Server进程shutdown了, 对应Manager来就是心跳消失
- MapTask首先会失败, 失败后会再次重试, 一般partition级别重试会一直失败, 那么就会换一个节点写入.
- 此时如果Shuffle Server能马上重启, 重新接入心跳, 那么Manager就会当没事发送. 如果超过一定时间, 还没有重启, 那么Manager会告之Driver数据丢失(这里可以被动的)
- Driver发现DataLost之后, 先查看是否开启数据备份功能, 如果有, 则继续. 如果没有开启数据备份, 那么就开始停止整个Stage, 等待重新下发全部的任务(如果有Reduce任务, 且数据没有开启备份功能, 则直接APP失败退出)
如果Server重启且不能恢复的话, 直接让APP失败, 由上层平台重试也许是个最好的方案
另外一种方式, Service一旦失败, 重试这个Stage, 上一个Stage的数据必须在DFS中有备份, 这样才能重试Stage.
数据一致性
数据的一致性问题, 其实在上面已经单独说过了, 但这里还是要单独拿出来看的, 因为数据不一致, 结果一般就不对了.
不一致情况的原有有以下几种:
- MapTask重试, 一些数据写了两遍
- 推测执行数据有两份
- Server失败, 导致一份数据写到两个Server, 这时有些数据重复了, 有些数据没重复
解决上面的问题的思路, 就是读的时候, 需要过滤多余的数据.
对于同一个Server写了2遍, 这时通过CommittedTaskId过滤掉多余Task产生的数据
对于多个Server数据, 读取的时候, 要记录哪些Task任务已经被消费了, 如果被消费了就不需要重新读取了
数据可靠性
数据备份功能, 一般有几个问题要选择: 谁来备份, 怎么备份, 备份到哪儿
谁来备份
一般有客户端和服务端备份, 客户端备份,就是客户端直接写多份数据, 服务端备份的话, 客户端只写一份数据, 然后服务端自己写到备份地方去
客户端备份的优点是简单, 缺点点网络连接多些, 并且与后端耦合
服务端是反过来的
我们这儿选服务端备份, 因为备份到哪儿还不是很确定
怎么备份
这儿有同步和异步的选项, 因为是临时数据, 所以Shuffle的备份最多应该就2个, 不会有半异步的选项
同步的特点是, 速度慢, 但是能保证数据一致
异步的话, 性能会好, 但是容易导致数据并没有完全ready
我们这儿选同步, 因为如果是异步的话, 上面的任务可靠性的处理会更加复杂一些, 这个性能等后续再优化吧
备份到哪儿
在我们的系统里面有两个选项, 其他Server或者DFS.
如果放到其他实例里面的话, 因为机器也是SSD的, 所以性能会好一些, 但是DFS性能会比较差, 不过稳定性会比较好, 不需要考虑数据丢失的问题了
这里优先DFS, 因为DFS有人维护.
可用性
主备切换
主备切换的能力依靠ZK来完成:
- 客户端和Server会监控ZK的地址, 如果Manager地址变化之后, 会主动切换地址和端口
- 主Manager会在ZK上写入数据, 而备Manager会一直监控着, 如果发现节点丢失, 即主Manager失联, 备Manager会读取Zookeeper上的元数据, 然后变成主Manager写入数据.
主备切换, 任务不中断
做到任务不中断, 需要有以下条件
- 切换时间短
- 无任何不一致状态, 或者状态都可以恢复
- 连接重试
要做到切换时间短, 需要备节点时刻监控主节点的状态, 不能落后太多, 不然就会启动延迟
尽量将必要的元数据信息都写入到ZK之中, 但ZK并非更新友好型存储系统, 因此也需要实现中间状态重构的能力,而这个重构不能出现状态丢失
客户端的所有RPC请求, 都要有重试功能, 一旦发现主备切换, RPC失败之后, 迅速切换到另外的节点.
监控
页面
参考Livy的实现, 只实现到APP级别即可
APP: Id, 提交时间, 结束时间, 时长, 当前Shuffle文件个数, 当前shuffle存储总量
指标
App总量: 个数, shuffle存储量, 文件总个数
资源容量: 磁盘,内存slot, cpu
JVM相关: 内存, 线程
慢节点告警
如何定义慢节点?
部署
兼容性方案
兼容性包含客户端和服务端之间,以及服务端Manager和Server时间.
客户端目前只会支持Spark2.4.3版本以及Spark3.0版本.服务端需要同时支持这两个版本的接入, 因此整体差异只会在于Spark与Client接口层次, RssServer的Client和Server之间的通信兼容性必然要遵循. 可以适当的在控制流监控预留部分json字段来保持未来的兼容.
服务端的Manager和Server的RPC也同上诉方式.
如果后续改动实在无法, 通过灰度升级的方案处理
多版本支持
由于兼容性和容错的考虑, Push方式的Shuffle Service很难实现滚动升级, 原因在于每个实例都有很多的网络连接, 一旦重启, SparkTask就会出现异常, 造成大规模的任务重试, 对集群会产生巨大的压力. 因此只能采用灰度升级方案, 发布也必须支持多版本特性.
每个版本发布包都有一个版本号, 进程启动是会将版本号写入到Zookeeper的元数据之中, 客户端会根据自身的版本号, 选择对应的路径, 然后获取到ShuffleManager的地址, 完成整个任务的启动环境.
所以, 整个客户端/RssManager/RssServer的版本都是配套的. 整套环境之中, 目标最多只能存在3个版本.
为了防止用户客户端死活不升级情况, Spark加载版本和jar包方式, 将通过Zookeeper获取元数据, 然后启动的时候, 远程加载配置项和jar文件, 这段代码需要注入到Spark之中, 而非RSS.
部署方案
实例角色 | 最少个数 | 推荐个数 | 部署位置 |
---|---|---|---|
Zookeeper | 3 | 5 | 单独节点, 至少独立磁盘 |
RssManager | 2 | 2 | Master单独一个节点, Slave单独一个节点 |
RssServer | 5 | 30 | 单独节点 |
- 每个节点, 对于单个版本, 只部署单个实例
- 机器尽量在同一机房, 网络带宽尽量大, 因为需要内部传输备份数据
- RssManager的资源尽量在8核16G以上, 防止RPC和元数据处理成为集群瓶颈
- RssServer机器尽量是SSD盘, 每个节点过挂一些数据盘, 尽量满足1核2G1TB的配置方式
升级方案
整体采用灰度升级方案, 即每个节点会部署多个实例, RssManager
和RssServer
各自部署一套, 两者元数据分开存储, 但调度器会感知多个版本的任务情况.
整体升级步骤如下:
- 部署新版本的
RssManager
, 在standby的机器上部署另外一个备RssManager
- 逐个安装新的版本的
RssServer
, 完成后并添加新版本的监控告警 - 更新新版本客户端到HDFS
- 更新Zookeeper上最新RSS客户端版本
- 过两天, 查看老版本的
RssManager
的负载, 如果任务数已经降低为0, 则准备下线版本 - 关闭监控告警, 逐个关闭Server, 最后关闭两个Manager.
Zookeeper节点切换
整个集群之中, Zookeeper虽然不是单点的, 但是ZK集群确实单点的, 一旦节点老旧必须替换升级或者IP切换的时候, ZK地址已经变换, RSS和Spark配置都要相对的变化, 这时就必须重启完成, 此时必须容忍任务大规模重试.
安全
鉴权
内部集群可以先不考虑
数据完整性
Shuffle数据作为临时数据, 用户即可清理, 可以先不做完整性校验(目前Spark也没有做)
数据清理
Shuffle数据根据App粒度清理, Yarn Shuffle Service之中, Driver在退出的时候, 会清理对应的数据, 但不删除目录. 目录有Yarn感知到App状态为完成之后, 下发NodeManager完成清理工作.
但在Shuffle Service之中, 由于无法感知App状态, 因此需要Driver来主动清理.
- 在Driver退出的过程之中, 会向Manager发送AppEnd的消息, 接收到请求之后, 开始清理App对应的Shuffle, 想各个Service发送完毕异步请求之后, 将App的状态标记为Deleted, 过一段时间后删除该状态
- 如果Driver异常退出, 度过静默期(无RPC往来的时长)之后ShuffleManager主动查询Yarn上App的状态, 如果为退出状态, 则主动删除数据.
- ShuffleManager如果主备切换, 由于App的信息保存在Zookeeper之中, 因此备节点依然会完成清理工作
- ShuffleService实例如果发重启, 那么它会主动询问是否有资源未清理, 如果发现本地APP的在状态为Deleted, 或者查询不到.
调度
调度算法
资源的类型有以下几类:
- 磁盘容量
- 内存Buffer量
- IO负载压力
- CPU负载压力
DataLocation
分配Server节点的时候, 需要返回机架信息, 方便Reduce任务决策启动在哪些几点之上.
性能
支持多磁盘写入
文件磁盘目录, 也需要根据Spark一样规划为多级目录, 同时支持多个SSD目录
文件也会根据AppId + ShuffleId + PartitionId的hash值计算出对应的目录路径
小数据块合并发送
Map写入, 如果数据块比较小, 且写入同一个Server, 则可以合并发送.
文件流式读取
对于Reduce任务, 如果Partition实在太大, 可以根据流式方式读取
小数据块合并读取
例如上图, 在shuffleRead的时候,需要花费近6个小时, 主要原因在于网络小io太多了
2GB限制
Spark在Shuffle的时候, 已经通过DownloadManager
, 通过文件的方式解决了2GB的限制, 但目前这儿依然由这个限制.
如何解决问题? 仿照Spark
的处理方式, 如果发现数据量大于2GB, 则启动文件发送, 服务端需要重新定义handler
, 整个数据也写入到临时文件之中, 不要写在原有RSS的数据文件之中.ShuffleDataWrapper
需要加入一个新的字段, 或者将data_length
设置为负值