[Kubeflow系列]Pipleline介绍

总览

Pipeline在Kubeflow里面是一个最重要的能力, 因为需要它来串流整个机器学习任务的组件.
因此我们首先介绍一下pipeline的定义: 它是一个工作流平台,能够编译部署机器学习的工作流, 可以定义复杂的数据DAG流程, 并提供可视化的流程展示和结果展示.

核心概念

pipelines实现了一个工作流模型。所谓工作流,或者称之为流水线(pipeline),可以将其当做一个有向无环图(DAG)。其中的每一个节点,在pipelines 的语义下被称作组件(component)。组件在图中作为一个节点,其会处理真正的逻辑,比如预处理,数据清洗,模型训练等等。每一个组件负责的功能不同,但有一个共同点,即组件都是以 Docker 镜像的方式被打包,以容器的方式被运行的。这也是与 kubeflow 社区的 Run ML on Kubernetes 这一愿景相统一的。

实验(experiment)是一个工作空间,在其中可以针对流水线尝试不同的配置。运行(run)是流水线的一次执行,用户在执行的过程中可以看到每一步的输出文件,以及日志。步(step)是组件的一次运行,步与组件的关系就像是运行与流水线的关系一样。步输出工件(step output artifacts)是在组件的一次运行结束后输出的,能被系统的前端理解并渲染可视化的文件。

Kubeflow Pipeline中概念列表:

  1. Pipeline: 定义一组操作的流水线,其中每一步都由component组成。 背后是一个Argo的模板配置
  2. Component: 一个容器操作,可以通过pipeline的sdk 定义。每一个component 可以定义定义输出(output)和产物(artifact), 输出可以通过设置下一步的环境变量,作为下一步的输入, artifact 是组件运行完成后写入一个约定格式文件,在界面上可以被渲染展示。
  3. Graph: 就是上面看到流程图, 一个pipeline可以转化为一个DAG图
  4. Experiment: 可以看做一个工作空间,管理一组运行任务
  5. Run: 真实任务, pipeline真正在K8s上实例化的产物, 会启动多个对应的Pod, 开始真正的计算
  6. Recurring Run: 定时任务, 可以由Run Trigger触发
  7. Step: Component对应实体, 就是上图之中的方框, 应该是一个Step会真实对应一个K8s的Pod

官方案例

官方的XGBoost - Training with Confusion Matrix案例:

流程可视化页面:

提交任务页面:

Artifacts页面

运行起来的时候, 在Artifacts之中看到一些metric信息(这些信息需要在流程之中自定义)

Pipeline的流程及架构

使用Pipeline一般会有以下的步骤:

  1. 使用python代码编写pipeline定义代码
  2. 通过编译脚本编译生成压缩包
  3. 上传压缩文件, 创建pipeline
  4. 使用该pipeline, 创建任务, 并输入指定的参数值
  5. 查看任务完成状态及输出结果

Python代码定义再放在下一章节介绍, 这个章节介绍2-5步骤后台实现的流程.

下面先介绍两个基础知识:

Argo

我们在Pipeline的页面上可以看到一个Source的子页面, 里面的内容是这个流程yaml格式的定义文件:

可以看到叫做argo的组件, 它是一个开源的基于容器的工作流引擎,实现了一个K8S的CRD(用户自定义的资源):

  • 用容器实现工作流的每一个步骤
  • 用DAG的形式描述多个任务之间的关系的依赖
  • 支持机器学习和数据处理中的计算密集型任务

实际上pipeline上传的就是argo的配置文件, 由这个配置文件来定义整个流程任务.

这个配置文件需要pipeline sdk对用户Python代码进行编译产生.

架构

pipeline的架构, 主要由以下模块组成:

  1. 编译模块(SDK), 将Python代码转化为argo配置文件
  2. 页面模块(Web Server), 用于创建job以及监控任务运行结果
  3. 存储模块, 包含历史任务元数据信息, Artifact的数据缓存
  4. 服务模块, 一个由go编写的后端,提供kubernetes ApiServer 风格的Restful API。处理前端以及SDK发起的操作请求。 Pipeline/Experiment 之类的请求会直接存入元数据。和Run 相关的请求除了写入元数据以外还会通过APIServer 同步操作Argo实例。
  5. 控制模块: Argo模块注册在K8s之中, 将argo的配置文件翻译为真正的K8s执行流程

执行流程

  1. 客户编写python代码, 经过编译之后生产argo配置项
  2. 客户调用上传接口(可以是网页调用), 上传argo配置项到Pipeline Service
  3. Pipeline Service将元数据信息存储到Meta Service之中
  4. 客户触发任务执行, Pipeline Service调用请求到K8s API Service
  5. K8s API Service根据资源类型(argo资源), 触发Argo的编排调度框架
  6. 框架完成任务启动之后, 并将部分Artifact写入到Artifact Storage之中
  7. pipeline时刻监控K8s的任务信息, 并在更新任务状态

GATK最佳实践案例

下面以GATK最佳实践为例, 看一下如何实现一个流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#!/usr/bin/env python3

from kfp import dsl


def align_fastq(fastq1, fastq2):
return dsl.ContainerOp(
name='aligin_fastq_with_bwa',
image='swr.cn-north-5.myhuaweicloud.com/kubeflow/hzw-bwa:1.0',
command=['sh', '-c'],
arguments=["/bwa mem -t 4 -R '@RG\\tID:foo\\tLB:bar\\tPL:illumina\\tPU:illumina\\tSM:NA12878' /data/ref/hg19.fa $0 $1 > /data/sample/test.sam;echo '/data/sample/test.sam' > /output.txt", fastq1, fastq2],
file_outputs={
'output': '/output.txt',
},
pvolumes={
'/data': dsl.PipelineVolume(pvc="cce-sfs-notebook", name="notebook-pvc")
}
)


def conver_sam(samfile):
return dsl.ContainerOp(
name='convert sam into bam',
image='swr.cn-north-5.myhuaweicloud.com/kubeflow/samtools:latest',
command=['sh', '-c'],
arguments=['samtools sort -@ 4 -O bam -o /data/sample/test.bam $0; echo "/data/sample/test.bam" > /output.txt', samfile],
file_outputs={
'output': '/output.txt',
},
pvolumes={
'/data': dsl.PipelineVolume(pvc="cce-sfs-notebook", name="notebook-pvc")
}
)


def markdup(bamfile):
return dsl.ContainerOp(
name='mark dup',
image='swr.cn-north-5.myhuaweicloud.com/kubeflow/gatk:latest',
command=['sh', '-c'],
arguments=['gatk MarkDuplicates -I $0 -O /data/sample/test.markup.bam -M /data/sample/test.metrics.txt; echo "/data/sample/test.markup.bam" > /output.txt', bamfile],
file_outputs={
'output': '/output.txt'
},
pvolumes={
'/data': dsl.PipelineVolume(pvc="cce-sfs-notebook", name="notebook-pvc")
}
)

def bqsr(marddup):
return dsl.ContainerOp(
name='bqsr',
image='swr.cn-north-5.myhuaweicloud.com/kubeflow/gatk:latest',
command=['sh', '-c'],
arguments=['gatk BaseRecalibrator -I $0 -O /data/sample/test.table -R /data/ref/hg19.fa.gz --known-sites /data/ref/1000g_omni2.5.hg19.sites.vcf.gz;'
'gatk ApplyBQSR -R /data/ref/hg19.fa.gz -I $0 -O /data/sample/test.bqsr.bam --bqsr-recal-file /data/sample/test.table;'
'echo "/data/sample/test.bqsr.bam" > /output.txt', marddup],
file_outputs={
'data': '/output.txt'
},
pvolumes={
'/data': dsl.PipelineVolume(pvc="cce-sfs-notebook", name="notebook-pvc")
}
)


def haplotype(bqsr):
return dsl.ContainerOp(
name='haplotype',
image='swr.cn-north-5.myhuaweicloud.com/kubeflow/gatk:latest',
command=['sh', '-c'],
arguments=['gatk HaplotypeCaller -I $0 -O /data/sample/test.vcf.gz -R /data/ref/hg19.fa.gz;'
'echo "/data/sample/test.vcf.gz" > /output.txt', bqsr],
file_outputs={
'data': '/output.txt'
},
pvolumes={
'/data': dsl.PipelineVolume(pvc="cce-sfs-notebook", name="notebook-pvc")
}
)


@dsl.pipeline(
name="Gatk Basic pipeline",
description="A Basic pipeline for gatk."
)


def pipeline(fastq1='/data/sample/200M_1_NA12878_clean_1.fastq.gz', fastq2='/data/sample/200M_1_NA12878_clean_2.fastq.gz'):
samfile = align_fastq(fastq1, fastq2)
bamfile = conver_sam(samfile.output)
markfile = markdup(bamfile.output)
bqsr_file = bqsr(markfile.output)
haplotype(bqsr_file.output)


if __name__ == "__main__":
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline, __file__ + ".tar.gz")r
  1. dsl就是pipeline sdk, 由@dsl.pipeline标识pipeline的定义: 下一个函数即为定义函数, 函数的参数列表翻译为argo的input字段
  2. dsl.ContainerOp定义了镜像操作, 在这个例子之中有5个ContainerOp就会对应五个step, 启动5个pod进行计算.
  3. dsl.PipelineVolume定义了磁盘挂载信息, 作为参数输入到ContainerOp之中, 说明将该名字的PVC挂载到镜像里之中, 挂载目录为/data
  4. bamfile = conver_sam(samfile.output)定义了依赖关系, 说明conver_sam依赖align_fastq的输出, 这个例子是个串行任务, pipeline支持并发支持, 实现如下所示bamfile = conver_sam(samfile1.output, samfile2.output)

最后, 执行main函数, 可以生成一个tar.gz包, 将这个包上传到流程页面上, 就能开始运行任务.

Pipeline的SDK简要说明

API文档之中详细写明了SDK目前的功能, 这儿就简单啰嗦一句.

目前SDK只支持如下几个模块的功能:

img

  1. 首先是compiler模块, 主要是将python代码转化为argo的yaml配置项
  2. components模型实现如何导入外部模块, 以及如何构建模块
  3. dsl是最重要的模块, 定义了ContainerOp以及VolumeOp等真正和K8s交互的模块
  4. client的模块主要是如何提交任务的模块
  5. notebook目前还是空的
  6. extension主要是云上的扩展模块, 目前主要支持谷歌亚马逊和微软的云

整个SDK的代码量非常少, 可以直接看源码了解更多的内容.

完结撒花

走马观花一样的看了一下Pipleline的能力, 接下来总结一下优点和缺点:

  1. 整体上pipeline有好的编程入口, 比较适合程序员, 但缺乏页面定制能力, 无法面向非程序员群体
  2. 有简单的任务调度能力, 支持定时任务, 但是功能有限
  3. 整体架构简洁明了, 但微服务众多, 运维是个压力
  4. 整体来说, pipeline功能还不完善, 需要深度定制, 但K8s + AI是趋势, 未来肯定会越来越好, 越来越庞大.