SQL流程概览 数据库发展至今, SQL处理流程已经非常完善, 以Presto的分析流程为例
SQL文本由客户端提交到服务端时, 首先会进入Parser模块, 进行词法和语法解析
经过解析后, 会生成AST(abstract syntax code)树, 然后会进入语义分析
阶段, 结合元数据信息, 将AST中无意义的文本, 转化为有含义的对象.
然后根据AST树, 转化为逻辑计划
逻辑计划经过优化器
(optimizer)后, 生成最终执行的执行计划(在Presto是分布式执行计划)
最后放入执行器
执行具体任务
Clickhouse的流程也基本相似:
客户单通过TCP端口提交任务到Clickhouse Server
TCP Handler
会响应请求, 并调用executeQuery
函数处理请求
executeQuery
会处理上面SQL处理的5个步骤, 最后生成执行计划
Parser为词法解析
Interpreter为语义解析
QueryPlan为逻辑计划
QueryPipeline为经过优化的执行计划
将QueryPipeline
放入到PipelineExecutor
中执行任务
注: 由于Clickhouse并非一个MPP的数据库, 因此并没有分布式执行计划一说, 分布式方式被拆散到QueryPlan之中.
TCPHandler 通过TCP端口(CK默认的客户端)连接的请求会在TCPHandler::runImpl()
函数中被处理, 在做完一些准备工作后, 他会执行以下代码
1 2 3 4 5 6 7 8 state.io = executeQuery (state.query, query_context, false , state.stage, may_have_embedded_data); if (state.need_receive_data_for_input) { auto executor = state.io.pipeline.execute (); executor->execute (state.io.pipeline.getNumThreads ()); }
executeQuery
的输入state.query
是String类型, 实际上就是客户端的SQL语句, 输出state.io
输出对象为BlockIO
, 这是一个QueryPipeline
的封装
最终executeQuery
将SQL字符串转化为执行计划.
然后调用pipeline.execute()
获得计划执行器, 然后调用execute
开始任务的执行.
TCPHandler
类的实现挺不错的, 整体的细节都封装在executeQuery
和PipelineExecutor
之中
executeQuery executeQuery
的实现函数为executeQueryImpl
1 2 3 4 5 6 7 ASTPtr ast; ParserQuery parser (end) ;ast = parseQuery (parser, begin, end, "" , max_query_size, settings.max_parser_depth); auto interpreter = InterpreterFactory::get (ast, context, SelectQueryOptions (stage).setInternal (internal));BlockIO res = interpreter->execute (); QueryPipeline & pipeline = res.pipeline;
首先, 使用ParserQuery
将SQL语句转化为AST树(代码在parseQuery.cpp
中)
其次, 使用工厂方法InterpreterFactory::get
根据AST树, 创建出对应的Interpreter
, 我们关注查询, 对应的是InterpreterSelectQuery
然后, 调用Interpreter
的execute
方法生成了最终的执行计划
最后, 设置一些回调函数和queryLog等执行后的处理内容.
这里将语法解析步骤和后面的语义分析步骤分开, 导致代码可读性降低了, 一下子很难找到例如优化器部分的代码了
parseQuery Clickhouse的Parser模块是自己通过代码实现的, 而非通过现有的库实现, 这样语法解析的性能会好很多.
拆解整个parseQuery
的调用链: parseQuery -> parseQueryAndMovePosition -> tryParseQuery -> parser.parse
最后还是调用了IParser::parse
方法, IParser
有一个子类IParserBase
实现了parse
函数, 但又派生了自己的一个parseImpl
方法, 所有的Parser都是IParserBase
的子类, 并且实现了parseImpl
方法.
在executeQuery
定义的ParserQuery
可以看做所有Parser的代理类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 bool ParserQuery::parseImpl (Pos & pos, ASTPtr & node, Expected & expected) { ParserQueryWithOutput query_with_output_p (end) ; ParserInsertQuery insert_p (end) ; ParserUseQuery use_p; ParserSetQuery set_p; bool res = query_with_output_p.parse (pos, node, expected) || insert_p.parse (pos, node, expected) || use_p.parse (pos, node, expected) || set_role_p.parse (pos, node, expected); return res; }
可以看到这个类, 将所有的Parser放到自己内, 我们跟着的Select
语句的Parser封装在第一个ParserQueryWithOutput
整个路径为ParserQueryWithOutput -> ParserSelectWithUnionQuery ->ParserUnionQueryElement ->ParserSelectQuery
最后追踪到ParserSelectQuery::parseImpl
开始解析Select
语句
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 bool ParserSelectQuery::parseImpl (Pos & pos, ASTPtr & node, Expected & expected) { auto select_query = std::make_shared <ASTSelectQuery>(); node = select_query; ParserKeyword s_select ("SELECT" ) ; ParserKeyword s_all ("ALL" ) ; ASTPtr with_expression_list; ASTPtr select_expression_list; if (s_from.ignore (pos, expected)) { if (!ParserTablesInSelectQuery ().parse (pos, tables, expected)) return false ; } if (s_prewhere.ignore (pos, expected)) { if (!exp_elem.parse (pos, prewhere_expression, expected)) return false ; } }
可以注意到解析开始时, 先定义了一个ASTSelectQuery
, IAST
是抽象语法树的实现, 每个语句几乎都对应着一个语法树.
Parser目的就是通过SQL文本解析, 将AST树构建出来, 赋值里面的具体数值, 例如ASTSelectQuery
就需要将这些参数都给赋值了
1 2 3 4 5 6 7 8 9 10 11 class ASTSelectQuery : public IAST{ bool distinct = false ; bool group_by_with_totals = false ; bool group_by_with_rollup = false ; bool group_by_with_cube = false ; bool group_by_with_constant_keys = false ; bool limit_with_ties = false ; ASTs children; std::unordered_map<Expression, size_t > positions; }
另外, 在ASTSelectQuery
也有很多其他分支AST树, 例如在parseImpl
提到的with_expression_list
和select_expression_list
至此整个抽象语法树已经构建完毕了
InterpreterSelectQuery 这个类中最重要的是构造函数
和execute
方法, 构造函数做的是Analyzer的工作, 就是语义分析
的过程, 而execute
函数做的构建逻辑计划的步骤.
InterpreterSelectQuery::InterpreterSelectQuery() 构造函数中有一个非常长的Lambda函数: analyze
, 截取其中重要的方法:
1 2 3 4 5 6 syntax_analyzer_result = TreeRewriter (context).analyzeSelect (); query_analyzer = std::make_unique <SelectQueryExpressionAnalyzer>(); source_header = metadata_snapshot->getSampleBlockForColumns (required_columns, storage->getVirtuals (), storage->getStorageID ()); result_header = getSampleBlockImpl (); query_analyzer = std::make_unique <SelectQueryExpressionAnalyzer>()
首先, 通过TreeRewriter
将做一些语法树的转化动作
其次, 生成query_analyzer
最后, 明确一下整个SQL的source_header
和result_header
, 也就是需要读取表的字段和输出的字段
这里有三个对象非常关键, 看一下这三个的描述
1 2 3 4 5 TreeRewriterResultPtr syntax_analyzer_result; std::unique_ptr<SelectQueryExpressionAnalyzer> query_analyzer; ExpressionAnalysisResult analysis_result;
TreeRewriter
是经过语义分析的语法树, 在这里除了完成元数据转义之外, 还有不少的优化器, 这个未来再说.
SelectQueryExpressionAnalyzer
和ExpressionAnalysisResult
是一对, 前者负责把AST树上的表达式拆解掉, 表示为ActionDag
, 后者为分析的结果.
至于为啥一个执行动作和执行结果都要暴露出来给InterpreterSelectQuery
使用呢, 我感觉是代码写的不够好, 导致这两个对象在使用的时候, 调用比较混乱.
这部分代码逻辑太复杂了, 后续得专门分析一下
在语义分析
阶段实际上就是需要根据元数据信息, 对文本做语义替换, 并完成校验工作, 查询列存不存在的校验工作.
InterpreterSelectQuery::execute() 1 2 3 4 5 6 7 8 9 10 11 BlockIO InterpreterSelectQuery::execute () { BlockIO res; QueryPlan query_plan; buildQueryPlan (query_plan); res.pipeline = std::move (*query_plan.buildQueryPipeline ( QueryPlanOptimizationSettings::fromContext (context), BuildQueryPipelineSettings::fromContext (context))); return res; }
首先, 构造QueryPlan
, 具体的步骤就是根据AST树, 然后往QueryPlan
一直addStep
IQueryPlanStep
算逻辑计划的封装, QueryPlan
维系的就是Step
的列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class QueryPlan { public : struct Node { QueryPlanStepPtr step; std::vector<Node *> children = {}; }; using Nodes = std::list<Node>; private : Nodes nodes; Node * root = nullptr ; };
IQueryPlanStep
有5个基础子类:
CreatingSetsStep: Creates sets for subqueries and JOIN.
ISourceStep: Returns single logical DataStream
ITransformingStep: 数据做转化的算子, 大多数的算子都在这里
JoinStep: 处理Jion算子
UnionStep: 处理Union算子
然后, 有了QueryPlan
之后构造执行计划QueryPipeline
, 具体代码在QueryPlan::buildQueryPipeline
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 QueryPipelinePtr QueryPlan::buildQueryPipeline () { checkInitialized (); optimize (optimization_settings); struct Frame { Node * node = {}; QueryPipelines pipelines = {}; }; QueryPipelinePtr last_pipeline; std::stack<Frame> stack; stack.push (Frame{.node = root}); while (!stack.empty ()) { auto & frame = stack.top (); if (last_pipeline) { frame.pipelines.emplace_back (std::move (last_pipeline)); last_pipeline = nullptr ; } size_t next_child = frame.pipelines.size (); if (next_child == frame.node->children.size ()) { bool limit_max_threads = frame.pipelines.empty (); last_pipeline = frame.node->step->updatePipeline (std::move (frame.pipelines), build_pipeline_settings); if (limit_max_threads && max_threads) last_pipeline->limitMaxThreads (max_threads); stack.pop (); } else stack.push (Frame{.node = frame.node->children[next_child]}); } for (auto & context : interpreter_context) last_pipeline->addInterpreterContext (std::move (context)); return last_pipeline; }
整段代码有两个核心
第一, 调用了optimize(optimization_settings)
方法, 对逻辑计划进行了优化
第二, 遍历整个逻辑树, 分别调用step->updatePipeline
方法, 将逻辑计划转化为执行计划
优化器部分后续再展开, 这里优先讲一下执行计划.
就像逻辑计划, QueryPlan
表示逻辑树, IQueryPlanStep
表示其中的节点
执行计划的整课树为QueryPipeline
, 其中的节点便是IProcessor
IProcessor
有很多的实现, 主要分为3类:
各种Transform
结尾的执行算子, 这些算子与逻辑计划中的ITransformingStep
对应
输入相关的, 包含在ISource
中, 跟MergeTree读取相关的Source藏匿于SourceWithProcess
中
输出相关的, 包含在ISink
和IOutputFormat
中
IProcessor
除了定义了处理逻辑, 还需要定位输入输出, 在代码里输入输出, 用Port
来表征, IProcessor
可以多个Input也可以有多个output, 因此两者都用列表表示IProcessor
1 2 3 4 5 6 class IProcessor { protected : InputPorts inputs; OutputPorts outputs; }
Port
定义为如下, 关联IProcessor
, 相当于IProcessor
关于上下游数据的连接器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class Port { friend void connect (OutputPort &, InputPort &) ; IProcessor * processor = nullptr ; } void connect (OutputPort & output, InputPort & input) { if (input.state || output.state) throw Exception ("Port is already connected" , ErrorCodes::LOGICAL_ERROR); auto out_name = output.getProcessor ().getName (); auto in_name = input.getProcessor ().getName (); assertCompatibleHeader (output.getHeader (), input.getHeader (), " function connect between " + out_name + " and " + in_name); input.output_port = &output; output.input_port = &input; input.state = std::make_shared <Port::State>(); output.state = input.state; }
其中的connect
将上下游的IProcessor
连接在一起
Port
有两个实现类: InputPort
和OutputPort
表示IProcessor
的输入和输出, 并相互关联.
1 2 3 4 5 6 7 8 9 10 class OutputPort : public Port{ private : InputPort * input_port = nullptr ; } class InputPort : public Port{ private : OutputPort * output_port = nullptr ; }
PipelineExecutor 构造PipelineExecutor PipelineExecutor
的构造函数几乎只需要IProcessor
列表, 参考QueryPipeline::execute()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 PipelineExecutorPtr QueryPipeline::execute () { if (!isCompleted ()) throw Exception ("Cannot execute pipeline because it is not completed." , ErrorCodes::LOGICAL_ERROR); return std::make_shared <PipelineExecutor>(pipe.processors, process_list_element); } try { graph = std::make_unique <ExecutingGraph>(processors); if (process_list_element) process_list_element->addPipelineExecutor (this ); } catch (Exception & exception) { throw ; }
构造拿到列表, 根据执行计划构造ExecutingGraph
, 对应还有ExecutingGraph::Node
和ExecutingGraph::Edge
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 struct Node { IProcessor * processor = nullptr ; uint64_t processors_id = 0 ; Edges direct_edges; Edges back_edges; Node (IProcessor * processor_, uint64_t processor_id) : processor (processor_), processors_id (processor_id) { } }; struct Edge { Edge (uint64_t to_, bool backward_, uint64_t input_port_number_, uint64_t output_port_number_, std::vector<void *> * update_list) : to (to_), backward (backward_) , input_port_number (input_port_number_), output_port_number (output_port_number_) { update_info.update_list = update_list; update_info.id = this ; } uint64_t to = std::numeric_limits<uint64_t >::max (); bool backward; uint64_t input_port_number; uint64_t output_port_number; Port::UpdateInfo update_info; };
可以看到Node
定义着和IProcessor
的关系, 而Edge
定义着Port
的编号
PipelineExecutor执行 调用链为execute -> executeImpl -> initializeExecution -> executeSingleThread -> executeStepImpl
其中initializeExecution
主要是为了初始化ThreadLocal的线程池, 主要的触发逻辑都在executeStepImpl
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 while (node && !yield){ if (finished) break ; addJob (node); { node->job (); } if (node->exception) cancel (); } void PipelineExecutor::addJob (ExecutingGraph::Node * execution_state) { auto job = [execution_state]() { try { executeJob (execution_state->processor); ++execution_state->num_executed_jobs; } catch (...) { execution_state->exception = std::current_exception (); } }; execution_state->job = std::move (job); } static void executeJob (IProcessor * processor) { try { processor->work (); } catch (Exception & exception) { if (checkCanAddAdditionalInfoToException (exception)) exception.addMessage ("While executing " + processor->getName ()); throw ; } }
总结 整体Clickhouse处理SQL的逻辑就如上面所写的, 整体上实现和SparkSQL挺不一样的, 感觉上SparkSQL相对来说更好理解.
例如你想找一下类型校验的代码, 一般这个代码会写在Analyzer
上, 但我粗粗的浏览代码的时候, 就一下子就难以找到. 但是在SparkSQL里面, 你只要搜索TypeCoercion
就能找到所有相关的判断规则, 而Clickhouse中到现在依然没有找到.
代码基于21年8月份的master分支