Carlmartin' Blog

In me the tiger sniffs the rose.

新环境初始化

用于新环境的安装, 例如刚申请了一个电脑, 假设你配置完毕git和下载安装完毕NodeJs

执行下面的命令, 完成初始化命令

1
2
3
git clone git@github.com:SaintBacchus/hexo.git
cd hexo
npm install hexo-cli -g

如何实现置顶功能

首先安装hexo插件:

1
2
npm uninstall hexo-generator-index --save
npm install hexo-generator-index-pin-top --save

然后在文章的Front-matter头部加入: top: true

出自博客

如何实现一个文章多个categories

多个categories

1
2
3
categories:
- [Sports]
- [Baseball]

多级categories

1
2
3
categories:
- Sports
- Baseball

或者
1
categories: [Sports,Baseball

组合使用:

1
2
3
categories:
- [Sports,Baseball]
- [Play]

出自博客

文章加密

使用如下命令安装:

1
npm install --save hexo-blog-encrypt

启动插件, 在根_config.yaml设置:

1
2
3
4
# Security
##
encrypt:
enable: true

在每篇文章的Format里面设置:

1
2
3
4
5
6
# password: 是该博客加密使用的密码
password: Mike
# abstract: 是该博客的摘要,会显示在博客的列表页
abstract: Welcome to my blog, enter password to read.
# message: 这个是博客查看时,密码输入框上面的描述性文字
message: Welcome to my blog, enter password to read.

出自Github

绑定域名

出自简书

支持HTTPS认证

出自博客

升级Hexo客户端

1
2
3
npm install -g hexo-cli
hexo -v
npm update

升级依赖

1
2
3
4
5
6
npm install -g npm-check
npm-check
npm install -g npm-upgrade
npm-upgrade
npm update -g
npm update --save

图片居中显示

在图谱插入的前面加入HTML前缀<div align=center>, 使用方式如下:

1
<div align=center>![](https://carlmartin-pic-1305764798.cos.ap-chengdu.myqcloud.com/hexo/images/aboutme/1.png)

修改默认Front-matter

在根目录下有/scaffolds/post.md的文件用来定义默认的文章格式

在此键入以下模板

1
2
3
4
5
6
7
---
title: {{ title }}
date: {{ date }}
tags:
categories:
typora-root-url: ..
---

问题现象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2021-12-08 10:20:52,561 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat [] - JDBC executeBatch error, retry times = 1
org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 252, host: 10.88.129.186, port: 8024; Code: 252, e.displayText() = DB::Exception: Too many parts (315). Merges are processing significantly slower than inserts.: while write prefix to view Storage omega_analyse_project_stream.dwm_log_pub_event_pvuv_hi_view_local (version 206.1.1)

at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:59) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:29) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1094) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1061) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1026) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1019) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:381) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:364) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:71) ~[flink-connector-jdbc_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:207) ~[flink-connector-jdbc_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:174) ~[flink-connector-jdbc_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:123) ~[flink-connector-jdbc_2.11-1.12.0-700.jar:1.12.0-700]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_77]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_77]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_77]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]

Flink任务写入CK时, 出现以上异常, 并开始重试, 如果重试次数过多, 就会触发flink的重启, flink重启达到5次, 用户就会收到告警.

目前Flink重试的时候, 会造成Checkpoint期间的数据, 全部重新写入, 即能够保障AT_LEAST_ONCE语义.

原因分析

image-20220304152821844

在数据每次写入的时候, 都会调用该方法, 判断数据写入频率是否过快了, 如果太快了, 就会执行delay操作.

如何判断过快呢? 通过以下判断

1
parts_count_in_partition >= settings->parts_to_throw_insert

parts_count_in_partition表示所有partition中DataPart个数最多的一个, parts_to_throw_insert是一个配置项, 默认是300

CK每次写入一批数据, 就会生成一个DataPart, 然后会有Merge线程merge多个DP合并为一个.

写入太过频繁或者Merge过慢, 就会出现Too Many Parts的问题.

写入频繁

写入频繁分两种情况:

第一种, 用户数据量不大, 但flink的参数写入不当, 由于现在Flink多是采用jdbc的接口, 攒批的参数需要用户手动设置, 默认值不随我们配置, 因此会导致批过小的问题

第二种, 用户数据量较大, flink参数已经被值班人员调整过, 但由于数据量巨大, 依然出现写入过快的问题. 这种问题比较容易出现在”小宽表”上, 即每行数据量比较少, 但是行数特别多的场景.

Merge过慢

Clickhouse一个节点的Merge线程为48个, 而且由于集群区分读写节点, 写节点就一个, 因此实际上单个shard内, 合并是单点的.

因此解决merge过慢的问题, 可以从增加写节点个数方式出发.

解决方案

针对以上的问题, 我们可以对问题, case by case制定改进方案

MemoryTable方式

这个方案是针对写入频繁中的第一种情况, 这种情况下, 在CK内部完成攒批, 可以解决用户Flink配置问题

MemoryTable可以参考CK中的Buffer引擎来实现, 但逻辑需要内嵌于MergeTree引擎中, 否则上线成本会增加很多.

但Buffer引擎和MemoryTable还是有一些区别

  1. Buffer并不保证数据的一致性, 而在MemoryTable中, 需要实现WAL方式, 支持数据恢复
  2. Buffer引擎是单机的, 而MergeTree需要考虑分布式的能力, 防止节点异常的情况, 需要实现WAL文件的复制能力

除去以上的问题, 还有两个细节问题, 在实现上要非常注意:

第一, 如果用户的SQL中有final等特别的关键字, 那么需要跟内存的数据做merge

第二, 如果用户准备修改表结构, 那么内存中的结构也会跟着磁盘的结构一起修改, 此时还需要注意新写入的数据情况

难点一

final等关键字

难点二

alter时, 数据写入

列式格式写入

这种情况可以解决写入频繁中的第二种情况, 如果列存数据的压缩能力, 将批次变大.

由于列存格式的适配, 需要引入新的Flink Connect, 那么在修改过程中, 我们可以在新的Connector中, 实现对于写入频繁的第一种场景的参数进行有效的空,

目前已经支持Parquet格式的写入, 但是Parquet不支持复杂类型, 例如Array/Map等, 最好能够引进ORC或者CK本身的Native的Block数据结构, 减去转换的开销.

难点三

Native格式是CK最有效的写入格式, CK主要将数据按照自己读起来就行, 省却了压缩编码的工作, 会极大的增加导入的性能.

但Native格式是CK的内部的格式, 可能需要实现在Java中用JNI接口调用C++源码.

该方案在其他列存格式有问题的情况下, 再来实现吧.

读写交替

下面的三个方面的目标, 都是增加写节点.

目前集群由读写节点确定, 读写节点是固定, 只有故障的时候, 才会变化, 从监控上看, 读写节点压力非常不一致, 写节点的网络写入只有读的一半左右.

因此, 在公共集群上, 可以实现读写能力的交替, 即对同一张表来说, 数据永远只写一个节点(通过表的hash计算), 但对于不同表的写入, 可以是不同的节点, 因此称为读写交替.

多写集群

比上面的方案更好的, 是完成表级别的多写能力, CK原生也是支持的. 但这种情况下, 在变更时候的数据一致性, 还是需要重点测试的.

难点四

表多写的影响, Clickhouse原生支持Multi Write, 但不确定是否在复杂环境下, 有异常场景, 可能需要做一次严格的测试来论证这个问题.

集群扩容

最后一种方案, 是最简单的. 就是shard扩容, 但我们目前一直都没有一个扩shard的方案, 因此该方案, 依赖辉哥的读写分离的方案, 等读写分离实现后, 实现集群扩容会简化很多.

难点五

读写分离方案, 支持扩容shard实现写入加数

该文档写于2021年4月, 当时按照设计文档实现了RSS的原型(基于Uber的开源RSS), 但是后续应该人事调整, 我开始从事OLAP工作后,就不在开发RSS了.

Spark Shuffle任务的现状阐述

目前公司内部有最大集群规模已经达到8000多台机器, 总共有近45万核, 1.23PB的内存, 其中每天有10万+个Spark的离线任务在运行.

目前团队正在将HiveSQL的任务都迁移到Spark引擎之上, 提升整体性能和资源利用率, Spark将承接越来越多的业务压力.

但随着Spark应用越来越多, 任务的稳定性受到了越来越多的挑战, 尤其是Spark Shuffle这块的问题, 一旦数据量超过一定阈值, 就会出现大量的Shuffle Fetch失败的情况, 由于Spark做了很好的重试机制, 因此有时候依然能跑出来结果, 但大量计算资源浪费在重试步骤之中; 而有时候因为失败太多, 导致整个App失败退出.

举个真实的案例, 下图是某业务运行的一个UI监控图, 从图中可以看到, 在Failed Stages里面出现了大量的FetchFailedException, 整个程序在不停的Stage重试, 而最后这个case由于失败过多整个APP失败了
image-20220228172502068

在上图中的19个Failed Stages都是由于FetchFailedException造成的

Spark Shuffle任务的瓶颈分析

监控发现

在分析这类问题的时候, 我们发现了经典的”三高”现象, 即网络,磁盘, CPU都很高.

image-20220228172518083
如上图所示, 一般出现FetchFailedException时, 此时机器上的监控一般会出现这三者的峰值.
磁盘和网络的峰值一般一起出现, 因为Shuffle时会有大量的数据交换
CPU出现峰值时有两种情况, 一种Wait比较高(图中黄色部分), 说明CPU忙于处理IO操作,一种Wait不高, 但总体CPU比较高, 这种情况往往是该节点有其他任务运行导致的, 进程之间没做到CPU隔离 或者一些热点计算问题.

Shuffle原理

image-20220228172616029
从监控看到了, Shuffle过程是个”三高”过程, 同时是IO和CPU密集型的过程, 这一节从Shuffle的原理上, 解释一下为什么Shuffle过程为什么会出现瓶颈.

Shuffle本质是一种数据交换机制, 在MR模型中数据是按照(key, value)的元组方式组织的, 经过MapTask之后, Key的数值已经变化了, Shuffle的目标将这些数据重新组合, 相同新Key的数据放到一起处理.

因此, 如果所示, MapTask按照新Key产生了3个不同的数据块, 每个数据块与Reduce的个数对应, ReduceTask则直接去获取对应的数据块(图中用颜色表示)

在Spark的实现之中, MapTask将这些数据重排序后, 写入到同一个文件之中, 通过元数据标识数据所在offset和length. 例如在文件的(offset=0, length=512)的数据为P0, (offset=512, length=1024)的数据为P1. Map阶段要写完所有数据, 磁盘吞吐非常大.

ReduceTask拿数据的时候, 要获取所有MapTask的输出, 所以必须跟所有MapTask文件所在Server产生网络连接, 网络连接会非常多. 需要传输所有的MapTask输出到其他节点, 那么网络吞吐也会非常大. 其次, 我们拿P1数据的时候, 是直接读取文件的中间位置的, 因此随机IO会非常多, 磁盘繁忙. 最后为了减少网络和磁盘的压力, Spark加入了Shuffle过程的Merge和Shuffle数据压缩能力, 这就加重CPU的压力, 但一般情况下CPU压力没那么大, 除非被别的任务影响

如果有兴趣了解更加详细的Shuffle原理的同学, 可以参考这篇文章

原因归纳

综上所述, 我们简单归纳原因:

  • 大量网络连接, 且非常多的网络小包
  • 磁盘大吞吐, 且随机读取, 触发IO瓶颈
  • 计算存储耦合, CPU密集型任务影响Shuffle

目前的解决方式

针对以上的原因, 目前团队也有一些处理措施:

  • 增加超时配置项, 缓解网络问题
  • 增加Yarn Shuffle Service处理IO的线程个数,缓解IO问题
  • 隔离集群, 给队列打上独立NodeLable, 去除其他进程影响

以上措施在一定程度上缓解了FetchFailedException问题, 但这些措施都治标不治本:

  • 增加网络超时, 会加大重试的时间, 导致异常情况下时间更长
  • Yarn节点的IO已经达到硬件上线, 无法再通过多线程加速
  • 隔离集群, 使得集群资源的使用率低, 平均CPU使用率在30%以下

但随着业务数据量越来越多, 该问题又开始出现了, 是时候提出一种根治的方案了

Spark Shuffle任务的改进方案

方案设计

这个章节, 主要针对上文提到的”三高”问题, 看看从设计上如何解决这些问题

网络问题

在原有的网络模型之中, 网络连接个数为M*R, 而M和R都在Executor之中, 可以复用网络连接, 实际的连接为E^2. 如果executor个数为2000个, 那么会有4百万的连接, 一些排在后面网络连接经常出现超时.

为了性能, Spark在Reduce启动的时候, 会尽可能的建立所有的链接

另外一个问题是网络传输的时候小包太多了, 一个分区的平均大小<1MB, 大量的时间消耗在网络连接的创建过程之中, 能不能提前聚合数据, Reduce启动的时候, 直接读取数据就行了.

对于这两个问题, Push-Shuffle可以很好的解决这两个问题
image-20220228172633521

该方案数据的流程, 并不是由ReduceTask主动Pull, 而是由MapTask主动Push到ShuffleService里.
如图所示, 2个MapTask计算完分区数据P0后, 将两者的P0都push到第一个Shuffle Service.
ReduceTask启动后, 直接获取第一个Shuffle Service之中P0数据即可.

回看原来的问题, 该方案的连接个数为(E*S + E), S为RSS的个数, 一般在10-50之间, 远远小于E的个数, 有效减少连接;ShuffleService会合并数据(只要读取的时候, 多读几个块就好), 可以方便的处理小包问题.

发送时的小包, 可以通过同步发送P0和P1到同一个节点来解决

此外Push-Shuffle将随机IO变成了顺序IO, 解决了随机IO问题.

磁盘问题

磁盘问题一个在于随机读写, Push-Shuffle将Reduce阶段的随机读写变成顺序读写
另外一个问题就是读写速度, 我们的解决思路就是使用SSD替换HDD, 同时SSD对于随机读写也非常友好, 因此硬件加速完全好于预期.
那么用Spark原来的External Shuffle Service是否也能上SSD解决这个问题呢?
答案是否定的, 原因在于ESS是和Yarn强制绑定的, 代码逻辑在NodeManager的进程之中, 替换全部Yarn集群的磁盘, 成本上根本无法接受. 因此RSS必须是存储计算分离的, 也就是需要是个完全独立服务.
总结来说, 解决磁盘问题方法是: Push-Shuffle, SSD加速独立服务

CPU问题

如<瓶颈分析>中提到的, CPU问题并不是Shuffle的核心问题, 主要问题在于热点和隔离.
为了解决热点问题, 我们引入了Shuffle 调度器, 通过监控App里面磁盘网络内存压力, 防止出现单服务器热点, 拖慢计算或者导致失败.
对于隔离问题, 我们采用独立部署, 解除对Yarn集群的依赖

Remote Shuffle Service方案

image-20220228172650782
由上文的铺垫, 整体架构也呼之欲出了, 整体采用典型的Master-Slave模式, 其中Client为Spark App, Master为Remote Shuffle Service Manager(RssManager), Slave为Remote Shuffle Service Server(RssServer), 还有DFS作为数据备份, Zookeeper作为元数据存储.

Remote Shuffle Service Manager

作为整个集群的控制节点, 负责整个集群与客户端的任务调度, 元数据管理等主要工作.

  • Manager会采用主备方式完成服务高可用, 主备实例会通过Zookeeper进行选主工作
  • Manager会采取无状态应用模式, 整体状态都会同步到Zookeeper之中, Slave可以在重启时重建全部的状态
  • 为了加速备实例的启动速度, 主备实例会保持心跳, 并定时同步全量或增量状态给备节点
  • Manager与Server会保持心跳状态, Server通过心跳上报资源容量信息.
  • Manager会主动想Server下发RPC通信
  • 客户端会与Manager建立连接发送RPC请求, 但是Manager不会主动给客户端发送RPC请求
  • Manager与资源调取器(Yarn/K8s)保持连接, 查询任务状态, 用以退出清理
  • Manager内部设有调度器组件, 可以插件式的设置调度策略
  • Manager内部有元数据管理, 主要是App级别的信息, Task级别的信息由Driver持有, 通过RPC请求送达Manager
  • Manager服务有WebUI接口, 统计App级别监控信息
  • Manager服务有jmx接口, 指标方便指标统计

Remote Shuffle Service Server

作为整个集群的数据节点, 负责与客户端Spark App进行数据交互

  • Server是数据节点, 只会跟客户端进行数据通讯, 不参与控制通信
  • Server与Manager会保持心跳, 同时会与Manager有控制流的RPC请求
  • Server会将数据首先写入到本地的SSD, 如果开启了备份的话, 也会以pipeline的方式, 同步给其他备份节点, 最后开启了慢写入的话, 才会备份给DFS. 由于DFS一般有HDD磁盘组成, 因此性能会受到巨大的影响
  • 默认情况下, 一个Reduce任务会开一个文件句柄, 用于写入Map的数据, 同时还有一个index文件记录数据块位置与TaskId的对应关系
  • 读取数据时候, 会根据TaskId过滤无效的数据块

Spark App

作为整个集群的客户端, 负责实现Spark对外的接口, 并与RSS交互, 该模块除了实现SparkShuffleManager的接口之外, 还要实现Spark App的事件处理, 还需要在调度上处理任务失败的情况

  • Shuffle接口: 实现Shuffle注册, 数据读取, 数据写入等工作
  • Event接口: 实现整体控制流的处理, 例如App启停, Stage启停
  • Schedule接口: 当任务失败时的, 需要重试, push Shuffle的重试方式不同, 目前Spark并没有有接口暴露, 可能会侵入式的修改源码

Zookeeper

作为整个集群的元数据中心, ZK承担着元数据备份, 服务发现, 主备切换等功能

  • Manager会将App信息写入到ZK, 方便备实例恢复
  • Manager通过ZK进行主备切换和监控
  • Client会通过ZK的地址获取到Manager的地址

DFS

作为整个集群的数据备份中心, 目前公司内部只有HDFS作为DFS实现, 但是性能堪忧.
但当RSS集群也是以HDD磁盘为主时, Reduce任务直接获取DFS上的数据, 是个好的选择.
另外DFS方式的稳定性会更加好

方案效果

我们选了两个典型的案例:

业务1

使用默认的Shuffle方式, 总耗时7.3h, 有23个Failed Stage,错误类型全是FetchFailed
image-20220228172706050

使用RemoteShuffleService模式, 总耗时2.9小时, 无任何失败的Stage, 总Shuffle Write数据量在15TB左右, 最大ShuffleStage为5.6TB
image-20220228172721493

业务2

使用默认的Shuffle方式, 总耗时1.6h, 有21个Failed Stage

使用RemoteShuffleService模式, 总耗时38分钟, 无任何失败的Stage, Shuffle Write数据量在8TB左右, 最大ShuffleStage为0.3TB

比较

业务 地图 服务分
总Shuffle量 15TB 8TB
最大ShuffleStage 5.6TB 0.3TB
默认Shuffle耗时 7.3小时 1.6小时
默认Shuffle失败Stage个数 23个 21个
RSS耗时 2.9小时 0.75小时

Spark Shuffle任务的未来展望

目前RSS属于刚起步阶段, 一期上线时(21年Q2)只会覆盖部分业务场景, 未来还需要在以下方向做深度的优化:

  1. 持续优化性能和功能, 减少小文件传输的等问题
  2. 完善服务的可运维性, 增加运维的页面, 提供各种监控指标
  3. 提高服务的鲁棒性, 能够处理各种服务异常, 在绝大多数场景下能够正常完成计算
  4. 提高服务可测试性, 完备各种测试用例, 防止异常bug对数据正确性的影响
  5. 智能决策Shuffle类型, Spark引擎能够自动决策Shuffle类型, 一旦RSS失败后能够回退到默认的Shuffle
  6. 优化Shuffle数据的调度策略, 解决Shuffle热点瓶颈问题, 优化集群整体使用率
  7. Cloud Native支持, 提供较好的弹性伸缩能力, 提高集群利用率
  8. 支持与大数据其他组件混部, 提高集群利用率

Shuffle Service设计文档

该文档写于2021年1月

项目背景

业务场景

目前公司内部有3个集群, 最大集群规模已经达到8000多台机器, 总共有近45万核, 1.23PB的内存.
待补完…

Spark业务负载

目前集群上, 每天有60万个离线任务在运行, 其中Spark任务有10万个. 公司未来整体战略会将所有HiveMR的任务替换Spark任务, 今年将整个数据平台上的SQL任务基本上都迁移到Spark上.
整体的资源利用率得到了巨大的提升, 但随着Spark应用越来越多, 任务的稳定性受到了越来越多的挑战, 尤其是Spark Shuffle这块的问题, 总是让运维人员头疼不已, 一旦当天数据量超过历史, 会导致任务失败, 就需要隔离机器重新运行, 等业务量降低时候, 再释放空闲资源, 给整个团队巨大的运维成本.

当前面临的问题

我们分析Spark Shuffle的问题, 发现目前Shuffle机制存在如下几个问题:

  1. 计算存储不分离, Spark计算和Shuffle的IO操作混在一起, 极易因为CPU过高导致Shuffle超时失败
  2. 机器超卖, 虽然提高了整体使用率, 但是会因为调度不均匀, 导致失败CPU飙升
  3. Yarn Shuffle Service内嵌于NodeManager之中, 无法给全部集群替换SSD加速IO效率
  4. Pull模式的Shuffle, Reduce阶段有大量的随机读取过程, 导致磁盘IO飙升
  5. Reduce阶段数据未提前聚会, 导致导致大量网络小包产生, 造成网络IO飙升
  6. 调度器无法识别CPU/IO繁忙状态, 导致任务依然下发到近乎满载的机器

业界的解决方式

随着Spark成为业界的批处理的标准, 在各大互联网公司之中, Spark Shuffle以上的问题都慢慢出现, 无法通过简单的调优解决这个问题也慢慢成为这个大家的共识. 因此各个公司都推出了自己的解决方案:

  1. Spark社区和LinkedIn: 领英在VLDB2020发了一篇Paper, 提出Push based Shuffle来解决4和5点, 该方案依然嵌入在原有的Spark框架之中, 因此该方案最容易被社区接受, 现在也慢慢的在合入代码
  2. Uber: 在今年的Spark Meetup里提出了他们的独立的Shuffle Service的解决方案, 也是目前唯一开源的独立Shuffle Service服务, 意图解决1-5点问题, 并引入SSD解决IO问题. 但目前开源出来的代码健壮性不强, Task重试的时候, 数据会出现冗余,还需要很长的优化之路
  3. Facebook: 也有名为cosico的独立Shuffle Service, 从架构图上看他们解决了4-6的问题, 但容错使用了DFS系统, Reduce数据也是从DFS直接获取的, 因此性能是打问号的, 且未公开源码, 并不确定具体如何实现
  4. 阿里云: 20年12月份, 阿里云EMR团队公布了他们的Shuffle Service的方案, 同样未开源, 从公开文章上看, 他们除了解决Shuffle稳定性问题(1-5点), 更重要的点是为了解决Spark on k8s在磁盘上性能解决方案. 同样阿里云的方案也未开源
  5. Intel: 作为一家硬件厂商, 推出了自己的Shuffle Service, 目的是为来卖RDMA产品, 暂时应该不会去采购硬件, 因此略过

设计目标及范围

简单调研几家产品之后, 我们打算博采众长, 并找到符合自己的场景的方案, 主要参考Uber和阿里云:

  • 独立的Shuffle服务, 存储计算分离(解决1-2)
  • SSD存储, 加速IO效率, 减少计算资源, 提高整体资源利用率(解决3)
  • Push based Shuffle, Server端聚合后再Reduce(解决4-5)

相对于阿里云, 我们不需要做的:

  • 不需要全部替换原有Shuffle Service, 能用Adaptive的方式, 决策使用哪种Shuffle模式
  • 最好是CloudNative的方案, 但是不强制

而相对于他们两个的方案, 我需要:

  • IO敏感型调度, 解决热点问题

Shuffle Service顶层设计

服务架构图

image-20220228172007077

整体架构会采用典型的Master-Slave模式, 其中Client为Spark App, master为Remote Shuffle Service Manager(RssManager), Slave为Remote Shuffle Service Server(RssServer), 还有DFS作为数据备份, Zookeeper作为元数据存储.

Remote Shuffle Service Manager

作为整个集群的控制节点, 负责整个集群与客户端的任务调度, 元数据管理等主要工作.

  • Manager会采用主备方式完成服务高可用, 主备实例会通过Zookeeper进行选主工作
  • Manager会采取无状态应用模式, 整体状态都会同步到Zookeeper之中, Slave可以在重启时重建全部的状态
  • 为了加速备实例的启动速度, 主备实例会保持心跳, 并定时同步全量或增量状态给备节点
  • Manager与Server会保持心跳状态, Server通过心跳上报资源容量信息.
  • Manager会主动想Server下发RPC通信
  • 客户端会与Manager建立连接发送RPC请求, 但是Manager不会主动给客户端发送RPC请求
  • Manager与资源调取器(Yarn/K8s)保持连接, 查询任务状态, 用以退出清理
  • Manager内部设有调度器组件, 可以插件式的设置调度策略
  • Manager内部有元数据管理, 主要是App级别的信息, Task级别的信息由Driver持有, 通过RPC请求送达Manager
  • Manager服务有WebUI接口, 统计App级别监控信息
  • Manager服务有jmx接口, 指标方便指标统计

Remote Shuffle Service Server

作为整个集群的数据节点, 负责与客户端Spark App进行数据交互

  • Server是数据节点, 只会跟客户端进行数据通讯, 不参与控制通信
  • Server与Manager会保持心跳, 同时会与Manager有控制流的RPC请求
  • Server会将数据首先写入到本地的SSD, 如果开启了备份的话, 也会以pipeline的方式, 同步给其他备份节点, 最后开启了慢写入的话, 才会备份给DFS. 由于DFS一般有HDD磁盘组成, 因此性能会受到巨大的影响
  • 默认情况下, 一个Reduce任务会开一个文件句柄, 用于写入Map的数据, 同时还有一个index文件记录数据块位置与TaskId的对应关系
  • 读取数据时候, 会根据TaskId过滤无效的数据块

Spark App

作为整个集群的客户端, 负责实现Spark对外的接口, 并与RSS交互, 该模块除了实现SparkShuffleManager的接口之外, 还要实现Spark App的事件处理, 还需要在调度上处理任务失败的情况

  • Shuffle接口: 实现Shuffle注册, 数据读取, 数据写入等工作
  • Event接口: 实现整体控制流的处理, 例如App启停, Stage启停
  • Schedule接口: 当任务失败时的, 需要重试, push Shuffle的重试方式不同, 目前Spark并没有有接口暴露, 可能会侵入式的修改源码

Zookeeper

作为整个集群的元数据中心, ZK承担着元数据备份, 服务发现, 主备切换等功能

  • Manager会将App信息写入到ZK, 方便备实例恢复
  • Manager通过ZK进行主备切换和监控
  • Client会通过ZK的地址获取到Manager的地址

DFS

作为整个集群的数据备份中心, 目前公司内部只有HDFS作为DFS实现, 但是性能堪忧.
但当RSS集群也是以HDD磁盘为主时, Reduce任务直接获取DFS上的数据, 是个好的选择.
另外DFS方式的稳定性会更加好

交互图架构图

下面考虑一下RSS是如何和Spark的各个实例交互的, 先一个Map和Reduce任务是如何进行控制流通信的, 然后再看数据是如何写入的

控制流图

image-20220228172021282

  1. Spark Driver向Manager发送申请Shuffle资源的请求
  2. Manager返回结果, 并指明ReduceTaskId对应的Server地址
  3. Driver根据位置分配Spark任务
  4. SparkTask计算各自的数据, 发送到对应Server
  5. Task写完所有的Map数据之后, Executor向DriverCommit任务
  6. 完成所有任务的时候, Driver向Manager发送CommitStage请求, 目的是传递最后成功的TaskId给到ShuffleManager
  7. Manager将TaskId等消息,下发到各个Server
  8. Driver下发Reduce任务(如果也是ShuffleMapTask的话, 依然需要先申请资源)
  9. ReduceTask向对应的Server拉去数据, Server需要根据TaskId过滤重复数据

数据流图

image-20220228172040055

  • MapTask先会将数据完成预聚合, 按照Partition分区
  • 然后两个MapTask都将将P0,P1的数据推送到第一个Server
  • Server会将两个MapTask的数据都写入到一个文件之中, 因此第一个Server有P0和P1两个文件, 第二个Server有P2,P3,P4三个文件
  • 启动ReduceTask时候, 直接会去对应Server流式拉取数据

Remote Shuffle Service设计要点

可靠性

可靠性从三个方面来阐述, 一个是任务可靠性, 指异常发生的整体重试机制, 这里的重试指流程的重试, 而不是消息的重试, 因为RPC消息大多数可以做到幂等, 做不到幂等的就会出现不一致场景, 就是数据一致性问题. 另外一点数据可靠性, 只Shuffle数据文件的备份问题.

任务可靠性

首先我们还是回到上面的数据流图, 并标识其中可能失败的场景
image-20220228172054924
下面分别解释一下错误的具体含义:

  1. MapDataPush失败值, MapTask之中单个Partition数据无法推送到对应的Server, 造成的原因一般为网络问题等
  2. 慢节点/坏节点, 当出现这种场景的时候, 是多个MapTask推送到同一个Server时候的出现
  3. MapTask失败, 这个场景为部分Partition数据写入成功, 部分写入失败场景, 造成错误的原因很多, 比如Executor OOM
  4. Map推测执行, 指启动两个Task计算同一份数据, 会导致所有数据写了2份
  5. Reduce拉取失败, 指单次失败, 可能网络问题导致
  6. Reduce持续失败, 拉取多次失败, 可能是慢节点等
  7. Server失败重启, 进程级别的重启, 服务多一会时间会恢复
  8. Server丢失数据, 一般是磁盘问题, 导致数据丢失了
PushData失败/多次失败

image-20220228172110777

  1. 启动MapTask, Task之中携带Reduce对应Server地址, 一个Reduce对应2个地址, 正常情况只会使用前面一个
  2. MapTask计算Partition任务会将数据写入到临时
  3. 向Server写入数据
  4. 写入失败,返回错误
  5. 从临时文件之中获取Partition数据
  6. 重试写入(第一次失败的时候, 依然写入老地址, 第二次失败时, 写入备用地址)
  7. 重试, 直到达到达到最大次数
  8. 如果重试未成功, 将失败的节点加入黑名单, 然后重新运行整个Stage
  9. 如果重试成功, 启动Reduce任务, 此时需要指明备节点是否存有数据.
  10. 此时对应整个Reduce可能两个节点都存在数据, 因此需要向两个节点拉取数据

PushData的Block粒度为单个Partition, 如果再小, 那么数据一致性就无法保证了, 这里编程时候需要注意

关于Block重试的详细说明

image-20220228172158952

  1. 网络写失败, 无妨, 重试即可
  2. 写本地失败, 写了一些, 需要Stage失败
  3. 写commit文件失败, 需要Stage失败
  4. 网络返回失败, Server做到幂等

2和3的问题, 大致是因为服务节点问题导致的, 这个目前比较难以处理, 先不管了[TODO]
如果做到幂等, 需要在元数据信息里面记载对应的taskAttemptId

数据重复

image-20220228172216824

  1. Driver启动Map任务
  2. 开始写Partition数据, 成功写入P0
  3. 但P1数据还没有完成写入
  4. 此时Task任务遇到异常退出
  5. Driver启动Task任务重试
  6. Task又重复写入一遍P0数据
  7. 同时也完成P1数据的写入, 此时系统之中有2份P0, 1份P1. 由于写入数据时, 含有Task信息, 因此Server知道2份P0数据, 分别由哪个Task写入
  8. Executor向Driver 确认成功的Task
  9. Driver向Manager发送Map任务也完成了, 并将正常完成任务的TaskId信息发给Manager
  10. Manager直接将这些信息下发到Server
  11. Driver启动Reduce任务
  12. Reduce任务拉取数据的时候, 如果发现数据块对应的TaskId不在commitTask列表之中, 就会自动跳过这个数据块

commitTask下发给RSS的话, 容易造成元数据膨胀问题, 但如果将这些元数据放在Spark里面的话, 就会造成代码耦合程度太大了. 因此先看看编程时候能不能将代码耦合去除, 如果去除的话, 直接在ReduceTask测过滤

推测执行的难点在于, 两个独立进程写入同一个数据块, 那么在Server端就必须加锁来防止竞争, 但是一旦加锁, 性能就会收到影响

拉取失败

image-20220228172232885

  1. Driver启动Reduce任务
  2. ReduceTask开始拉取数据, 但是返回异常
  3. 此时跟MapTask一样, 先开始Partition级别拉取重试, 如果第二次失败的时候, 开始拉取备份节点数据. 为了防止重复, 拉取成功的数据块, 需要记录对应的Task号, 拉取备份节点的时候, 需要重新过滤
  4. 如果Partition级别的重试未成功
  5. 上报给Driver, 准备重试Task
  6. 下发重试任务到新的Executor
  7. 此时如果一直失败的话, 有两个选择, 一个是直接宣布App失败了, 另外就是重选上一个Stage, 先选第一种
Server失败

image-20220228172249213

  1. 以MapTask为例, ReduceTask其实也类似
  2. MapTask开始写入数据
  3. 此时对应Server进程shutdown了, 对应Manager来就是心跳消失
  4. MapTask首先会失败, 失败后会再次重试, 一般partition级别重试会一直失败, 那么就会换一个节点写入.
  5. 此时如果Shuffle Server能马上重启, 重新接入心跳, 那么Manager就会当没事发送. 如果超过一定时间, 还没有重启, 那么Manager会告之Driver数据丢失(这里可以被动的)
  6. Driver发现DataLost之后, 先查看是否开启数据备份功能, 如果有, 则继续. 如果没有开启数据备份, 那么就开始停止整个Stage, 等待重新下发全部的任务(如果有Reduce任务, 且数据没有开启备份功能, 则直接APP失败退出)

如果Server重启且不能恢复的话, 直接让APP失败, 由上层平台重试也许是个最好的方案

另外一种方式, Service一旦失败, 重试这个Stage, 上一个Stage的数据必须在DFS中有备份, 这样才能重试Stage.

数据一致性

数据的一致性问题, 其实在上面已经单独说过了, 但这里还是要单独拿出来看的, 因为数据不一致, 结果一般就不对了.
不一致情况的原有有以下几种:

  • MapTask重试, 一些数据写了两遍
  • 推测执行数据有两份
  • Server失败, 导致一份数据写到两个Server, 这时有些数据重复了, 有些数据没重复
    解决上面的问题的思路, 就是读的时候, 需要过滤多余的数据.
    对于同一个Server写了2遍, 这时通过CommittedTaskId过滤掉多余Task产生的数据
    对于多个Server数据, 读取的时候, 要记录哪些Task任务已经被消费了, 如果被消费了就不需要重新读取了

数据可靠性

数据备份功能, 一般有几个问题要选择: 谁来备份, 怎么备份, 备份到哪儿

谁来备份

一般有客户端和服务端备份, 客户端备份,就是客户端直接写多份数据, 服务端备份的话, 客户端只写一份数据, 然后服务端自己写到备份地方去
客户端备份的优点是简单, 缺点点网络连接多些, 并且与后端耦合
服务端是反过来的
我们这儿选服务端备份, 因为备份到哪儿还不是很确定

怎么备份

这儿有同步和异步的选项, 因为是临时数据, 所以Shuffle的备份最多应该就2个, 不会有半异步的选项
同步的特点是, 速度慢, 但是能保证数据一致
异步的话, 性能会好, 但是容易导致数据并没有完全ready
我们这儿选同步, 因为如果是异步的话, 上面的任务可靠性的处理会更加复杂一些, 这个性能等后续再优化吧

备份到哪儿

在我们的系统里面有两个选项, 其他Server或者DFS.
如果放到其他实例里面的话, 因为机器也是SSD的, 所以性能会好一些, 但是DFS性能会比较差, 不过稳定性会比较好, 不需要考虑数据丢失的问题了
这里优先DFS, 因为DFS有人维护.

可用性

主备切换

主备切换的能力依靠ZK来完成:

  • 客户端和Server会监控ZK的地址, 如果Manager地址变化之后, 会主动切换地址和端口
  • 主Manager会在ZK上写入数据, 而备Manager会一直监控着, 如果发现节点丢失, 即主Manager失联, 备Manager会读取Zookeeper上的元数据, 然后变成主Manager写入数据.

主备切换, 任务不中断

做到任务不中断, 需要有以下条件

  • 切换时间短
  • 无任何不一致状态, 或者状态都可以恢复
  • 连接重试
    要做到切换时间短, 需要备节点时刻监控主节点的状态, 不能落后太多, 不然就会启动延迟
    尽量将必要的元数据信息都写入到ZK之中, 但ZK并非更新友好型存储系统, 因此也需要实现中间状态重构的能力,而这个重构不能出现状态丢失
    客户端的所有RPC请求, 都要有重试功能, 一旦发现主备切换, RPC失败之后, 迅速切换到另外的节点.

监控

页面

参考Livy的实现, 只实现到APP级别即可
APP: Id, 提交时间, 结束时间, 时长, 当前Shuffle文件个数, 当前shuffle存储总量

指标

App总量: 个数, shuffle存储量, 文件总个数
资源容量: 磁盘,内存slot, cpu
JVM相关: 内存, 线程

慢节点告警

如何定义慢节点?

部署

兼容性方案

兼容性包含客户端和服务端之间,以及服务端Manager和Server时间.
客户端目前只会支持Spark2.4.3版本以及Spark3.0版本.服务端需要同时支持这两个版本的接入, 因此整体差异只会在于Spark与Client接口层次, RssServer的Client和Server之间的通信兼容性必然要遵循. 可以适当的在控制流监控预留部分json字段来保持未来的兼容.
服务端的Manager和Server的RPC也同上诉方式.
如果后续改动实在无法, 通过灰度升级的方案处理

多版本支持

由于兼容性和容错的考虑, Push方式的Shuffle Service很难实现滚动升级, 原因在于每个实例都有很多的网络连接, 一旦重启, SparkTask就会出现异常, 造成大规模的任务重试, 对集群会产生巨大的压力. 因此只能采用灰度升级方案, 发布也必须支持多版本特性.

每个版本发布包都有一个版本号, 进程启动是会将版本号写入到Zookeeper的元数据之中, 客户端会根据自身的版本号, 选择对应的路径, 然后获取到ShuffleManager的地址, 完成整个任务的启动环境.

所以, 整个客户端/RssManager/RssServer的版本都是配套的. 整套环境之中, 目标最多只能存在3个版本.
为了防止用户客户端死活不升级情况, Spark加载版本和jar包方式, 将通过Zookeeper获取元数据, 然后启动的时候, 远程加载配置项和jar文件, 这段代码需要注入到Spark之中, 而非RSS.

部署方案

实例角色 最少个数 推荐个数 部署位置
Zookeeper 3 5 单独节点, 至少独立磁盘
RssManager 2 2 Master单独一个节点, Slave单独一个节点
RssServer 5 30 单独节点
  1. 每个节点, 对于单个版本, 只部署单个实例
  2. 机器尽量在同一机房, 网络带宽尽量大, 因为需要内部传输备份数据
  3. RssManager的资源尽量在8核16G以上, 防止RPC和元数据处理成为集群瓶颈
  4. RssServer机器尽量是SSD盘, 每个节点过挂一些数据盘, 尽量满足1核2G1TB的配置方式

升级方案

整体采用灰度升级方案, 即每个节点会部署多个实例, RssManagerRssServer各自部署一套, 两者元数据分开存储, 但调度器会感知多个版本的任务情况.
整体升级步骤如下:

  1. 部署新版本的RssManager, 在standby的机器上部署另外一个备RssManager
  2. 逐个安装新的版本的RssServer, 完成后并添加新版本的监控告警
  3. 更新新版本客户端到HDFS
  4. 更新Zookeeper上最新RSS客户端版本
  5. 过两天, 查看老版本的RssManager的负载, 如果任务数已经降低为0, 则准备下线版本
  6. 关闭监控告警, 逐个关闭Server, 最后关闭两个Manager.
Zookeeper节点切换

整个集群之中, Zookeeper虽然不是单点的, 但是ZK集群确实单点的, 一旦节点老旧必须替换升级或者IP切换的时候, ZK地址已经变换, RSS和Spark配置都要相对的变化, 这时就必须重启完成, 此时必须容忍任务大规模重试.

安全

鉴权

内部集群可以先不考虑

数据完整性

Shuffle数据作为临时数据, 用户即可清理, 可以先不做完整性校验(目前Spark也没有做)

数据清理

Shuffle数据根据App粒度清理, Yarn Shuffle Service之中, Driver在退出的时候, 会清理对应的数据, 但不删除目录. 目录有Yarn感知到App状态为完成之后, 下发NodeManager完成清理工作.
但在Shuffle Service之中, 由于无法感知App状态, 因此需要Driver来主动清理.

  • 在Driver退出的过程之中, 会向Manager发送AppEnd的消息, 接收到请求之后, 开始清理App对应的Shuffle, 想各个Service发送完毕异步请求之后, 将App的状态标记为Deleted, 过一段时间后删除该状态
  • 如果Driver异常退出, 度过静默期(无RPC往来的时长)之后ShuffleManager主动查询Yarn上App的状态, 如果为退出状态, 则主动删除数据.
  • ShuffleManager如果主备切换, 由于App的信息保存在Zookeeper之中, 因此备节点依然会完成清理工作
  • ShuffleService实例如果发重启, 那么它会主动询问是否有资源未清理, 如果发现本地APP的在状态为Deleted, 或者查询不到.

调度

调度算法

资源的类型有以下几类:

  1. 磁盘容量
  2. 内存Buffer量
  3. IO负载压力
  4. CPU负载压力

DataLocation

分配Server节点的时候, 需要返回机架信息, 方便Reduce任务决策启动在哪些几点之上.

性能

支持多磁盘写入

文件磁盘目录, 也需要根据Spark一样规划为多级目录, 同时支持多个SSD目录
文件也会根据AppId + ShuffleId + PartitionId的hash值计算出对应的目录路径

小数据块合并发送

Map写入, 如果数据块比较小, 且写入同一个Server, 则可以合并发送.

文件流式读取

对于Reduce任务, 如果Partition实在太大, 可以根据流式方式读取

小数据块合并读取

image-20220228172327830
例如上图, 在shuffleRead的时候,需要花费近6个小时, 主要原因在于网络小io太多了

2GB限制

Spark在Shuffle的时候, 已经通过DownloadManager, 通过文件的方式解决了2GB的限制, 但目前这儿依然由这个限制.
如何解决问题? 仿照Spark的处理方式, 如果发现数据量大于2GB, 则启动文件发送, 服务端需要重新定义handler, 整个数据也写入到临时文件之中, 不要写在原有RSS的数据文件之中.
ShuffleDataWrapper需要加入一个新的字段, 或者将data_length设置为负值

Clickhouse的存储锁

源码解析

代码主要在IStorage之中

1
2
3
4
5
6
7
8
class IStorage 
{
TableLockHolder lockForShare;
TableLockHolder lockForAlter;
TableExclusiveLockHolder lockExclusively;
mutable RWLock alter_lock;
mutable RWLock drop_lock;
}

从代码的定义上来, 只有drop和alter相关的才会加锁

排它锁lockExclusively的锁同时锁住alter_lockdrop_lock, 锁类型为写锁

1
2
3
4
5
6
7
8
9
10
11
12
TableExclusiveLockHolder IStorage::lockExclusively(const String & query_id, const std::chrono::milliseconds & acquire_timeout)
{
TableExclusiveLockHolder result;
result.alter_lock = tryLockTimed(alter_lock, RWLockImpl::Write, query_id, acquire_timeout);

if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);

result.drop_lock = tryLockTimed(drop_lock, RWLockImpl::Write, query_id, acquire_timeout);

return result;
}

排它锁只有很少的地方用到
image-20220228152910967

  1. 在renameTable时候用到
  2. 在DropTable的时候用到
  3. 在restartReplica时候用到

修改锁lockForAlter只锁了alter, 锁类型为写锁

1
2
3
4
5
6
7
8
9
TableLockHolder IStorage::lockForAlter(const String & query_id, const std::chrono::milliseconds & acquire_timeout)
{
TableLockHolder result = tryLockTimed(alter_lock, RWLockImpl::Write, query_id, acquire_timeout);

if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);

return result;
}

都只在修改DDL时候用到: DDL语句执行和复制表的同步DDL时
image-20220228152931062

共享锁lockForShare的锁加入在drop_lock之中, 锁类型为读锁

1
2
3
4
5
6
7
8
9
TableLockHolder IStorage::lockForShare(const String & query_id, const std::chrono::milliseconds & acquire_timeout)
{
TableLockHolder result = tryLockTimed(drop_lock, RWLockImpl::Read, query_id, acquire_timeout);

if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);

return result;
}

共享锁调用的地方非常多, 常见的Insert语句也是排它锁.

根据代码推论:

  1. 插入过程之中删除插入表, 删除动作会等待插入完成再执行
  2. 插入过程之中修改插入表, 表结构能够修改成功
  3. Insert语句锁住整个query执行时间

测试验证

建表语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
CREATE TABLE `lineorder_local`
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY toYear(LO_ORDERDATE)
ORDER BY (LO_ORDERDATE, LO_ORDERKEY);

插入语句

1
./clickhouse-client  --query "INSERT INTO hzw.lineorder_local FORMAT CSV" < lineorder.tbl

插入过程之中删除插入表

删除表语句

1
drop table lineorder_local;

image-20220228152954941

出现无法获取锁的问题

插入过程之中修改插入表

修改表语句

1
alter table lineorder_local drop column LO_QUANTITY;

执行后, 会立马成功
image-20220228153018786

由于乱序执行, 测试一个更加夸张的例子

1
alter table lineorder_local delete where 1=1;

image-20220228153033588
这里alter立马会成功, 数据会删除, 但是insert还在继续.

Clickhouse的WAL功能

设计目的

解决小批量数据写入时, 频繁写入dataPart, 导致磁盘繁忙的问题或者出现DB::Exception: Too many parts

具体可以参考这篇文档

并非binlog的模式, 数据flush后, 会清理wal文件

源码分析

首先看写逻辑的入口, 在MergeTreeDataWriter::writeTempPart创建了一个MergeTreeData::MutableDataPartPtr的类型

1
2
3
4
5
6
7
8
9
MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
{
auto new_data_part = data.createPart(
part_name,
data.choosePartType(expected_size, block.rows()),
new_part_info,
createVolumeFromReservation(reservation, volume),
TMP_PREFIX + part_name);
}

MergeTreeData::MutableDataPartPtr有三种实现, IN_MEMORY是写入到内存中, 由于内存是易失的, 所以需要WAL功能的辅助.COMPACTWIDE都是文件存储, 但文件编码和压缩方式不同.
1
2
3
4
5
6
7
8
9
10
11
12
13
MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
const VolumePtr & volume, const String & relative_path) const
{
if (type == MergeTreeDataPartType::COMPACT)
return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, volume, relative_path);
else if (type == MergeTreeDataPartType::WIDE)
return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, volume, relative_path);
else if (type == MergeTreeDataPartType::IN_MEMORY)
return std::make_shared<MergeTreeDataPartInMemory>(*this, name, part_info, volume, relative_path);
else
throw Exception("Unknown type of part " + relative_path, ErrorCodes::UNKNOWN_PART_TYPE);
}

上面的逻辑, 主要是根据MergeTreeDataPartType的类型选择, 那么我们看一下, 上面什么时候会选择该类型: 当part的文件小于min_bytes_for_compact_part或者行数小于min_rows_for_compact_part, 只要满足一种一项即可.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, size_t rows_count) const
{
const auto settings = getSettings();
if (!canUsePolymorphicParts(*settings))
return MergeTreeDataPartType::WIDE;

if (bytes_uncompressed < settings->min_bytes_for_compact_part || rows_count < settings->min_rows_for_compact_part)
return MergeTreeDataPartType::IN_MEMORY;

if (bytes_uncompressed < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part)
return MergeTreeDataPartType::COMPACT;

return MergeTreeDataPartType::WIDE;
}

那么回过头来看MergeTreeDataPartInMemory的实现

1
2
3
4
5
class MergeTreeDataPartInMemory : public IMergeTreeDataPart
{
// 忽略部分函数
mutable Block block;
}

从源码上看, MergeTreeDataPartInMemory就是原本Block写入本地磁盘, 而它则放入到内存中.

那么如果进程重启, 内存中数据丢失后, 该如何处理呢?

答案在于MergeTreeWriteAheadLog的功能中, 将用户数据写入到WAL.

当进程启动的时候, 会调用restore从WAL的文件中, 将数据恢复到内存中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
{

if (action_type == ActionType::ADD_PART)
{
auto part_disk = storage.reserveSpace(0)->getDisk();
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);

part = storage.createPart(
part_name,
MergeTreeDataPartType::IN_MEMORY,
MergeTreePartInfo::fromPartName(part_name, storage.format_version),
single_disk_volume,
part_name);

block = block_in.read();
}
}

那么wal文件会越写越多, 什么时候会开始清理部分数据呢?

1
2
3
4
5
6
7
8
9
10
void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_name)
{
// 写数据到WAL
block_out->write(block);
block_out->flush();

auto max_wal_bytes = storage.getSettings()->write_ahead_log_max_bytes;
if (out->count() > max_wal_bytes)
rotate(lock);
}

addPart的逻辑之中, 会检查WAL文件的大小, 当文件大于write_ahead_log_max_bytes(默认为1GB)时, 开始清理WAL文件

另外一个问题, WAL的内存部分数据存放在哪儿, 在insert的时候(renameTempPartAndReplace), 数据会放到data_parts_indexes.insert之中, read时候从这里读取数据

1
2
3
4
5
6
7
8
9
10
MergeTreeData::renameTempPartAndReplace
{
part->name = part_name;
part->info = part_info;
part->is_temp = false;
part->state = DataPartState::PreCommitted;
part->renameTo(part_name, true);

auto part_it = data_parts_indexes.insert(part).first;
}

数据恢复的时候, loadDataParts的时候, 数据被读取出来, 然后插入到data_parts_indexes之中, 通过getActiveContainingPart过滤重复的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
MergeTreeData::loadDataParts
{
for (auto & part : parts_from_wal)
{
if (getActiveContainingPart(part->info, DataPartState::Committed, part_lock))
continue;

part->modification_time = time(nullptr);
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->state = DataPartState::Committed;

if (!data_parts_indexes.insert(part).second)
throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
}
}

可以看到Clickhouse的WAL机制, 并没有RocksDB的那种MemTable, 因此两个批次的数据, 并不会在内存中合并.

所有的合并操作, 依然由后台线程来处理, 支持在合并的流程中, 抽象为2个DataPart的合并, 但实际上可以是一个InMem的DP和一个OnDisk的DP做合并.

Clickhouse的合并后的数据都写入到磁盘中.

另外一个点, 在复制表中, InMem的DP依然会做同步.

DataPartsExchange有两个函数sendPartFromMemorydownloadPartToMemory, 前者用于发送数据, 后者用户下载数据, 同步后, 数据依然是InMem格式.

测试

创建一张表, 指定min_rows_for_compact_part为200,write_ahead_log_max_bytes为8192(8K)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CREATE TABLE default.lineorder_local1
(
`LO_ORDERKEY` UInt32,
`LO_LINENUMBER` UInt8,
`LO_CUSTKEY` UInt32,
`LO_PARTKEY` UInt32,
`LO_SUPPKEY` UInt32,
`LO_ORDERDATE` Date,
`LO_ORDERPRIORITY` LowCardinality(String),
`LO_SHIPPRIORITY` UInt8,
`LO_QUANTITY` UInt8,
`LO_EXTENDEDPRICE` UInt32,
`LO_ORDTOTALPRICE` UInt32,
`LO_DISCOUNT` UInt8,
`LO_REVENUE` UInt32,
`LO_SUPPLYCOST` UInt32,
`LO_TAX` UInt8,
`LO_COMMITDATE` Date,
`LO_SHIPMODE` LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY toYear(LO_ORDERDATE)
ORDER BY (LO_ORDERDATE, LO_ORDERKEY)
SETTINGS index_granularity = 8192, min_rows_for_compact_part = 200,write_ahead_log_max_bytes=8192;

插入100条数据

1
insert into table lineorder_local1 select * from lineorder_local limit 100;

查看数据目录

1
2
3
4
5
[root@bigdata-nmg-client01.nmg01 /var/lib/clickhouse/data/default/lineorder_local1]$ ll
total 12
drwxr-x--- 2 clickhouse clickhouse 6 Apr 12 14:53 detached
-rw-r----- 1 clickhouse clickhouse 1 Apr 12 14:53 format_version.txt
-rw-r----- 1 clickhouse clickhouse 4766 Apr 12 14:54 wal.bin

出现了一个wal.bin的文件
再插入一次, 发现又出现了一个bin文件
1
2
3
4
5
6
[root@bigdata-nmg-client01.nmg01 /var/lib/clickhouse/data/default/lineorder_local1]$ ll
total 16
drwxr-x--- 2 clickhouse clickhouse 10 Apr 12 15:18 detached
-rw-r----- 1 clickhouse clickhouse 1 Apr 12 15:18 format_version.txt
-rw-r----- 1 clickhouse clickhouse 9532 Apr 12 15:18 wal_1_2.bin
-rw-r----- 1 clickhouse clickhouse 0 Apr 12 15:18 wal.bin

多了一个wal_1_2.bin的文件, 我们在多插入几次, 到第5次插入的时候, 会生成一个datapart

1
2
3
4
5
6
7
8
[root@bigdata-nmg-client01.nmg01 /var/lib/clickhouse/data/default/lineorder_local1]$ ll
total 40
drwxr-x--- 2 clickhouse clickhouse 4096 Apr 12 15:39 1992_1_5_1
drwxr-x--- 2 clickhouse clickhouse 10 Apr 12 15:18 detached
-rw-r----- 1 clickhouse clickhouse 1 Apr 12 15:18 format_version.txt
-rw-r----- 1 clickhouse clickhouse 9532 Apr 12 15:18 wal_1_2.bin
-rw-r----- 1 clickhouse clickhouse 9532 Apr 12 15:26 wal_3_4.bin
-rw-r----- 1 clickhouse clickhouse 4766 Apr 12 15:39 wal.bin

再过一段时间观察, 发现wal_*_*.bin文件已经被删除了, 原因在于data_part已经commit的了
1
2
3
4
5
6
7
[root@bigdata-nmg-client01.nmg01 /var/lib/clickhouse/data/default/lineorder_local1]$ ll
total 16
drwxr-x--- 2 clickhouse clickhouse 4096 Apr 12 15:39 1992_1_5_1
drwxr-x--- 2 clickhouse clickhouse 10 Apr 12 15:18 detached
-rw-r----- 1 clickhouse clickhouse 1 Apr 12 15:18 format_version.txt
-rw-r----- 1 clickhouse clickhouse 4766 Apr 12 15:39 wal.bin
[root@bigdata-nmg-client01.nmg01 /var/lib/clickhouse/data/default/lineorder_local1]$

存在的问题

现在CK创建视图有2种初始化数据的方式

  1. 在 CREATE 语句中带上 polulate 关键字,会将执行时间点之前的底表的历史数据全部初始化到视图中。执行完成后的底表新数据也会进视图,但是执行过程中的数据会丢失

  2. 在 CREATE 语句中不带 polulate 关键字,会将执行完成后的底表新数据会进视图,但是执行完成之前的数据都不会进视图;如果而后手工执行insert命令, 导入历史数据, 那么重复数据

Polulate数据丢失

根因分析

先明晰一下MV更新的时候的时序图

下图中有4个角色: 插入语句执行者, 底表插入数据执行者, MV1插入数据执行者, MV2创建语句执行者

image-20220227113952566

首先, 对于底表来说, 第一批写开始时, 因为MV2并没有被创建, 因此对于它来说, 只会给MV1主动推送数据, 相当于这批次的数据全部丢失了.

其次, 对于MV2创建来说, 由于带有populate字段, 他会主动去拉取历史数据, 但是由于底表没有全部写完, 只写了DP1, 那么DP2和DP3的数据就丢失了.

因此, 总结来说, 在插入过程之中创建底表会丢失的数据是, 在底表里DataPart没有写完的那些数据, 写完的数据还是会被读取的.

最后, 看第二批写的情况, 由于检查MV的时候, MV1和MV2已经存在(虽然这个时候, 创建MV2的语句被没有结束, 但是由于CK没有一致性保证, 所以元数据的信息可以被外面捕获到), 此时第二批次的数据, 在正常情况下能够推送给MV2.

解决方案

利用存储的锁机制, 在插入时禁止populate操作

image-20220227114132890

  1. 写入时, 加入写入的共享锁, 可以支持同时插入到一个数据表中
  2. 执行create table的元数据创建时, 获取底表的排它锁, 如图中所示, 此时正好在插入, 则会等待插入执行完毕后, 再进场元数据操作
  3. 元数据执行完毕后立即释放锁, 耗时的populate阶段, 是无锁状态. 因此第二次插入, 只需要等待极小的一个时间即可执行数据插入动作.

使用限制

目前如果有大型insert的操作的话, 此时会无法完成基于底表的创建视图操作

复制表问题

目前视图使用create view to table的方式建立的, 如果视图里面带有populate, 就会出现MV多数据的情况, 原因如下图所示

image-20220227114313082

解决方案

只允许一个节点写入, 跟DLAP-Manager交流, 目前复制表只有一个节点可写, 符合预期

image-20220227114345189

从上面的分析来看, 整体物化视图的初始化, 比较合适构建在外部的Manager中. 但此时就需要处理场景2中数据重复的问题

Insert数据重复

Insert数据重复比较好了解, 因为是人工操作, 因此创建完毕物化视图, 和执行insert命令期间, 有可能已经有一些批次的数据写入到物化视图中, 如果此时导入全部的历史数据, 那么数据就会出现重复的情况.

去重视图一般不影响, 但如果是聚合是视图, 那么结果将不正确.

解决方案

数据快照

大底表的处理方案

  1. 创建视图时, 获取底表所有的分区

  2. 通过参数设置并发, 创建并发个数的临时表

  3. 按照分区修改插入语句, 此时引擎测需要保证: 该表达式能够完成分区裁剪, 并在实际执行时忽略掉判断(这是一个难点)

    where splitByChar(‘‘,_part)[1]=’1993’ and toInt32(splitByChar(‘‘,_part)[3]) < 500

  4. 临时表的插入, 只能有一个并发. 插入完毕后, 用attach parition的方式, 将分区加入到物化视图的底表中, 然后删除临时表, 再重新创建一个临时, 开启下一轮操作.

  5. 如果插入临时表时候, 系统需要自动清空(或者删除重建)临时表, 并重试. 如果attach时失败, 则返回异常, 由用户删除物化视图, 并开始重建工作.

  6. 期间Clickhouse Server节点如果出现异常, 则整个任务失败. 如果DLAP-Manager出现重启, 则可以继续redo整个任务.

自适应执行

社区在Spark2.3版本之后的AdaptiveExecute特性之中就能很好的解决Partition个数过多导致小文件过多的问题.
通过动态的评估Shuffle输入的个数(通过设置spark.sql.adaptive.shuffle.targetPostShuffleInputSize实现), 可以聚合多个Task任务, 减少Reduce的个数
使用方式:

1
2
set spark.sql.adaptive.enabled=true
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128MB

优点:

  • 自动根据任务的数据量进行聚合

缺点:

  • 必须存在Shuffle过程, 否则不生效
  • 任务的Shuffle输出比不能太低

Shuffle输出比, 为一个Shuffle任务中最后Output的数据量除以ShuffleRead的数据量的数值. 如果ShuffleRead为100GB, 而输出为1GB, 那么Shuffle输出比为1%. 如果这值比较低, 说明Task之中有很高强度的Filter功能. 这个数值太低会对系统产生比较大影响, 例如每个Shuffle块为128MB, 如果输出比为10%, 那么最后在HDFS之中只有12.8MB, 就如会出现小文件问题. 因此动态执行功能并不会对此产生太大的效果. 现实中, 由于SparkSQL已经有比较高效的FilterPushDown功能, 因此这个比例不太太高, 在在20%以上.

HINT方式

社区在Spark2.4版本之后引入HINT模式SPARK-24940, 可以由用户来指定最后分区的个数, 只要在SQL语句之中加入注释文件
使用方式:

1
2
INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...

优点:

  • 支持简单无Shuffle模式的Reparation

缺点:

  • 需要人工干预, 设计Partition的个数, 而对于变化的场景来说, 难有一个固定的Partition个数
  • 无法处理Shuffle输出比过低的场景

独立的小文件合并

综上所诉两个方案都无法处理Shuffle输出比过低的场景, 因此我们需要一种兜底方案: 直接读取HDFS上的数据, 进行合并操作.
当插入任务完成之后, 新启动一个Job读取所有的数据, 然后根据设置的文件大小, 进行合并并会写到HDFS之中.
由于是直读直写的方式, 因此对于数据大小的评估是非常精确的, 因此可以很好的避免Shuffle输出比的问题.

优点:

  • 基本解决了小文件问题

缺点:

  • 引入新的一次Job过程, 性能会受影响, 特别对中型任务会有一定的影响(10秒左右)

使用方式:

1
2
set spark.sql.merge.enabled=true;
set spark.sql.merge.size.per.task=134217728; --128 * 1024 * 1024 bytes

性能优化:
ORC和Parquet格式支持按行读取和按Stripe读取, Stripe读取可以认为是GroupRead, 由于不需要解析文件里面具体的数值, 因此可以按照Stripe粒度读取文件, 再写入文件之中, 以Stripe粒度合并文件.
1
set spark.sql.merge.mode=fast; -- 默认是pretty, 是逐行读写文件的, 性能较慢

实际上说, 这种方式与启动独立合并的任务, 后台不停的合并是一样的, 只不过将这种插入到每个SQL任务中, 并自动完成了

基数估计

通俗的话来讲, 就是求count distinct, 在公司内部有大量UV场景, 因此一款数据库对基数估计的支持是一个非常重要的功能.

一般计算基数, 通过分布式构造Hash表, 进行group by求解, 但该方式非常消耗内存和CPU, 无法满足大型互联网的要求, 因此有以下两个优化方向.

Bitmap

对于Long/Int型数据, 可以通过Bitmap方式来求解. Bitmap的存储和计算效率会明显优于Hash表.

普通的Bitmap结构比较清晰, 这里简单讲一下Roaring Bitmaps.

普通的bitmap为一个巨型数组, 而Roaring Bitmap会分为2级结构, 一级分桶, 总共有65535个(short最大值), 每个桶内则是short类型的bitmap, 可以存储65535个字节.

对于一个4字节的Int类型, 会分别取高16位和低16位, 也就是一个Int分裂为2个short类型.

前一个short表示index, 索引到具体的桶内, 后一个short在桶内的bitmap中操作.

short类型的bitmap有三种类型:

  1. array[short]类型
  2. 普通bitmap类型
  3. RunLength类型

img

对于String类型数据, 可以通过一个KV字典, 将String转化为Int后再行求解.

HLL

以上面方法不同, HLL是一种近似计算基数的方式, 它是基于以下概率论的假设

img

在真实求解的时候, HLL构造一个桶列表(byte类型, 图中是64个), 取hash值后6位(64=2^6), 索引对应的桶位置, 然后取第一个除首位1之外1的值, 最后将所有桶的数值, 求取调和平均数. (图中Loglog算法为算数平均数, 误差较大)

img

百分数

T-Digest

准确说tdigest并非是百分数计算方法, 而是一种抽样方式, 通过引入质心的概念, 完成类似于KNN聚类效果.

聚类之后, 数据量比原来会小很多, 然后再调用精确计算百分位的函数quantile进行计算.

T-Digest有两种方式, 一种称为buffer and merge, 另一种称为cluster, 整个算法过程主要在平衡误差和计算效率的结果.

具体算法就不学习了, 有需要时, 再来看这个pdf

索引

BloomFilter

bloomfilter

0%