SQLFlow深度解析

SQLFLow介绍

SQLFlow是阿里巴巴蚂蚁金服开源的一个AI On SQL的项目, 目标是SQL 引擎和 AI 引擎连接起来,让用户仅需几行 SQL 代码就能描述整个应用或者产品背后的数据流和 AI 构造

SQLFlow 最早的初衷,就是希望解决分析师既要操作数据又要使用 AI、往往需要在两个甚至更多的系统之间切换、工作效率低的窘境。

目前业界已有的AI ON SQL的方案:

  • Microsoft SQL Server:Microsoft SQL Server 支持机器学习服务,可以将 R 或 Python 编写的机器学习程序作为外部脚本运行.缺点: 需要编写R或者Python程序代码
  • Teradata SQL for DL:Teradata 也提供了 RESTful 服务,可以通过扩展的 SQL SELECT 语法调用. 缺点: 语法耦合了它的Rest服务
  • Google BigQuery:Google BigQuery 通过引入 CREATE MODEL 语句让用 SQL 实现机器学习成为可能. 缺点: 支持的模型有点少, 深度学习还不支持.

针对以上三个方案的缺点, SQLFlow的定义了自己的三个设计目标:

  • 这一解决方案应与许多 SQL 引擎都兼容,而不是只能兼容特定版本或类型的 SQL 引擎。
  • 它应该支持复杂的机器学习模型,包括用于深度学习的 TensorFlow 和用于树模型的 XGBoost。
  • 能够灵活地配置和运行前沿机器学习算法,包括指定特征交叉,无需在 SQL 语句中嵌入 Python 或 R 代码,以及完全集成超参数估计等。

目前阶段来说, SQLFlow已经支持MySQL/MaxCompute/Hive, 机器学习框架已经支持TF和XGBoost.

资源:

宣传文章

Github官网

官方文档

Spark社区既有MLib这样的机器学习框架, 也有一些基于深度学习的扩展, 例如Uber的horovod还有Yahoo的TensorFlowOnSpark, 但是这些框架都是基于的是Spark DataSet的接口联通的, 你可以在DataSet API上使用SQL, 也可以使用AI接口, 你可以认为是AI + SQL的模式, 而不是AI ON SQL的模式

SQLFlow试用

社区提供了一个官方的试用的Docker镜像, 只要键入以下命令启动容器即可:

1
docker run -d -p 8888:8888 sqlflow/sqlflow:latest

试用浏览器打开容器机器所在的8888端口, 可以看到一个notebook的页面, 默认已经有一个example.ipynb的样例文件了

打开这个example.ipynb文件, 这个镜像里面已经安装了mysql, 同时也把部分测试数据导入到数据库里面, 你不需要做任何数据处理的工作, 就可以直接运行Notebook里面的Cell. 蚂蚁的开发人员真的很贴心啊

执行推理过程, 使用DNNClassifier算法模型, 并将结果写入到sqlflow_models.my_dnn_model表中, my_dnn_model只有两个字段: ID和BLOB(存放模型序列化后的字节流)

执行对于测试集(iris.test)进行推理, 并写入到预测结果表之中(iris.predict.class)

最后展示推理结果集iris.predict

架构解析

这个AI ON SQL系统里面, 首先要回到的一个问题是, AI系统的计算层和SQL系统的计算层是什么关系?

例如Spark(BigQuery大概率也是, 但是因为闭源不能确定)AI引擎代码是内嵌于SQL计算系统之中的, 并行执行的能力由Spark管理, AI系统就像代码库一样被Spark系统所调用而已.

但SQLFlow明显不是这种模式的:

从SQLFlow的架构图上看的出来, AI Engine和SQL Engine之间是独立的, 两者通过RPC交互数据和模型

  1. AI Engine训练或者推理计算的时候, 从SQLEngine获取数据
  2. AI Engine完成训练过程, 将模型写入到SQL Engine; 推理过程从SQL Engine读取模型

整个SQLFlow的流程大致如下(图上红色部分为SQLFlow):

  1. Notebook输入SQL之后, 送入到Parser之内, 这儿的语法解析借用了Hive/MaxCompute等引擎
  2. 解析完SQL语法后, 进行Schema Verification
  3. 然后根据SQL语法, 产生对应的Code(根据不同模型和不同引擎产生不同的Code)
  4. 最后执行Code

设计文档

源码解析

最后我们来简单过一遍SQLFlow的代码, 提炼一下找代码框架的思路:

SQLFlow代码量目测在1-1.5万行左右, 半天就能看懂基础流程了

找到入口

首先, 找到整个项目的Dockerfile, 就在根目录下

如果项目有Docker镜像, Dockerfile就是个个进程的入口, 比main函数还在前面

1
2
ADD scripts/start.sh /
CMD ["bash", "/start.sh"]

看到最后一行启动的命令为start.sh, 源码文件在scripts/start.sh

找到启动文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
function print_usage() {
echo "Usage: /bin/bash start.sh [OPTION]\n"
echo "\tpopulate-example-dataset-mysql: populate an existing mysql instance with the example dataset."
echo "\tmysql: setup the mysql server with the example dataset initialized."
echo "\tsqlflow_server: setup the sqlflow gRPC server."
echo "\tsqlflow_notebook: setup the Jupyter Notebook server."
echo "\tall(default): setup a MySQL server instance, a sqlflow gRPC server and a Jupyter Notebook server sequentially."
}

function main() {
ARG=${1:-all}
case $ARG in
all)
echo "setup all-in-one"
setup_mysql
setup_sqlflow_server &
setup_sqlflow_notebook
;;
*)
print_usage
;;
esac
}

可以看到启动文件启动了三个内容:

  1. mysql: 默认的SQL Engine, 已经初始化了数据

  2. sqlflow_server: 我们要找的程序, 找到里面真正的执行命令sqlflowserver --datasource=${DS}

  3. sqlflow_notebook: Notebook交互式界面

找到main函数入口

全局搜索function main(), 发现只有cmd/sqlflowserver/main.go有.

start函数里面找到关键的proto.RegisterSQLFlowServer(s, server.NewServer(sql.Run, nil, modelDir, enableSession))服务启动代码, 其中的sql.Run就整个SQL处理代码.

不太熟悉go语言是怎么出包的

SQL.Run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func Run(slct string, db *DB, modelDir string, session *pb.Session) *PipeReader {
splittedSQL, err := splitExtendedSQL(slct)
if err != nil {
rd, wr := Pipe()
// return the lexer error message to client side
go func() {
defer wr.Close()
wr.Write(err)
}()
return rd
}
if len(splittedSQL) == 2 {
return runExtendedSQL(slct, db, modelDir, session)
}
return runStandardSQL(slct, db)
}

如果是SQL语句, 走入到runStandardSQL分支, 如果有训练或者推理语法, 就会走入runExtendedSQL分支

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func runExtendedSQL(slct string, db *DB, modelDir string, session *pb.Session) *PipeReader {
rd, wr := Pipe()
go func() {
defer wr.Close()

err := func() error {
defer func(startAt time.Time) {
log.Debugf("runExtendedSQL %v finished, elapsed:%v", slct, time.Since(startAt))
}(time.Now())
pr, e := newParser().Parse(slct) // 语言解析
if e != nil {
return e
}
cwd, e := ioutil.TempDir("/tmp", "sqlflow")
if e != nil {
return e
}
defer os.RemoveAll(cwd)

if pr.train {
ds, e := newTrainAndValDataset(db, pr.standardSelect.String(), pr.standardSelect.tables[0], 0.8)
if e != nil {
return e
}

return train(wr, pr, db, cwd, modelDir, slct, ds) // 调用训练
}
return pred(wr, pr, db, cwd, modelDir) // 调用推理部分
}()

if err != nil {
log.Errorf("runExtendedSQL error:%v", err)
if err != ErrClosedPipe {
if err := wr.Write(err); err != nil {
log.Errorf("runExtendedSQL error(piping):%v", err)
}
}
}
}()
return rd
}

可以看到, 先进行语法解析, 然后进行训练或者推理逻辑, 我们简单看一眼训练过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func train(wr *PipeWriter, tr *extendedSelect, db *DB, cwd string, modelDir string, slct string, ds *trainAndValDataset) error {
fts, e := verify(tr, db) // 语法校验
if e != nil {
return e
}
var program bytes.Buffer
if e := genTF(&program, tr, ds, fts, db); e != nil { // 生成代码
return fmt.Errorf("genTF %v", e)
}

cw := &logChanWriter{wr: wr}
defer cw.Close()
cmd := tensorflowCmd(cwd, db.driverName) // 执行命令
cmd.Stdin = &program
cmd.Stdout = cw
cmd.Stderr = cw
if e := cmd.Run(); e != nil {
return fmt.Errorf("training failed %v", e)
}
m := model{workDir: cwd, TrainSelect: slct}
if modelDir != "" {
return m.saveTar(modelDir, tr.save) // 保存模型到本地文件夹
}
return m.save(db, tr.save) // 保存模型到数据库
}

可以看到, 进入训练过程之后, 先做了语法校验, 然后生成的对应的TF的代码, 然后调用tensorflowCmd执行命令, 最后将模型保存完毕, 完成训练过程.

语法校验先略过, 代码生成过程比较复杂后面再介绍, 我们先关注于命令执行过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func tensorflowCmd(cwd, driverName string) (cmd *exec.Cmd) {
if hasPython() && hasTensorFlow() && hasDatabaseConnector(driverName) {
log.Printf("tensorflowCmd: run locally")
cmd = exec.Command("python", "-u")
cmd.Dir = cwd
} else if hasDocker() {
log.Printf("tensorflowCmd: run in Docker container")
const tfImg = "sqlflow/sqlflow"
if !hasDockerImage(tfImg) {
log.Printf("No local Docker image %s. It will take a long time to pull.", tfImg)
}
cmd = exec.Command("docker", "run", "--rm",
fmt.Sprintf("-v%s:/work", cwd),
"-w/work", "--network=host", "-i", tfImg, "python")
} else {
log.Fatalf("No local TensorFlow or Docker. No way to run TensorFlow programs")
}
return cmd
}

tensorflowCmd有两种执行模式: 本地执行容器执行, 目前这两种方式都是单机执行模型, 实际上这儿就印证了AI Engine和SQL Engine分离的架构

未来这儿可以很方便的将这个扩展为分布式任务, 例如Kubeflow的TFJob,这个这块需要跟代码生成那儿一起修改.

代码生成

让我们在回到genTF这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
func genTF(w io.Writer, pr *extendedSelect, ds *trainAndValDataset, fts fieldTypes, db *DB) error {
r, e := newFiller(pr, ds, fts, db) // 应该是按照字段的过滤吧, 没仔细看, 可能会出错
if e != nil {
return e
}
if pr.train {
return tfTrainTemplate.Execute(w, r) // 根据训练模板生成code
}
return tfPredTemplate.Execute(w, r) // 根据推理模板生成code
}

var tfTrainTemplate = template.Must(template.New("codegenTfTrain").Parse(tfTrainTemplateText)) // 训练模板
var tfPredTemplate = template.Must(template.New("codegenTfPred").Parse(tfPredTemplateText)) // 推理模板

这儿实际上最关键的是两个训练模板, 这两个模块在template_tf.go里面定义

除了TF的模板, 还有template_alpstemplate_elasticdl这两个

模板里面就是Python代码了, 截取里面的一部分说明一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import os
# Disable Tensorflow INFO and WARNING logs
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import sys, json
import tensorflow as tf
import functools
import sqlflow_models # 注意点1

from sqlflow_submitter.db import connect, db_generator # 注意点2

# 忽略一部分代码
def input_fn(datasetStr):
feature_types = []
for name in feature_column_names:
if feature_metas[name]["is_sparse"]:
feature_types.append((tf.int64, tf.int32, tf.int64))
else:
feature_types.append(get_dtype(feature_metas[name]["dtype"]))

gen = db_generator(driver, conn, datasetStr, feature_column_names, "{{.Y.FeatureName}}", feature_metas)
dataset = tf.data.Dataset.from_generator(gen, (tuple(feature_types), tf.{{.Y.Dtype}})) # 注意点3
ds_mapper = functools.partial(_parse_sparse_feature, feature_metas=feature_metas)
return dataset.map(ds_mapper)

注意点1sqlflow_models, 这个是定义在同组织的Model里面, 目前只实现了2个实现dnnclassifierlstmclassifier, 这里这个dnnclassifier就是试用里面DNNClassifier算法模型的定义所在

注意点2sqlflow_submitter这个定义在sql/python/sqlflow_submmiter包目录下, 在这儿你可以执行本地的Python文件, 也可以定义自己的submit将CodeGen的代码当做客户端代码, 给远程的深度学习服务提交自己的学习任务. 同时这儿也定义了与SQL Engine的交互代码逻辑, 就是里面的connectdb_generator

注意点3关注于TensorFlow框架是如何读取从数据库里面的数据的, 使用的接口为tf.data.Dataset.from_generator

至此代码分析已经完成, 主流程已经明确了.

社区动态

蚂蚁对于这个项目的投入还是很大的, 应该由专门的人在投入这个项目, 更新频率还是相当快的

但是贡献者还是比较少的, 应该目前看只有21贡献者, 基本上是蚂蚁金服内部员工.

之前看到他们2019年的路标, 2019年的目标预定是支持各种框架, 例如Calcite支持或者GPU TF支持

总体来说2019年主要在完善功能, 但是后来这个ISSUE关闭了, 不确定19年能否完成这些内容.

总结

SQLFlow目前看还处于原型阶段, 整体支持的能力还非常欠缺: GPU的支持, 模型的定义等功能目前好像都不具备.

就目前整体的设计, 我认为以下三点未来需要加强:

  1. 模型定义接口太复杂了: 1. 实现一个模板;2. 实现一个模型算法;3. 实现一个submitter. 这套逻辑对于工程师可能比较简单(实际上定义地方太多, 也麻烦), 但是对于AI算法的人, 肯定不是用
  2. AI EngineSQL Engine分离带来的性能问题, 这套架构的问题就是AI系统离数据远了一点, 所有数据都是通过SQL Engine计算而来, 而且是通过RPC获取的数据, 比起Spark这种直接在内存中获取数据, 这里会成为正式商用时候的大瓶颈点
  3. 数据分布式化工程量太大, 这其实是问题二的引申版, 未来肯定要实现数据分布化, AI计算分布式化, 这钟模式我还没有想到如何分布式化(回去再好好想想)

另外对于这个项目的商业前景如何, 这个确实存疑的, 也许在阿里内部可能有这部分需求, 但对我们来说却不是.

2018年8月BigQuery出了ML之后, 我们也有计划跟进, 调研了XGBoost On Spark, 但最后还是没决定要做, 最主要的原因是没有客户明确需要这个能力.

值得表扬的是: 蚂蚁的文档和demo做的是真好, 做技术调研能遇到这样的项目, 确实让我省了不少功夫.

附录: Go语言环境安装

SQLFlow的主编程语言为Go语言, 安装部署也相对方便, 不记录过程了, 只放置一些安装资源链接

Go安装包

GoLand下载地址

GoLand破解

IDEA License Server搭建