深入浅出Clickhouse: 分布式设计
相关配置
1 | <remote_servers incl="clickhouse_remote_servers"> |
在Clickhouse的config.xml
文件中有以上类似配置项内容, 在这个样例里面, 定义了一个叫做test01
的集群, 它由4个节点组成, 其中分为2个shard, 2副本.
大致是如下一种形式, shard0和shard1的数据时是不一样的, 而同个shard中的数据时完全一样的.
Shard内的数据复制能力, Clickhouse发明了一种叫做ReplicatedMergeTree
的引擎, 通过Zookeeper做元数据中心, 完成多个节点建的数据同步.
1 | CREATE TABLE default.lineorder_local |
ReplicatedMergeTree
的建表语句如上所示, 和MergeTree
引擎的区别就是ReplicatedMergeTree('/clickhouse/tables/{shard}/default/lineorder_local', '{replica}')
这段内容, '/clickhouse/tables/{shard}/default/lineorder_local'
表示的是Zookeeper的路径, '{replica}'
表示的是replica的序号.
{value}
这类数值在配置文件夹的macros.xml
文件中定义
1 | <yandex> |
Shard间的分区, Clickhouse也创建了一个Distributed
数据表引擎表示, 负责写入时数据的分发, 读取时数据的合并.
但是Distributed
表不存数据, 因此需要有一个local表存储真实的数据, 上面提到的ReplicatedMergeTree
就是数据表.
建表语句如下, 整体的结构与local表一致, 引擎Distributed
中, test01
表示是cluster, default
表示的数据库, lineorder_local
表示的是表名, 而xxHash32(LO_ORDERKEY)
表示根据这个表达式来分区, 通过xxHash
分区可以保证相同的key, 必然在一个Shard内, 这样才能完成后续的去重或者预聚合的功能,. 分布式引擎, 还支持rand
方式的分区, 随机分区会减少插入过程中做hash的消耗, 但会存在数据重复的情况, 需要根据情况设置.
1 | CREATE TABLE default.lineorder |
数据复制
在一个shard内的数据流动叫做数据复制, 我们经常听到的一致性协议
以及paxos
或者raft
都在这个范畴之内, 是一个比较复制事情.
Clickhouse实现的时候, 直接引入了Zookeeper作为元数据中心, Zookeeper通过它自己的ZAB
实现了一致性协议
, 但Zookeeper的QPS不高, 当元数据量太多时, 容易引发瓶颈. 目前社区基于NuRaft实现ClickKeeper, 希望替代Zookeeper. 这都是后话, 先不去管理, 基于Zookeeper的分布式复制, 需要完成的两件事情是, 数据插入和数据变更, 我们分情况看一下这两个设计.
数据插入
- 数据写入shard的一个节点
- 在本地节点中, 生成DP文件
- 并将DP写入本地的事件写入到ZK上
- 其他副本监控着写入时间, 如果发现有数据写入的时间, 就会立刻读取事件.
- 事件写明了DP的编号以及DP的地址, 副本节点会主动去事件写入节点拿数据
- Clickhouse由一个内部端口, 专门用于节点间数据传输, 通过该端口将DP文件传输到节点本地
- 然后再将DP数据加载到节点内部, 并更新ZK上的信息, 告诉ZK自己已经拿到DP, 并且其他副本也可以从自己这儿下载DP
默认情况下, Clickhouse只会写入一个节点, 然后写入请求就接受了, 这种情况下, 一旦主节点挂机, 这份数据就无法被传输, 可以设置配置项insert_quorum
强制副本写入其他节点后再返回.
数据变更
- 事件的触发有两种, 一种由内部的
merge
请求触发, 一种由外部客户端触发的mutation
或者alter
方法 - 主节点或者客户端触发变更后, 由响应节点将事件写入到Zookeeper, 写入后该任务就算完成了
- 其他所有副本都监听这个事件
- 一旦发现新的变更, 就会自己按照事件内容处理任务
由于事件处理效率并不相同, 如果有一个节点, 处理事件晚于其他节点很多, 那么它可能直接去其他节点获取已经变更过的数据, 而非在自己节点完成.
数据分区
分布式DDL的逻辑相对来说比较简单, 也是在Zookeeper上写入一条事件记录, 然后每个节点监听执行, 这个步骤就不展开了, 这里只聚焦于分区写入的流程.
- 数据写入分布式表
- 接收到数据写入的节点, 将分布式表写入本地的临时文件夹中, 临时文件夹包含远程shard的地址
- 本地的数据直接attach数据, 然后分批发送远程的数据.
分布式插入可以设置为同步插入模式, 需要设置配置项insert_distributed_sync=1
. 由于分布式表和local表的分离, 默认的异步插入会产生积压, 此时如果将local表的字段类型修改后, 整个积压任务就会一直异常, 并卡住, 后续的数据都无法插入.
因此, 我们写入数据的时候, 会尽量直接写local表, 不是去重表类型的, 基本上不让用户使用分布式表写入.