Carlmartin' Blog

In me the tiger sniffs the rose.

SQL

1
2
3
4
5
6
ReadListener(handleEvent) ->ConnectProcessor(processOnce ->dispatch->handleQuery) -> StmtExecutor(execute)
StmtExecutor(execute)
|-> StatementPlanner(plan): Analyzer(analyze) -> createQueryPlan: Optimizer -> PlanFragmentBuilder
|-> handleQueryStmt : Coordinator(exec) -> Coordinator(getNext) -> MysqlChannel(sendOnePacket)
Coordinator(exec)

读流程

1
olap_scan_node -> tabel_scanner -> tablet_reader(_init_collector) -> BetaRowset(get_segment_iterators) -> SegmentIterator

Rowset package

1
SegmentIterator(do_get_next,_init,_get_row_ranges_by_keys,_init_column_iterators) -> Segment(new_column_iterator) ->ColumnReader->ColumnIterator
1
ColumnIterator >> (DefaultValueColumnIterator|ArrayColumnIterator|DictCodeColumnIterator|ScalarColumnIterator)
1
Segment::new_iterator >> (SegmentChunkIteratorAdapter | SegmentIterator)
1
ColumnReader >> (ZoneMapIndexReader|OrdinalIndexReader|BitmapIndexReader|BloomFilterIndexReader) >> IndexedColumnReader >> IndexedColumnIterator >> IndexPageIterator
阅读全文 »

前情回顾

总结之前的原子写入文章, 其实原子写入解决的是数据脏写的问题, 但是存在数据脏读的问题, 即当写入失败回退时, 部分成功的节点的数据, 已经能够被访问了.

那么从数据库经典的事务级别来说, 实现原子写入就实现了ACID的原子性, 但由于脏读状态的存在, 目前的事务级是读未提交.

读未提交实际上在OLAP应用中, 对大多数业务的影响并没有那么大, 相对于总数为亿行级别的数据, 单次写入个数在10万以内, 因此数据的误差只有万分之一到千分之一, 在大多数业务系统中并不会有那么大的影响.

但对于某些数据量比较小的用户, 读未提交还是会对他们产生比较大的疑问, 因此实现读已提交的隔离级别, 虽非必要, 但是最好如此.

OLAP系统究竟需要怎么样的隔离级别, 这是一个非常好的问题. 隔离级别需要应用层来确定, 但从目前观察到的业务状态, 重复读其实在OLAP中没有必要, 因为OLAP系统中, 用户的SQL很多有依赖关系, 不会出现SQL2的某些数值, 需要从SQL1的结果中获取的场景, 因此可重复读基本上没有实现的意义.

实现读已提交

image-20211108150144214

在这个架构中需要引入全局的Zookeeper作为协调者. 在ZK上会维护一个processing_insert的文件夹, 其中每个文件表示正在写入的批次信息, 大多数情况下该目录下只有一个文件, 因为一般只有一个写入的Flink的任务在工作.

阅读全文 »

众所周知, Clickhouse是没有事务能力的, 对写入数据也不承诺原子性保护, 但在业务使用过程中, 用户往往需要数据量尽量一致, 现在在使用CK时, 用户经常在校对离线和实时数据时, 发现数据行数都不一致.

因此, 我们需要给力CK加上一个原子性写入的保护, 但基于OLAP系统的吞吐量, 给CK加强一致事务保护是不现实的, 所以我们的目标并不是100%的原子性, 而是一个折中.

有个要注意的点, 这篇文章讨论的是写入的原子性, 它只能保证写入要么全部成功, 那么全部失败; 它能保证最终一致性, 但无法保证强一致性; 也无法承诺任何隔离性; 但可以保证持久性.

实际上用ACID来评价这个原子性, 并不是很恰当, 例如最终一致性是分布式复制协议里面的概念; 这里只是用数据库的ACID做为评价的指标.

实时数据写入流程

image-20210824150229371

目前实时数据导入的流程大致入上图所示:

  1. 数据存储在Kafka中, 数据从Kafka读取, 并流向Flink程序. 由于Flink有比较完善的Checkpoint机制, 只要Source能够回放, 就能保证数据是一致的. 因此这个步骤不会丢失数据

  2. 数据在Flink中做完转化后, 会写入到CHProxy中. CHProxy是一个Clickhouse的代理, 负责鉴权或者流程控制等功能. 这个步骤比较容易丢数据, 因为Flink要求Sink算子能够幂等写入, 但CHProxy和Clickhouse现有功能都无法保证.

  3. CHProxy接受到写入请求后, 会随机选择一个Clickhouse后端执行写入SQL. 由于随机选择, Flink重试后有可能导致数据写入到不同shard节点, 导致重复

    1. Flink选CHProxy也是随机的
    2. 写入时, 直接写入本地表, 数据没有做hash
  4. Clickhouse节点接收到SQL请求后, 会将数据按照分区切分数据, 写入临时文件, 再分批提交. 提交过程失败, 会导致任务重试后数据重复(部分提交成功的情况)

  5. 多replica集群, 数据写入到主节点后, 会异步的同步到从节点, 此时主节点下线会导致数据丢失 (节点重启后, 还能恢复)

从上面的分析可知, 2,3,4,5都存在着数据重复或者丢失的风险, 其中2和3是机制问题, 一旦触发Flink重试(重启应用必然触发)/CHProxy随机路由(重试写入时就触发), 数据就会出现大概率重复. 而4和5是小概率实践, 4是一个磁盘写入操作, 即使顺序执行多个DP操作时, 出现异常的概率都是相对较小的;5的情况, 只要节点能够重启, 就能够保证最终一致性.

阅读全文 »

SQL流程概览

数据库发展至今, SQL处理流程已经非常完善, 以Presto的分析流程为例

img

  1. SQL文本由客户端提交到服务端时, 首先会进入Parser模块, 进行词法和语法解析
  2. 经过解析后, 会生成AST(abstract syntax code)树, 然后会进入语义分析阶段, 结合元数据信息, 将AST中无意义的文本, 转化为有含义的对象.
  3. 然后根据AST树, 转化为逻辑计划
  4. 逻辑计划经过优化器(optimizer)后, 生成最终执行的执行计划(在Presto是分布式执行计划)
  5. 最后放入执行器执行具体任务

Clickhouse的流程也基本相似:

image-20210816161057679

  1. 客户单通过TCP端口提交任务到Clickhouse Server
  2. TCP Handler会响应请求, 并调用executeQuery函数处理请求
  3. executeQuery会处理上面SQL处理的5个步骤, 最后生成执行计划
    1. Parser为词法解析
    2. Interpreter为语义解析
    3. QueryPlan为逻辑计划
    4. QueryPipeline为经过优化的执行计划
  4. QueryPipeline放入到PipelineExecutor中执行任务

注: 由于Clickhouse并非一个MPP的数据库, 因此并没有分布式执行计划一说, 分布式方式被拆散到QueryPlan之中.

TCPHandler

阅读全文 »

相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<remote_servers incl="clickhouse_remote_servers">
<test01>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>host01</host>
<port>9100</port>
<password>xxxx</password>
</replica>
<replica>
<host>host02</host>
<port>9100</port>
<password>xxxx</password>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>host03</host>
<port>9100</port>
<password>xxxx</password>
</replica>
<replica>
<host>host04</host>
<port>9100</port>
<password>xxxx</password>
</replica>
</shard>
</test01>
</remote_servers>

在Clickhouse的config.xml文件中有以上类似配置项内容, 在这个样例里面, 定义了一个叫做test01的集群, 它由4个节点组成, 其中分为2个shard, 2副本.

大致是如下一种形式, shard0和shard1的数据时是不一样的, 而同个shard中的数据时完全一样的.

image-20210830142321251

Shard内的数据复制能力, Clickhouse发明了一种叫做ReplicatedMergeTree的引擎, 通过Zookeeper做元数据中心, 完成多个节点建的数据同步.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CREATE TABLE default.lineorder_local
(
`LO_ORDERKEY` UInt32,
`LO_LINENUMBER` UInt8,
`LO_CUSTKEY` UInt32,
`LO_PARTKEY` UInt32,
`LO_SUPPKEY` UInt32,
`LO_ORDERDATE` Date,
`LO_ORDERPRIORITY` LowCardinality(String),
`LO_SHIPPRIORITY` UInt8,
`LO_QUANTITY` UInt8,
`LO_EXTENDEDPRICE` UInt32,
`LO_ORDTOTALPRICE` UInt32,
`LO_DISCOUNT` UInt8,
`LO_REVENUE` UInt32,
`LO_SUPPLYCOST` UInt32,
`LO_TAX` UInt8,
`LO_COMMITDATE` Date,
`LO_SHIPMODE` LowCardinality(String)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/lineorder_local', '{replica}')
PARTITION BY toYear(LO_ORDERDATE)
ORDER BY (LO_ORDERDATE, LO_ORDERKEY)
SETTINGS index_granularity = 8192

ReplicatedMergeTree的建表语句如上所示, 和MergeTree引擎的区别就是ReplicatedMergeTree('/clickhouse/tables/{shard}/default/lineorder_local', '{replica}')这段内容, '/clickhouse/tables/{shard}/default/lineorder_local'表示的是Zookeeper的路径, '{replica}'表示的是replica的序号.

{value}这类数值在配置文件夹的macros.xml文件中定义

阅读全文 »

LSM(Log-Struct Merge Tree)是大数据领域一种非常常见的技术, 例如LevelDB, RocksDB亦或是HBase都采用了LSM结构来完成存储系统的构架.

但你有没有发现, 这些大数据系统, 大多数都是KV方式的存储系统, 对外的接口也是以点查方式提供的.

虽然例如HBase的RowKey是全局有序的, 但是如果使用RowKey做大规模的范围查询, 它的整体效果可能还比不上在HDFS的文件上直接做过滤更有效.

基于以上的问题, 我们再回顾一下LSM的设计, 解答一下为什么LSM最适合点查, 然后再介绍Clickhouse基于LSM做了哪些取舍, 让MergeTree适合做范围的查询分析.

LSM树回顾

img

LSM的架构如上图所示, 数据先存放在Memory中, 然后通过一次的合并, 数据会固化到磁盘上.

为了防止每次合并都处理全部的数据, 因此LSM会将数据分层, 上一层比下一层小很多, 并且上一层的合并频率也比下一层多很多次.

由于需要合并去重的要求, 因此LSM树必须要指定一个类似主键之类的不重复的key.

阅读全文 »

数据目录

当Clickhouse创建一个表, 会在配置文件path指定的路径下对应数据目录.

数据目录的路径为: {path}/data/{database}/{table}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
create table src (
id Int32,
year String,
num Int64,
index a num TYPE minmax GRANULARITY 2
) Engine =MergeTree()
partition by id
order by (id, year)
primary key (id, year)

// 插入数据
insert into src values(1,'2021',12);
insert into src values(1,'2021',12);
// 插入数据
insert into src values(2,'2020',13);

例如上面的例子, 在default数据库上创建一个src的表, 如果path是在~/clickhouse/data目录, 那么整个目录为

~/clickhouse/data/data/default/src

当插入3次数据后, Clickhouse会再数据目录下, 每次都新建一个目录, 如下图所示, 这种目录在Clickhouse称之为DataPart

image-20210828175325388

目录的格式为: partition_min_block_max_block_level

阅读全文 »

无名的命名空间

1
2
3
4
5
6
7
namespace 
{
void fun()
{
//....
}
}

由于命名空间没有名字,在其他文件中显然无法引用,它只在本文件的作用域有效。

若无名命名空间的成员fun函数的作用域为文件A,在文件A中使用无名命名空间的成员,不用也无法用命名空间名限定。

模板里面使用具体的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct NeedChild
{
using Condition = bool (*)(const ASTPtr & node, const ASTPtr & child);

static bool all(const ASTPtr &, const ASTPtr &) { return true; }
static bool none(const ASTPtr &, const ASTPtr &) { return false; }
};

/// Simple matcher for one node type. Use need_child function for complex traversal logic.
template <typename Data_, NeedChild::Condition need_child = NeedChild::all, typename T = ASTPtr>
class OneTypeMatcher
{
public:
using Data = Data_;
using TypeToVisit = typename Data::TypeToVisit;

static bool needChildVisit(const ASTPtr & node, const ASTPtr & child) { return need_child(node, child); }

static void visit(T & ast, Data & data)
{
if (auto * t = typeid_cast<TypeToVisit *>(ast.get()))
data.visit(*t, ast);
}
};

NeedChild这种用法, 目前不知道具体的称谓.

Lambda表达式

基本格式

阅读全文 »

背景介绍

Clickhouse的查询性能是有目共睹的优秀, 但与之对应单个查询对于机器资源的消耗也是非常巨大的, 导致Clickhouse整体的QPS会比较低.

当用户需要提高QPS时, 往往会通过建立物化视图, 进预计算, 查询时直接走物化视图来进行加速.

但这种方案有两个缺点:

  1. 实际上有多张表, 明细查询可能需要走底表, 聚合查询需要查物化视图, 用户管理起来会有一定麻烦程度
  2. 如果出现慢查询, 需要用户新建一张物化视图, 然后导入数据, 再通过上线变更的方式, 来规避, 整体流程过长

针对以上问题, 业界的预聚合引擎, 类似麒麟都实现了SQL rewrite的功能, 来自动替换用户的查询SQL, 这样上面的问题就直接解决了.

Clickhouse并没有打算基于物化视图的SQL rewrite, 而是实现一个Projection的功能, 号称是DataPart-Level的物化视图

Originated from Vertica

  • Projections are collections of table columns,
  • Projections store data in a format that optimizes query execution

看一下这两个的对比, 这里的Query Routing就是我们需要的能力

阅读全文 »
0%