[ClickHouse Research]: ClickHouse's WAL Feature

ClickHouse's WAL Feature

Design Goal

It addresses the problem of frequent small-batch inserts: each insert creates a new data part, which keeps the disk busy and can eventually trigger DB::Exception: Too many parts.

For details, see this document.

This is not a binlog-style log: once the in-memory data has been flushed to disk, the WAL file is cleaned up.

Source Code Analysis

Let's start at the entry point of the write path: MergeTreeDataWriter::writeTempPart creates a MergeTreeData::MutableDataPartPtr.

MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
{
    // ... (abridged: only the part-creation call is shown)
    auto new_data_part = data.createPart(
        part_name,
        data.choosePartType(expected_size, block.rows()),
        new_part_info,
        createVolumeFromReservation(reservation, volume),
        TMP_PREFIX + part_name);
    // ...
}

The data part behind MergeTreeData::MutableDataPartPtr has three implementations. IN_MEMORY keeps the data in memory; since memory is volatile, it needs the WAL as a safety net. COMPACT and WIDE are both file-backed, differing in how the data is laid out and encoded on disk.

MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(const String & name,
    MergeTreeDataPartType type, const MergeTreePartInfo & part_info,
    const VolumePtr & volume, const String & relative_path) const
{
    if (type == MergeTreeDataPartType::COMPACT)
        return std::make_shared<MergeTreeDataPartCompact>(*this, name, part_info, volume, relative_path);
    else if (type == MergeTreeDataPartType::WIDE)
        return std::make_shared<MergeTreeDataPartWide>(*this, name, part_info, volume, relative_path);
    else if (type == MergeTreeDataPartType::IN_MEMORY)
        return std::make_shared<MergeTreeDataPartInMemory>(*this, name, part_info, volume, relative_path);
    else
        throw Exception("Unknown type of part " + relative_path, ErrorCodes::UNKNOWN_PART_TYPE);
}

The logic above simply dispatches on MergeTreeDataPartType, so the next question is when IN_MEMORY gets chosen: when the part's uncompressed size is below min_bytes_for_compact_part, or its row count is below min_rows_for_compact_part; either condition alone is enough.

MergeTreeDataPartType MergeTreeData::choosePartType(size_t bytes_uncompressed, size_t rows_count) const
{
    const auto settings = getSettings();
    if (!canUsePolymorphicParts(*settings))
        return MergeTreeDataPartType::WIDE;

    if (bytes_uncompressed < settings->min_bytes_for_compact_part || rows_count < settings->min_rows_for_compact_part)
        return MergeTreeDataPartType::IN_MEMORY;

    if (bytes_uncompressed < settings->min_bytes_for_wide_part || rows_count < settings->min_rows_for_wide_part)
        return MergeTreeDataPartType::COMPACT;

    return MergeTreeDataPartType::WIDE;
}

Now let's go back and look at the MergeTreeDataPartInMemory implementation.

class MergeTreeDataPartInMemory : public IMergeTreeDataPart
{
    // most member functions omitted
    mutable Block block;
};

From the source, MergeTreeDataPartInMemory simply keeps the Block in memory, where an on-disk part would have written it out to local disk.

So what happens if the process restarts and the in-memory data is lost?

The answer lies in MergeTreeWriteAheadLog: the user's data is also written to the WAL.

When the process starts up, restore is called to rebuild the in-memory parts from the WAL files.

MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
{
    // ... (abridged: iterates over the records in the WAL file)
    if (action_type == ActionType::ADD_PART)
    {
        auto part_disk = storage.reserveSpace(0)->getDisk();
        auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);

        part = storage.createPart(
            part_name,
            MergeTreeDataPartType::IN_MEMORY,
            MergeTreePartInfo::fromPartName(part_name, storage.format_version),
            single_disk_volume,
            part_name);

        block = block_in.read();
    }
}

The WAL keeps growing as more data is written, so when does any of it get cleaned up?

void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_name)
{
    // ... (abridged) write the block to the WAL
    block_out->write(block);
    block_out->flush();

    auto max_wal_bytes = storage.getSettings()->write_ahead_log_max_bytes;
    if (out->count() > max_wal_bytes)
        rotate(lock);
}

Inside addPart, the WAL file's size is checked: when it exceeds write_ahead_log_max_bytes (1 GB by default), the current wal.bin is rotated out. The rotated files are only removed later, once the parts they contain have been flushed to disk, as the test below shows.
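
Judging from the rotated file names observed in the test below, the rotation step amounts to renaming the active wal.bin to a name that encodes the block-number range it covers and starting a fresh wal.bin. A minimal sketch of that idea (hypothetical code, not the MergeTreeWriteAheadLog source):

#include <filesystem>
#include <string>

// Hypothetical sketch: once wal.bin grows past write_ahead_log_max_bytes, it is
// renamed to wal_<min_block>_<max_block>.bin, and a fresh wal.bin is started for
// subsequent addPart() calls. The rotated files are only deleted later, after
// the parts they contain have been written to disk.
void rotateWalSketch(const std::filesystem::path & table_dir, int min_block, int max_block)
{
    auto rotated = table_dir / ("wal_" + std::to_string(min_block) + "_" + std::to_string(max_block) + ".bin");
    std::filesystem::rename(table_dir / "wal.bin", rotated);
    // the next write re-creates an empty wal.bin
}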

Another question: where does the in-memory data actually live? On insert (renameTempPartAndReplace), the part is added via data_parts_indexes.insert, and reads are served from there.

MergeTreeData::renameTempPartAndReplace
{
    // ... (signature and surrounding logic abridged)
    part->name = part_name;
    part->info = part_info;
    part->is_temp = false;
    part->state = DataPartState::PreCommitted;
    part->renameTo(part_name, true);

    auto part_it = data_parts_indexes.insert(part).first;
}

During recovery, loadDataParts reads the parts back from the WAL and inserts them into data_parts_indexes; getActiveContainingPart filters out entries that are already covered by committed parts.

MergeTreeData::loadDataParts
{
    // ... (signature and surrounding logic abridged)
    for (auto & part : parts_from_wal)
    {
        if (getActiveContainingPart(part->info, DataPartState::Committed, part_lock))
            continue;

        part->modification_time = time(nullptr);
        /// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
        part->state = DataPartState::Committed;

        if (!data_parts_indexes.insert(part).second)
            throw Exception("Part " + part->name + " already exists", ErrorCodes::DUPLICATE_DATA_PART);
    }
}
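
The "already covered by a committed part" check boils down to comparing block-number ranges, which the part and WAL file names encode. A hypothetical sketch of that idea (not the real getActiveContainingPart), using the names from the test below: the on-disk part 1992_1_5_1 spans blocks 1..5, so WAL entries for blocks 1..2 and 3..4 are redundant and get skipped.

// Hypothetical sketch of the containment check, not ClickHouse code.
struct BlockRange
{
    int min_block;
    int max_block;
};

// A committed on-disk part covers a WAL entry when the entry's block range
// lies entirely inside the part's range, e.g. 1992_1_5_1 (blocks 1..5)
// covers wal_1_2.bin (blocks 1..2) and wal_3_4.bin (blocks 3..4).
bool covers(const BlockRange & committed_part, const BlockRange & wal_entry)
{
    return committed_part.min_block <= wal_entry.min_block
        && wal_entry.max_block <= committed_part.max_block;
}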

As we can see, ClickHouse's WAL mechanism has no MemTable in the RocksDB sense, so two insert batches are never merged in memory.

All merging is still handled by the background threads. The merge flow is abstracted as merging two data parts, and in practice one of them may be an in-memory part and the other an on-disk part.

The merged result is always written to disk.

One more point: with replicated tables, in-memory parts are still replicated.

DataPartsExchange has two functions, sendPartFromMemory and downloadPartToMemory: the former sends the data, the latter downloads it, and after replication the part is still in the in-memory format.
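
As a rough illustration of that round trip (hypothetical code; in ClickHouse the transfer happens over the interserver HTTP endpoint): the sender serializes the in-memory part's block into bytes, and the replica rebuilds an in-memory part from them without writing any files.

#include <cstring>
#include <string>
#include <vector>

// Hypothetical sketch only: a vector of ints stands in for the part's Block.
using FakeBlock = std::vector<int>;

// Sender side: serialize the in-memory block into a byte buffer.
std::string sendPartFromMemorySketch(const FakeBlock & block)
{
    std::string wire(block.size() * sizeof(int), '\0');
    std::memcpy(wire.data(), block.data(), wire.size());
    return wire;
}

// Replica side: rebuild the block purely in memory; since no files are written,
// the downloaded part keeps the in-memory format.
FakeBlock downloadPartToMemorySketch(const std::string & wire)
{
    FakeBlock block(wire.size() / sizeof(int));
    std::memcpy(block.data(), wire.data(), wire.size());
    return block;
}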

Test

Create a table with min_rows_for_compact_part set to 200 and write_ahead_log_max_bytes set to 8192 (8 KB):

CREATE TABLE default.lineorder_local1
(
    `LO_ORDERKEY` UInt32,
    `LO_LINENUMBER` UInt8,
    `LO_CUSTKEY` UInt32,
    `LO_PARTKEY` UInt32,
    `LO_SUPPKEY` UInt32,
    `LO_ORDERDATE` Date,
    `LO_ORDERPRIORITY` LowCardinality(String),
    `LO_SHIPPRIORITY` UInt8,
    `LO_QUANTITY` UInt8,
    `LO_EXTENDEDPRICE` UInt32,
    `LO_ORDTOTALPRICE` UInt32,
    `LO_DISCOUNT` UInt8,
    `LO_REVENUE` UInt32,
    `LO_SUPPLYCOST` UInt32,
    `LO_TAX` UInt8,
    `LO_COMMITDATE` Date,
    `LO_SHIPMODE` LowCardinality(String)
)
ENGINE = MergeTree
PARTITION BY toYear(LO_ORDERDATE)
ORDER BY (LO_ORDERDATE, LO_ORDERKEY)
SETTINGS index_granularity = 8192, min_rows_for_compact_part = 200, write_ahead_log_max_bytes = 8192;

Insert 100 rows:

insert into table lineorder_local1 select * from lineorder_local limit 100;

Look at the data directory:

[root@bigdata-nmg-client01.nmg01 /var/lib/clickhouse/data/default/lineorder_local1]$ ll
total 12
drwxr-x--- 2 clickhouse clickhouse 6 Apr 12 14:53 detached
-rw-r----- 1 clickhouse clickhouse 1 Apr 12 14:53 format_version.txt
-rw-r----- 1 clickhouse clickhouse 4766 Apr 12 14:54 wal.bin

A wal.bin file has appeared.

Insert once more, and another .bin file shows up:

[root@bigdata-nmg-client01.nmg01 /var/lib/clickhouse/data/default/lineorder_local1]$ ll
total 16
drwxr-x--- 2 clickhouse clickhouse 10 Apr 12 15:18 detached
-rw-r----- 1 clickhouse clickhouse 1 Apr 12 15:18 format_version.txt
-rw-r----- 1 clickhouse clickhouse 9532 Apr 12 15:18 wal_1_2.bin
-rw-r----- 1 clickhouse clickhouse 0 Apr 12 15:18 wal.bin

There is now an extra wal_1_2.bin file. Insert a few more times; on the 5th insert, a data part is created:

[root@bigdata-nmg-client01.nmg01 /var/lib/clickhouse/data/default/lineorder_local1]$ ll
total 40
drwxr-x--- 2 clickhouse clickhouse 4096 Apr 12 15:39 1992_1_5_1
drwxr-x--- 2 clickhouse clickhouse 10 Apr 12 15:18 detached
-rw-r----- 1 clickhouse clickhouse 1 Apr 12 15:18 format_version.txt
-rw-r----- 1 clickhouse clickhouse 9532 Apr 12 15:18 wal_1_2.bin
-rw-r----- 1 clickhouse clickhouse 9532 Apr 12 15:26 wal_3_4.bin
-rw-r----- 1 clickhouse clickhouse 4766 Apr 12 15:39 wal.bin

Checking again a while later, the wal_*_*.bin files have been deleted, because the corresponding data part has already been committed:

[root@bigdata-nmg-client01.nmg01 /var/lib/clickhouse/data/default/lineorder_local1]$ ll
total 16
drwxr-x--- 2 clickhouse clickhouse 4096 Apr 12 15:39 1992_1_5_1
drwxr-x--- 2 clickhouse clickhouse 10 Apr 12 15:18 detached
-rw-r----- 1 clickhouse clickhouse 1 Apr 12 15:18 format_version.txt
-rw-r----- 1 clickhouse clickhouse 4766 Apr 12 15:39 wal.bin