Clickhouse技术分享: SQL查询流程

SQL流程概览

数据库发展至今, SQL处理流程已经非常完善, 以Presto的分析流程为例

img

  1. SQL文本由客户端提交到服务端时, 首先会进入Parser模块, 进行词法和语法解析
  2. 经过解析后, 会生成AST(abstract syntax code)树, 然后会进入语义分析阶段, 结合元数据信息, 将AST中无意义的文本, 转化为有含义的对象.
  3. 然后根据AST树, 转化为逻辑计划
  4. 逻辑计划经过优化器(optimizer)后, 生成最终执行的执行计划(在Presto是分布式执行计划)
  5. 最后放入执行器执行具体任务

Clickhouse的流程也基本相似:

image-20210816161057679

  1. 客户单通过TCP端口提交任务到Clickhouse Server
  2. TCP Handler会响应请求, 并调用executeQuery函数处理请求
  3. executeQuery会处理上面SQL处理的5个步骤, 最后生成执行计划
    1. Parser为词法解析
    2. Interpreter为语义解析
    3. QueryPlan为逻辑计划
    4. QueryPipeline为经过优化的执行计划
  4. QueryPipeline放入到PipelineExecutor中执行任务

注: 由于Clickhouse并非一个MPP的数据库, 因此并没有分布式执行计划一说, 分布式方式被拆散到QueryPlan之中.

TCPHandler

通过TCP端口(CK默认的客户端)连接的请求会在TCPHandler::runImpl()函数中被处理, 在做完一些准备工作后, 他会执行以下代码

1
2
3
4
5
6
7
8
/// Processing Query
state.io = executeQuery(state.query, query_context, false, state.stage, may_have_embedded_data);
if (state.need_receive_data_for_input) // It implies pipeline execution
{
/// It is special case for input(), all works for reading data from client will be done in callbacks.
auto executor = state.io.pipeline.execute();
executor->execute(state.io.pipeline.getNumThreads());
}

executeQuery的输入state.query是String类型, 实际上就是客户端的SQL语句, 输出state.io输出对象为BlockIO, 这是一个QueryPipeline的封装

最终executeQuery将SQL字符串转化为执行计划.

然后调用pipeline.execute()获得计划执行器, 然后调用execute开始任务的执行.

TCPHandler类的实现挺不错的, 整体的细节都封装在executeQueryPipelineExecutor之中

executeQuery

executeQuery的实现函数为executeQueryImpl

1
2
3
4
5
6
7
ASTPtr ast;
ParserQuery parser(end);
ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);
auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage).setInternal(internal));
BlockIO res = interpreter->execute();
QueryPipeline & pipeline = res.pipeline;
// 设置一些回调函数, 还有一些querylog

首先, 使用ParserQuery将SQL语句转化为AST树(代码在parseQuery.cpp中)

其次, 使用工厂方法InterpreterFactory::get根据AST树, 创建出对应的Interpreter, 我们关注查询, 对应的是InterpreterSelectQuery

然后, 调用Interpreterexecute方法生成了最终的执行计划

最后, 设置一些回调函数和queryLog等执行后的处理内容.

这里将语法解析步骤和后面的语义分析步骤分开, 导致代码可读性降低了, 一下子很难找到例如优化器部分的代码了

parseQuery

Clickhouse的Parser模块是自己通过代码实现的, 而非通过现有的库实现, 这样语法解析的性能会好很多.

拆解整个parseQuery的调用链: parseQuery -> parseQueryAndMovePosition -> tryParseQuery -> parser.parse

最后还是调用了IParser::parse方法, IParser有一个子类IParserBase实现了parse函数, 但又派生了自己的一个parseImpl方法, 所有的Parser都是IParserBase的子类, 并且实现了parseImpl方法.

executeQuery定义的ParserQuery可以看做所有Parser的代理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserQueryWithOutput query_with_output_p(end);
ParserInsertQuery insert_p(end);
ParserUseQuery use_p;
ParserSetQuery set_p;
// 省略其他的一些ParserQuery

bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
|| use_p.parse(pos, node, expected)
|| set_role_p.parse(pos, node, expected);

return res;
}

可以看到这个类, 将所有的Parser放到自己内, 我们跟着的Select语句的Parser封装在第一个ParserQueryWithOutput

整个路径为ParserQueryWithOutput -> ParserSelectWithUnionQuery ->ParserUnionQueryElement ->ParserSelectQuery

最后追踪到ParserSelectQuery::parseImpl开始解析Select语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
bool ParserSelectQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto select_query = std::make_shared<ASTSelectQuery>();
node = select_query;

ParserKeyword s_select("SELECT");
ParserKeyword s_all("ALL");
// 省略很多代码
ASTPtr with_expression_list;
ASTPtr select_expression_list;
// 省略很多代码

// 开始解析字符
/// FROM database.table or FROM table or FROM (subquery) or FROM tableFunction(...)
if (s_from.ignore(pos, expected))
{
if (!ParserTablesInSelectQuery().parse(pos, tables, expected))
return false;
}
/// PREWHERE expr
if (s_prewhere.ignore(pos, expected))
{
if (!exp_elem.parse(pos, prewhere_expression, expected))
return false;
}
}

可以注意到解析开始时, 先定义了一个ASTSelectQuery, IAST是抽象语法树的实现, 每个语句几乎都对应着一个语法树.

Parser目的就是通过SQL文本解析, 将AST树构建出来, 赋值里面的具体数值, 例如ASTSelectQuery就需要将这些参数都给赋值了

1
2
3
4
5
6
7
8
9
10
11
class ASTSelectQuery : public IAST
{
bool distinct = false;
bool group_by_with_totals = false;
bool group_by_with_rollup = false;
bool group_by_with_cube = false;
bool group_by_with_constant_keys = false;
bool limit_with_ties = false;
ASTs children;
std::unordered_map<Expression, size_t> positions;
}

另外, 在ASTSelectQuery也有很多其他分支AST树, 例如在parseImpl提到的with_expression_listselect_expression_list

至此整个抽象语法树已经构建完毕了

InterpreterSelectQuery

这个类中最重要的是构造函数execute方法, 构造函数做的是Analyzer的工作, 就是语义分析的过程, 而execute函数做的构建逻辑计划的步骤.

InterpreterSelectQuery::InterpreterSelectQuery()

构造函数中有一个非常长的Lambda函数: analyze, 截取其中重要的方法:

1
2
3
4
5
6
syntax_analyzer_result = TreeRewriter(context).analyzeSelect();
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>();
source_header = metadata_snapshot->getSampleBlockForColumns(required_columns, storage->getVirtuals(), storage->getStorageID());
result_header = getSampleBlockImpl();
// getSampleBlockImpl函数中
query_analyzer = std::make_unique<SelectQueryExpressionAnalyzer>()

首先, 通过TreeRewriter将做一些语法树的转化动作

其次, 生成query_analyzer

最后, 明确一下整个SQL的source_headerresult_header, 也就是需要读取表的字段和输出的字段

这里有三个对象非常关键, 看一下这三个的描述

1
2
3
4
5
TreeRewriterResultPtr syntax_analyzer_result;
std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer;

/// Is calculated in getSampleBlock. Is used later in readImpl.
ExpressionAnalysisResult analysis_result;

TreeRewriter是经过语义分析的语法树, 在这里除了完成元数据转义之外, 还有不少的优化器, 这个未来再说.

SelectQueryExpressionAnalyzerExpressionAnalysisResult是一对, 前者负责把AST树上的表达式拆解掉, 表示为ActionDag, 后者为分析的结果.

至于为啥一个执行动作和执行结果都要暴露出来给InterpreterSelectQuery使用呢, 我感觉是代码写的不够好, 导致这两个对象在使用的时候, 调用比较混乱.

这部分代码逻辑太复杂了, 后续得专门分析一下

语义分析阶段实际上就是需要根据元数据信息, 对文本做语义替换, 并完成校验工作, 查询列存不存在的校验工作.

InterpreterSelectQuery::execute()

1
2
3
4
5
6
7
8
9
10
11
BlockIO InterpreterSelectQuery::execute()
{
BlockIO res;
QueryPlan query_plan;

buildQueryPlan(query_plan);

res.pipeline = std::move(*query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context)));
return res;
}

首先, 构造QueryPlan, 具体的步骤就是根据AST树, 然后往QueryPlan一直addStep

IQueryPlanStep算逻辑计划的封装, QueryPlan维系的就是Step的列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class QueryPlan
{
public:
struct Node
{
QueryPlanStepPtr step;
std::vector<Node *> children = {};
};

using Nodes = std::list<Node>;

private:
Nodes nodes;
Node * root = nullptr;
};

IQueryPlanStep有5个基础子类:

  • CreatingSetsStep: Creates sets for subqueries and JOIN.
  • ISourceStep: Returns single logical DataStream
  • ITransformingStep: 数据做转化的算子, 大多数的算子都在这里
  • JoinStep: 处理Jion算子
  • UnionStep: 处理Union算子

image-20210818111008868

然后, 有了QueryPlan之后构造执行计划QueryPipeline, 具体代码在QueryPlan::buildQueryPipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
QueryPipelinePtr QueryPlan::buildQueryPipeline()
{
checkInitialized();
optimize(optimization_settings);

struct Frame
{
Node * node = {};
QueryPipelines pipelines = {};
};

QueryPipelinePtr last_pipeline;

std::stack<Frame> stack;
stack.push(Frame{.node = root});

while (!stack.empty())
{
auto & frame = stack.top();

if (last_pipeline)
{
frame.pipelines.emplace_back(std::move(last_pipeline));
last_pipeline = nullptr; //-V1048
}

size_t next_child = frame.pipelines.size();
if (next_child == frame.node->children.size())
{
bool limit_max_threads = frame.pipelines.empty();
// 逻辑计划, 转化为执行计划
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);

if (limit_max_threads && max_threads)
last_pipeline->limitMaxThreads(max_threads);

stack.pop();
}
else
stack.push(Frame{.node = frame.node->children[next_child]});
}

for (auto & context : interpreter_context)
last_pipeline->addInterpreterContext(std::move(context));

return last_pipeline;
}

整段代码有两个核心

第一, 调用了optimize(optimization_settings)方法, 对逻辑计划进行了优化

第二, 遍历整个逻辑树, 分别调用step->updatePipeline方法, 将逻辑计划转化为执行计划

优化器部分后续再展开, 这里优先讲一下执行计划.

就像逻辑计划, QueryPlan表示逻辑树, IQueryPlanStep表示其中的节点

执行计划的整课树为QueryPipeline, 其中的节点便是IProcessor

IProcessor有很多的实现, 主要分为3类:

  1. 各种Transform结尾的执行算子, 这些算子与逻辑计划中的ITransformingStep对应

    image-20210818112558755

  2. 输入相关的, 包含在ISource中, 跟MergeTree读取相关的Source藏匿于SourceWithProcess

    image-20210818112726000

  3. 输出相关的, 包含在ISinkIOutputFormat

IProcessor除了定义了处理逻辑, 还需要定位输入输出, 在代码里输入输出, 用Port来表征, IProcessor可以多个Input也可以有多个output, 因此两者都用列表表示IProcessor

1
2
3
4
5
6
class IProcessor
{
protected:
InputPorts inputs;
OutputPorts outputs;
}

Port定义为如下, 关联IProcessor, 相当于IProcessor关于上下游数据的连接器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class Port
{
friend void connect(OutputPort &, InputPort &);
IProcessor * processor = nullptr;
}

void connect(OutputPort & output, InputPort & input)
{
if (input.state || output.state)
throw Exception("Port is already connected", ErrorCodes::LOGICAL_ERROR);

auto out_name = output.getProcessor().getName();
auto in_name = input.getProcessor().getName();

assertCompatibleHeader(output.getHeader(), input.getHeader(), " function connect between " + out_name + " and " + in_name);

input.output_port = &output;
output.input_port = &input;
input.state = std::make_shared<Port::State>();
output.state = input.state;
}

其中的connect 将上下游的IProcessor连接在一起

Port有两个实现类: InputPortOutputPort表示IProcessor的输入和输出, 并相互关联.

1
2
3
4
5
6
7
8
9
10
class OutputPort : public Port
{
private:
InputPort * input_port = nullptr;
}
class InputPort : public Port
{
private:
OutputPort * output_port = nullptr;
}

PipelineExecutor

构造PipelineExecutor

PipelineExecutor的构造函数几乎只需要IProcessor列表, 参考QueryPipeline::execute()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
PipelineExecutorPtr QueryPipeline::execute()
{
if (!isCompleted())
throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);

return std::make_shared<PipelineExecutor>(pipe.processors, process_list_element);
}

// PipelineExecutor的构造函数
try
{
graph = std::make_unique<ExecutingGraph>(processors);
if (process_list_element)
process_list_element->addPipelineExecutor(this);
}
catch (Exception & exception)
{
throw;
}

构造拿到列表, 根据执行计划构造ExecutingGraph, 对应还有ExecutingGraph::NodeExecutingGraph::Edge

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
struct Node
{
IProcessor * processor = nullptr;
uint64_t processors_id = 0;

Edges direct_edges;
Edges back_edges;

Node(IProcessor * processor_, uint64_t processor_id)
: processor(processor_), processors_id(processor_id)
{
}
};
struct Edge
{
Edge(uint64_t to_, bool backward_,
uint64_t input_port_number_, uint64_t output_port_number_,
std::vector<void *> * update_list)
: to(to_), backward(backward_)
, input_port_number(input_port_number_), output_port_number(output_port_number_)
{
update_info.update_list = update_list;
update_info.id = this;
}

uint64_t to = std::numeric_limits<uint64_t>::max();
bool backward;
uint64_t input_port_number;
uint64_t output_port_number;
Port::UpdateInfo update_info;
};

可以看到Node定义着和IProcessor的关系, 而Edge定义着Port的编号

PipelineExecutor执行

调用链为execute -> executeImpl -> initializeExecution -> executeSingleThread -> executeStepImpl

其中initializeExecution主要是为了初始化ThreadLocal的线程池, 主要的触发逻辑都在executeStepImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// executeStepImpl的核心逻辑
while (node && !yield)
{
if (finished)
break;
addJob(node);
{
node->job();
}
if (node->exception)
cancel();
}

// addJob逻辑
void PipelineExecutor::addJob(ExecutingGraph::Node * execution_state)
{
auto job = [execution_state]()
{
try
{
// Stopwatch watch;
executeJob(execution_state->processor);
// execution_state->execution_time_ns += watch.elapsed();

++execution_state->num_executed_jobs;
}
catch (...)
{
execution_state->exception = std::current_exception();
}
};

execution_state->job = std::move(job);
}

// executeJob逻辑, 实际上就是调用processor->work()
static void executeJob(IProcessor * processor)
{
try
{
processor->work();
}
catch (Exception & exception)
{
if (checkCanAddAdditionalInfoToException(exception))
exception.addMessage("While executing " + processor->getName());
throw;
}
}

总结

整体Clickhouse处理SQL的逻辑就如上面所写的, 整体上实现和SparkSQL挺不一样的, 感觉上SparkSQL相对来说更好理解.

例如你想找一下类型校验的代码, 一般这个代码会写在Analyzer上, 但我粗粗的浏览代码的时候, 就一下子就难以找到. 但是在SparkSQL里面, 你只要搜索TypeCoercion就能找到所有相关的判断规则, 而Clickhouse中到现在依然没有找到.

代码基于21年8月份的master分支