Clickhouse分享:原子写入

众所周知, Clickhouse是没有事务能力的, 对写入数据也不承诺原子性保护, 但在业务使用过程中, 用户往往需要数据量尽量一致, 现在在使用CK时, 用户经常在校对离线和实时数据时, 发现数据行数都不一致.

因此, 我们需要给力CK加上一个原子性写入的保护, 但基于OLAP系统的吞吐量, 给CK加强一致事务保护是不现实的, 所以我们的目标并不是100%的原子性, 而是一个折中.

有个要注意的点, 这篇文章讨论的是写入的原子性, 它只能保证写入要么全部成功, 那么全部失败; 它能保证最终一致性, 但无法保证强一致性; 也无法承诺任何隔离性; 但可以保证持久性.

实际上用ACID来评价这个原子性, 并不是很恰当, 例如最终一致性是分布式复制协议里面的概念; 这里只是用数据库的ACID做为评价的指标.

实时数据写入流程

image-20210824150229371

目前实时数据导入的流程大致入上图所示:

  1. 数据存储在Kafka中, 数据从Kafka读取, 并流向Flink程序. 由于Flink有比较完善的Checkpoint机制, 只要Source能够回放, 就能保证数据是一致的. 因此这个步骤不会丢失数据

  2. 数据在Flink中做完转化后, 会写入到CHProxy中. CHProxy是一个Clickhouse的代理, 负责鉴权或者流程控制等功能. 这个步骤比较容易丢数据, 因为Flink要求Sink算子能够幂等写入, 但CHProxy和Clickhouse现有功能都无法保证.

  3. CHProxy接受到写入请求后, 会随机选择一个Clickhouse后端执行写入SQL. 由于随机选择, Flink重试后有可能导致数据写入到不同shard节点, 导致重复

    1. Flink选CHProxy也是随机的
    2. 写入时, 直接写入本地表, 数据没有做hash
  4. Clickhouse节点接收到SQL请求后, 会将数据按照分区切分数据, 写入临时文件, 再分批提交. 提交过程失败, 会导致任务重试后数据重复(部分提交成功的情况)

  5. 多replica集群, 数据写入到主节点后, 会异步的同步到从节点, 此时主节点下线会导致数据丢失 (节点重启后, 还能恢复)

从上面的分析可知, 2,3,4,5都存在着数据重复或者丢失的风险, 其中2和3是机制问题, 一旦触发Flink重试(重启应用必然触发)/CHProxy随机路由(重试写入时就触发), 数据就会出现大概率重复. 而4和5是小概率实践, 4是一个磁盘写入操作, 即使顺序执行多个DP操作时, 出现异常的概率都是相对较小的;5的情况, 只要节点能够重启, 就能够保证最终一致性.

综上所示, 原子写入的目标是修改2和3的逻辑, 保证数据丢失重复的概率不高于4和5.

两阶段提交

二阶段提交(Two-phaseCommit)是指,在计算机网络以及数据库领域内,为了使基于分布式系统架构下的所有节点在进行事务提交时保持一致性而设计的一种算法(Algorithm)。通常,二阶段提交也被称为是一种协议(Protocol))。

在分布式系统中,每个节点虽然可以知晓自己的操作时成功或者失败,却无法知道其他节点的操作的成功或失败。

当一个事务跨越多个节点时为了保持事务的ACID特性,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并最终指示这些节点是否要把操作结果进行真正的提交(比如将更新后的数据写入磁盘等等)。

因此,二阶段提交的算法思路可以概括为: 参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。

所谓的两个阶段是指:

  • 第一阶段:投票

    该阶段的主要目的在于打探数据库集群中的各个参与者是否能够正常的执行事务,具体步骤如下:

    1. 协调者向所有的参与者发送事务执行请求,并等待参与者反馈事务执行结果;
    2. 事务参与者收到请求之后,执行事务但不提交,并记录事务日志;
    3. 参与者将自己事务执行情况反馈给协调者,同时阻塞等待协调者的后续指令。
  • 第二阶段:事务提交

    在经过第一阶段协调者的询盘之后,各个参与者会回复自己事务的执行情况,这时候存在 3 种可能性:

    1. 所有的参与者都回复能够正常执行事务。
    2. 一个或多个参与者回复事务执行失败。
    3. 协调者等待超时。

对于第 1 种情况,协调者将向所有的参与者发出提交事务的通知,具体步骤如下:

  1. 协调者向各个参与者发送 commit 通知,请求提交事务;
  2. 参与者收到事务提交通知之后执行 commit 操作,然后释放占有的资源;
  3. 参与者向协调者返回事务 commit 结果信息。

2pc-success

对于第 2 和第 3 种情况,协调者均认为参与者无法成功执行事务,为了整个集群数据的一致性,所以要向各个参与者发送事务回滚通知,具体步骤如下:

  1. 协调者向各个参与者发送事务 rollback 通知,请求回滚事务;
  2. 参与者收到事务回滚通知之后执行 rollback 操作,然后释放占有的资源;
  3. 参与者向协调者返回事务 rollback 结果信息。

2pc-failed

两阶段提交协议原理简单、易于实现,但是缺点也是显而易见的,包含如下:

  • 单点问题: 协调者在整个两阶段提交过程中扮演着举足轻重的作用,一旦协调者所在服务器宕机,就会影响整个数据库集群的正常运行。比如在第二阶段中,如果协调者因为故障不能正常发送事务提交或回滚通知,那么参与者们将一直处于阻塞状态,整个数据库集群将无法提供服务。 极端情况下, 协调者发出Commmit消息之后宕机, 整体系统将存储不确定的状态.
  • 同步阻塞: 两阶段提交执行过程中,所有的参与者都需要听从协调者的统一调度,期间处于阻塞状态而不能从事其他操作,这样效率极其低下。
  • 数据不一致性: 两阶段提交协议虽然是分布式数据强一致性所设计,但仍然存在数据不一致性的可能性。比如在第二阶段中,假设协调者发出了事务 commit 通知,但是因为网络问题该通知仅被一部分参与者所收到并执行了commit 操作,其余的参与者则因为没有收到通知一直处于阻塞状态,这时候就产生了数据的不一致性。

    针对上述问题可以引入 超时机制互询机制 在很大程度上予以解决。

对于协调者来说如果在指定时间内没有收到所有参与者的应答,则可以自动退出 WAIT 状态,并向所有参与者发送 rollback 通知。对于参与者来说如果位于 READY 状态,但是在指定时间内没有收到协调者的第二阶段通知,则不能武断地执行 rollback 操作,因为协调者可能发送的是 commit 通知,这个时候执行 rollback 就会导致数据不一致。

此时,我们可以介入互询机制,让参与者 A 去询问其他参与者 B 的执行情况。如果 B 执行了 rollback 或 commit 操作,则 A 可以大胆的与 B 执行相同的操作;如果 B 此时还没有到达 READY 状态,则可以推断出协调者发出的肯定是 rollback 通知;如果 B 同样位于 READY 状态,则 A 可以继续询问另外的参与者。只有当所有的参与者都位于 READY 状态时,此时两阶段提交协议无法处理,将陷入长时间的阻塞状态。

上文的介绍摘抄自参考文档

Flink中的两阶段提交

Flink提供了基于2PC的SinkFunction,名为TwoPhaseCommitSinkFunction,帮助我们做了一些基础的工作。它的第一层类继承关系如下:

img

但是TwoPhaseCommitSinkFunction仍然留了以下四个抽象方法待子类来实现:

1
2
3
4
protected abstract TXN beginTransaction() throws Exception;
protected abstract void preCommit(TXN transaction) throws Exception;
protected abstract void commit(TXN transaction);
protected abstract void abort(TXN transaction);
  • beginTransaction():开始一个事务,返回事务信息的句柄。

  • preCommit():预提交(即提交请求)阶段的逻辑, 在snapshotState()方法被调用

    img

    每当需要做checkpoint时,JobManager就在数据流中打入一个屏障(barrier),作为检查点的界限。屏障随着算子链向下游传递,每到达一个算子都会触发将状态快照写入状态后端(state BackEnd)的动作。当屏障到达Kafka sink后,触发preCommit(实际上是KafkaProducer.flush())方法刷写消息数据,但还未真正提交。接下来还是需要通过检查点来触发提交阶段。

  • commit():正式提交阶段的逻辑, 该方法的调用点位于TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中

    img

    可见,只有在所有检查点都成功完成这个前提下,写入才会成功。这符合前文所述2PC的流程,其中JobManager为协调者,各个算子为参与者(不过只有sink一个参与者会执行提交)。一旦有检查点失败,notifyCheckpointComplete()方法就不会执行。如果重试也不成功的话,最终会调用abort()方法回滚事务。

  • abort():取消事务, 在这里执行Rollback操作

从Flink的实现, 再回看两阶段提交的问题:

  1. 单点问题, 依然存在, Flink只会重启再重试, 但当代码未触发Rollback而异常退出时, Flink实际上对此没有能力

  2. 同步阻塞, Clickhouse并没有写入锁, 因此本身就没有这类问题. 事务超时处理可以由应用层完成

  3. 数据不一致, Flink有Rollback的接口, 再正常情况下能够保证数据最终一致, 但无法保证强一致性. 在单点问题发生时, Flink连最终一致性也无法保证. 而且数据不一致时, 并没有互询机制对账.

Clickhouse两阶段写入

image-20210824191907998

Prepare

跟现在写tmp文件夹一样, 让DP写入的地方写入到一个叫做draft的文件夹. 类似detach目录, 查询时这些数据不可见, 同时也不会触发Merge以及Mutation操作.

Prepare过程会设置两个Label标志, 第一个标志Flink的CP编号, 第二个标志插入数据的标志. Label可以由客户端设置, 也可以让系统自动生成.

Insert语句会包含返回值, 返回的内容为: Host节点名 + Lable_CP名称 + Lable_name名称

Commit

Commit过程类似于Attach, Commit需要提供对应的Lable_name列表, 只会Commit对应Label的DataPart.

另外由于Commit可能会处于Unknown状态, 那么需要能够实现多次commit, 用于第一次失败后的重试.

Rollback

触发回滚时, DataPart可能会处于Draft或者Visible状态, 对于Draft状态的DataPart只需要删除文件即可.

但是对于Visible状态的DataPart, 需要防止数据被Merge, 导致无法回滚, 此时要求

  1. 只能Merge相同Label_CP的DataPart, 不同Label_CP的DP不会merge, Mutation后依然保留Label_CP标志
  2. 超过Flink Checkpoint间隔后, 可以解除Merge限制, 该时间限制由commit命令的配置项传递

这样既能保证Rollup正确执行, 也保证DP个数不会太多, 导致节点负载高.

Draft或者Visible状态只是一种描述, 并非CK内部的DP状态类型.

数据清理

Flink重启后, 不一定能够下发Rollback命令, 因此需要有一个定时的线程来删除Draft的数据.

默认清理1天前的数据, Clickhouse的插入语句, 不允许执行一天.

物化视图

CK的物化视图的更新发生在插入过程中, 因此在插入的时候, 需要对物化视图产生的DP, 打上相同的Label标签.

由于物化视图是分布式写入, 因此每个节点都可能有对应的数据块, 因此插入物化视图时, 返回的数据行数会很多.

DDL变更

Prepare过程中, 按照老的元数据写入数据, 但是commit阶段, 元数据已经改变.

此时Commit DP时, 需要执行Mutation, 这可能导致Commit时间过长, 导致超时, 但如果是异步处理的话, 可能无法完成.

集群扩缩容

集群扩容的时候, 比较简单, 主要按照原有的流程处理即可.

但缩容的过程, 由于节点下线, 导致对于该shard的commit,永远都不会成功, 这样就必然会触发Flink的重试, 而整个集群上所有Flink任务的重试, 对业务来说是无法接受的.

因此需要实现下线时, 流量转发的能力, 将commit的请求, 随着数据的迁移而转发, 服务进程依然会提供服务, 知道flink上依赖的任务全部解除依赖(新的一轮snapshot过程即可解除, 删除对应链接通路即可.)

对比Doris

对于原子插入, Doris处理方式基本一致, 但是Doris的方案能够保证一致性和隔离性, 原因在于Doris有统一的元数据

Doris在查询时, FE会指明那些数据是可见的, 因此Doris的可见性(实际上就是隔离性)可以在FE上通过原子的更新改变.

而CK没有统一的元数据, 因此需要向每个节点发送commit, 单个节点commit后无法了解全局的Commit, 因此一旦出现全局回滚, 就会出现不一致的问题.

Doris分享视频