[Clickhouse研究]: TooManyPart问题解决思路

问题现象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2021-12-08 10:20:52,561 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat [] - JDBC executeBatch error, retry times = 1
org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 252, host: 10.88.129.186, port: 8024; Code: 252, e.displayText() = DB::Exception: Too many parts (315). Merges are processing significantly slower than inserts.: while write prefix to view Storage omega_analyse_project_stream.dwm_log_pub_event_pvuv_hi_view_local (version 206.1.1)

at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:59) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:29) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1094) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1061) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1026) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHouseStatementImpl.sendStream(ClickHouseStatementImpl.java:1019) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:381) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.streaming.connectors.clickhouse.shaded.ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeBatch(ClickHousePreparedStatementImpl.java:364) ~[flink-connector-clickhouse_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.connector.jdbc.internal.executor.SimpleBatchStatementExecutor.executeBatch(SimpleBatchStatementExecutor.java:71) ~[flink-connector-jdbc_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:207) ~[flink-connector-jdbc_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:174) ~[flink-connector-jdbc_2.11-1.12.0-700.jar:1.12.0-700]
at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:123) ~[flink-connector-jdbc_2.11-1.12.0-700.jar:1.12.0-700]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_77]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_77]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_77]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]

Flink任务写入CK时, 出现以上异常, 并开始重试, 如果重试次数过多, 就会触发flink的重启, flink重启达到5次, 用户就会收到告警.

目前Flink重试的时候, 会造成Checkpoint期间的数据, 全部重新写入, 即能够保障AT_LEAST_ONCE语义.

原因分析

image-20220304152821844

在数据每次写入的时候, 都会调用该方法, 判断数据写入频率是否过快了, 如果太快了, 就会执行delay操作.

如何判断过快呢? 通过以下判断

1
parts_count_in_partition >= settings->parts_to_throw_insert

parts_count_in_partition表示所有partition中DataPart个数最多的一个, parts_to_throw_insert是一个配置项, 默认是300

CK每次写入一批数据, 就会生成一个DataPart, 然后会有Merge线程merge多个DP合并为一个.

写入太过频繁或者Merge过慢, 就会出现Too Many Parts的问题.

写入频繁

写入频繁分两种情况:

第一种, 用户数据量不大, 但flink的参数写入不当, 由于现在Flink多是采用jdbc的接口, 攒批的参数需要用户手动设置, 默认值不随我们配置, 因此会导致批过小的问题

第二种, 用户数据量较大, flink参数已经被值班人员调整过, 但由于数据量巨大, 依然出现写入过快的问题. 这种问题比较容易出现在”小宽表”上, 即每行数据量比较少, 但是行数特别多的场景.

Merge过慢

Clickhouse一个节点的Merge线程为48个, 而且由于集群区分读写节点, 写节点就一个, 因此实际上单个shard内, 合并是单点的.

因此解决merge过慢的问题, 可以从增加写节点个数方式出发.

解决方案

针对以上的问题, 我们可以对问题, case by case制定改进方案

MemoryTable方式

这个方案是针对写入频繁中的第一种情况, 这种情况下, 在CK内部完成攒批, 可以解决用户Flink配置问题

MemoryTable可以参考CK中的Buffer引擎来实现, 但逻辑需要内嵌于MergeTree引擎中, 否则上线成本会增加很多.

但Buffer引擎和MemoryTable还是有一些区别

  1. Buffer并不保证数据的一致性, 而在MemoryTable中, 需要实现WAL方式, 支持数据恢复
  2. Buffer引擎是单机的, 而MergeTree需要考虑分布式的能力, 防止节点异常的情况, 需要实现WAL文件的复制能力

除去以上的问题, 还有两个细节问题, 在实现上要非常注意:

第一, 如果用户的SQL中有final等特别的关键字, 那么需要跟内存的数据做merge

第二, 如果用户准备修改表结构, 那么内存中的结构也会跟着磁盘的结构一起修改, 此时还需要注意新写入的数据情况

难点一

final等关键字

难点二

alter时, 数据写入

列式格式写入

这种情况可以解决写入频繁中的第二种情况, 如果列存数据的压缩能力, 将批次变大.

由于列存格式的适配, 需要引入新的Flink Connect, 那么在修改过程中, 我们可以在新的Connector中, 实现对于写入频繁的第一种场景的参数进行有效的空,

目前已经支持Parquet格式的写入, 但是Parquet不支持复杂类型, 例如Array/Map等, 最好能够引进ORC或者CK本身的Native的Block数据结构, 减去转换的开销.

难点三

Native格式是CK最有效的写入格式, CK主要将数据按照自己读起来就行, 省却了压缩编码的工作, 会极大的增加导入的性能.

但Native格式是CK的内部的格式, 可能需要实现在Java中用JNI接口调用C++源码.

该方案在其他列存格式有问题的情况下, 再来实现吧.

读写交替

下面的三个方面的目标, 都是增加写节点.

目前集群由读写节点确定, 读写节点是固定, 只有故障的时候, 才会变化, 从监控上看, 读写节点压力非常不一致, 写节点的网络写入只有读的一半左右.

因此, 在公共集群上, 可以实现读写能力的交替, 即对同一张表来说, 数据永远只写一个节点(通过表的hash计算), 但对于不同表的写入, 可以是不同的节点, 因此称为读写交替.

多写集群

比上面的方案更好的, 是完成表级别的多写能力, CK原生也是支持的. 但这种情况下, 在变更时候的数据一致性, 还是需要重点测试的.

难点四

表多写的影响, Clickhouse原生支持Multi Write, 但不确定是否在复杂环境下, 有异常场景, 可能需要做一次严格的测试来论证这个问题.

集群扩容

最后一种方案, 是最简单的. 就是shard扩容, 但我们目前一直都没有一个扩shard的方案, 因此该方案, 依赖辉哥的读写分离的方案, 等读写分离实现后, 实现集群扩容会简化很多.

难点五

读写分离方案, 支持扩容shard实现写入加数