Spark Streaming Indroduce

面向人群

  • 精通Java/Scala编程
  • Spark相关使用及编程经验
  • 了解流应用架构

简明介绍

Spark Streaming是一种基于微批量(micro-batch)方式计算和处理实时流数据执行框架。
Streaming依托于于Spark执行框架,将连续输入数据按批次切分,通过DStream(Discretized stream)来表征,然后按批次组装为Spark任务,放入Spark任务池中执行。

图1 Streaming与Spark关系

因此基于micro-batch的Streaming必然会带有以下特征:

  • 高扩展性
  • 高吞吐量
  • 高可靠性
  • 高延时性

目前Streaming已经内置以下多种数据源和输出源的适配器,其中数据源使用比较多的是Kafka和HDFS,输出源一般都为HDFS。
Streming输入输出

图2 Streming输入输出

Streaming目前使用的案例并不是特别多,SharethroughPinterest是比较明确Streaming的使用者:).

关键概念

micro-batch

图3 micro-batch与流与批的关系

Wiki上并没有关于micro-batch的介绍,甚至在Streaming的paper中也没有提出这个名词,只能从网上别人总结的图来说明。
用这个概念能很好的总结Spark Streaming的执行流程,也能很直观的与类似Storm这类Record By Record类型的流系统区别开来。所谓的micro-batch,它的本质是批处理,因此Streaming的执行层是Spark——一个批处理系统:

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs.

因此,Streaming会天然继承Spark的优点:高扩展性高吞吐量高可靠性
但是Streaming能够处理处理流式数据的原因在与它的batch相对于正常的Spark应用是非常小的,而且是严格按照时间切分批次。
定义:

Streaming将流式数据分为多个固定时间窗口中的单批数据,并根据处理逻辑封装成Spark任务持续不断放入Spark中执行。

JobGenerator

Streaming需要提供按照固定时间产生的持续不断的Spark任务,因此在它的实现里,必然会有一个定时器,该定时器每隔固定时间生成一个处理该时间窗口数据的Spark任务,我们称这个组件为JobGenerator。
由于一些Streaming任务前后数据之间没有关联性,因此在JobGenerator中必须提供一个Spark的任务池,用于多线程的执行Spark的任务。

Receiver

Streaming还需要负责接收固定时间产生的流式数据,并将这种数据封装为JobGenerator产生Spark任务的输入数据,我们称之为Receiver。
由于micro-batch方式,Streaming可以同时处理批数据和流式数据,因此也会存在两种组织形式的Receiver。

  • 流式数据
    以SocketReceiver为代表,通过单节点接受流式数据,将数据按批组装为任务输入数据,包括KafkaReceiver、FlumeReceiver等接收型数据。
  • 批数据
    以HDFS接口为代表,本身底层数据系统即是分布式数据,数据不需要组装,可以直接被RDD表征,在Streaming主要包括HDFS文件以及DirectKafkaAPI。

DStreamGraph

Streaming的API设计与RDD接口相似,RDD通过dependencies_存储自己的处理逻辑,并通过DAGScheduler分解出RDD整个Spark执行的逻辑,而相对应的DStream需要将自己的逻辑翻译为RDD原语,这个翻译过程被称为DStreamGraph

图4 DStream转化RDD

DStream

1
2
3
4
5
6
// Iter
Source.fromFile("").getLines().flatMap(_.split(",")).map(x => (x, 1)).foreach(println)
// RDD
SpoarkContext.textFile("").flatMap(_.split(",")).map(x => (x, 1)).foreach(println)
// DStream
StreamingContext.textFileStream("").flatMap(_.split(",")).map(x => (x, 1)).foreach(println)

从上诉代码上看,DStream和RDD的接口都参考Scala的集合API设计,我们可以将迭代器理解为单机上表征数据以及数据转化方式的对象,那么从RDD的定义和实现来看,RDD是在分布式维度上表征数据及数据转化方式的对象。
RDD定义:

Resilient Distributed Datasets (RDDs) are fault-tolerant, parallel data structures that let users explicitly persist intermediate results in memory, control their partitioning to optimize data placement, and manipulate them using a rich set of operators.

而DStream可以理解为在时间维度上表征分布式数据及数据转化方式的对象:

A discretized stream or D-Stream groups together a series of RDDs and lets the user manipulate them to through various operators. D-Streams provide both stateless operators, such as map, which act independently on each time interval, and stateful operators, such as aggregation over a sliding window, which operate on multiple intervals and may produce intermediate RDDs as state.

当然,DStream除了上述和scala集合操作对应的API,DStream还包括一些流应用特有的操作,例如

实现浅析

流式语义

Streaming的语义在官网已经有所介绍,总结而言:

  • 数据接收阶段,对于批数据已经实现Exactly once语义,对于流式数据在Spark-1.2以后引入WAL技术,可以保证at-least once语义
  • 数据转化阶段,依托RDD的可靠性保证,Streaming能保证Exactly once语义
  • 数据输出阶段,默认的语义只为at-least once,需要用户自己实现Exactly once语义

简单案例

HdfsWordCount

1
2
3
4
5
6
7
8
9
10
11
12
val sparkConf = new SparkConf().setAppName("HdfsWordCount")
// Create the context
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
val lines = ssc.textFileStream(args(0))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()

DirectKafkaWordCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

// Get the lines, split them into words, count the words and print
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
ssc.awaitTermination()

Streaming应用一般有以下步骤:

  1. 根据不同数据源接口获取InputDStream,对于Kafka即KafkaUtils.createDirectStream,对于HDFS即:ssc.textFileStream
  2. 通过DStream接口根据业务对InputDStream进行转化:flatMap(_.split(" ")).map(x => (x, 1L)).reduceByKey(_ + _)
  3. DStream输出结果,printsaveAsTextFiles等,如果需要自定义的输出结果,可以使用foreachRDD算子
  4. 调用ssc.start()ssc.awaitTermination(),启动Streaming的计算

任务提交
使用常规的Spark任务提交应用,例如HdfsWordCount

1
./bin/run-example streaming.HdfsWordCount /streaming

启动任务后,可以在本地上传文本文件到HDFS指定目录下,这时在日志界面上就能看到统计出来的单词条数
1
./bin/hadoop fs -put textFie /streaming

实际案例

公司案例

学习建议

  1. 首先仔细浏览官网上Streaming编程指南,学习Streaming相关概念
  2. 通过官方样例工程熟悉API接口,并编写编写简单的应用,尝试在集群中运行。
  3. 学有余力的可以开始阅读Streaming源代码,并通过对比代码尝试定位运行过程中出现的问题。
  4. 学习Kafka相关概念与架构以及其他流式组件
  5. 学习其他流式组件,集思广益

参考

  1. Spark Paper
  2. Spark Streaming Paper
  3. Streaming Programming Guide
  4. Indroduce By Tathagata
  5. 大规模流式数据处理的新贵
  6. Storm与Spark Streaming比较
  7. Spark Streaming实时计算框架介绍
  8. 从Storm和Spark 学习流式实时分布式计算的设计