深入浅出Clickhouse: 分布式设计

相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<remote_servers incl="clickhouse_remote_servers">
<test01>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>host01</host>
<port>9100</port>
<password>xxxx</password>
</replica>
<replica>
<host>host02</host>
<port>9100</port>
<password>xxxx</password>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>host03</host>
<port>9100</port>
<password>xxxx</password>
</replica>
<replica>
<host>host04</host>
<port>9100</port>
<password>xxxx</password>
</replica>
</shard>
</test01>
</remote_servers>

在Clickhouse的config.xml文件中有以上类似配置项内容, 在这个样例里面, 定义了一个叫做test01的集群, 它由4个节点组成, 其中分为2个shard, 2副本.

大致是如下一种形式, shard0和shard1的数据时是不一样的, 而同个shard中的数据时完全一样的.

image-20210830142321251

Shard内的数据复制能力, Clickhouse发明了一种叫做ReplicatedMergeTree的引擎, 通过Zookeeper做元数据中心, 完成多个节点建的数据同步.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CREATE TABLE default.lineorder_local
(
`LO_ORDERKEY` UInt32,
`LO_LINENUMBER` UInt8,
`LO_CUSTKEY` UInt32,
`LO_PARTKEY` UInt32,
`LO_SUPPKEY` UInt32,
`LO_ORDERDATE` Date,
`LO_ORDERPRIORITY` LowCardinality(String),
`LO_SHIPPRIORITY` UInt8,
`LO_QUANTITY` UInt8,
`LO_EXTENDEDPRICE` UInt32,
`LO_ORDTOTALPRICE` UInt32,
`LO_DISCOUNT` UInt8,
`LO_REVENUE` UInt32,
`LO_SUPPLYCOST` UInt32,
`LO_TAX` UInt8,
`LO_COMMITDATE` Date,
`LO_SHIPMODE` LowCardinality(String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/lineorder_local', '{replica}')
PARTITION BY toYear(LO_ORDERDATE)
ORDER BY (LO_ORDERDATE, LO_ORDERKEY)
SETTINGS index_granularity = 8192

ReplicatedMergeTree的建表语句如上所示, 和MergeTree引擎的区别就是ReplicatedMergeTree('/clickhouse/tables/{shard}/default/lineorder_local', '{replica}')这段内容, '/clickhouse/tables/{shard}/default/lineorder_local'表示的是Zookeeper的路径, '{replica}'表示的是replica的序号.

{value}这类数值在配置文件夹的macros.xml文件中定义

1
2
3
4
5
6
7
<yandex>
<macros>
<cluster>test01</cluster>
<shard>test01-01</shard>
<replica>01</replica>
</macros>
</yandex>

Shard间的分区, Clickhouse也创建了一个Distributed数据表引擎表示, 负责写入时数据的分发, 读取时数据的合并.

但是Distributed表不存数据, 因此需要有一个local表存储真实的数据, 上面提到的ReplicatedMergeTree就是数据表.

建表语句如下, 整体的结构与local表一致, 引擎Distributed中, test01表示是cluster, default表示的数据库, lineorder_local表示的是表名, 而xxHash32(LO_ORDERKEY)表示根据这个表达式来分区, 通过xxHash分区可以保证相同的key, 必然在一个Shard内, 这样才能完成后续的去重或者预聚合的功能,. 分布式引擎, 还支持rand方式的分区, 随机分区会减少插入过程中做hash的消耗, 但会存在数据重复的情况, 需要根据情况设置.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 CREATE TABLE default.lineorder
(
`LO_ORDERKEY` UInt32,
`LO_LINENUMBER` UInt8,
`LO_CUSTKEY` UInt32,
`LO_PARTKEY` UInt32,
`LO_SUPPKEY` UInt32,
`LO_ORDERDATE` Date,
`LO_ORDERPRIORITY` LowCardinality(String),
`LO_SHIPPRIORITY` UInt8,
`LO_QUANTITY` UInt8,
`LO_EXTENDEDPRICE` UInt32,
`LO_ORDTOTALPRICE` UInt32,
`LO_DISCOUNT` UInt8,
`LO_REVENUE` UInt32,
`LO_SUPPLYCOST` UInt32,
`LO_TAX` UInt8,
`LO_COMMITDATE` Date,
`LO_SHIPMODE` LowCardinality(String)
)
ENGINE = Distributed('test01', 'default', 'lineorder_local', xxHash32(LO_ORDERKEY))

数据复制

在一个shard内的数据流动叫做数据复制, 我们经常听到的一致性协议以及paxos或者raft都在这个范畴之内, 是一个比较复制事情.

Clickhouse实现的时候, 直接引入了Zookeeper作为元数据中心, Zookeeper通过它自己的ZAB实现了一致性协议, 但Zookeeper的QPS不高, 当元数据量太多时, 容易引发瓶颈. 目前社区基于NuRaft实现ClickKeeper, 希望替代Zookeeper. 这都是后话, 先不去管理, 基于Zookeeper的分布式复制, 需要完成的两件事情是, 数据插入数据变更, 我们分情况看一下这两个设计.

数据插入

image-20210830145322611

  1. 数据写入shard的一个节点
  2. 在本地节点中, 生成DP文件
  3. 并将DP写入本地的事件写入到ZK上
  4. 其他副本监控着写入时间, 如果发现有数据写入的时间, 就会立刻读取事件.
  5. 事件写明了DP的编号以及DP的地址, 副本节点会主动去事件写入节点拿数据
  6. Clickhouse由一个内部端口, 专门用于节点间数据传输, 通过该端口将DP文件传输到节点本地
  7. 然后再将DP数据加载到节点内部, 并更新ZK上的信息, 告诉ZK自己已经拿到DP, 并且其他副本也可以从自己这儿下载DP

默认情况下, Clickhouse只会写入一个节点, 然后写入请求就接受了, 这种情况下, 一旦主节点挂机, 这份数据就无法被传输, 可以设置配置项insert_quorum强制副本写入其他节点后再返回.

数据变更

image-20210830145330506

  1. 事件的触发有两种, 一种由内部的merge请求触发, 一种由外部客户端触发的mutation或者alter方法
  2. 主节点或者客户端触发变更后, 由响应节点将事件写入到Zookeeper, 写入后该任务就算完成了
  3. 其他所有副本都监听这个事件
  4. 一旦发现新的变更, 就会自己按照事件内容处理任务

由于事件处理效率并不相同, 如果有一个节点, 处理事件晚于其他节点很多, 那么它可能直接去其他节点获取已经变更过的数据, 而非在自己节点完成.

数据分区

分布式DDL的逻辑相对来说比较简单, 也是在Zookeeper上写入一条事件记录, 然后每个节点监听执行, 这个步骤就不展开了, 这里只聚焦于分区写入的流程.

image-20210830152305274

  1. 数据写入分布式表
  2. 接收到数据写入的节点, 将分布式表写入本地的临时文件夹中, 临时文件夹包含远程shard的地址
  3. 本地的数据直接attach数据, 然后分批发送远程的数据.

分布式插入可以设置为同步插入模式, 需要设置配置项insert_distributed_sync=1. 由于分布式表和local表的分离, 默认的异步插入会产生积压, 此时如果将local表的字段类型修改后, 整个积压任务就会一直异常, 并卡住, 后续的数据都无法插入.

因此, 我们写入数据的时候, 会尽量直接写local表, 不是去重表类型的, 基本上不让用户使用分布式表写入.