SparkSQL: 小文件的处理方式

自适应执行

社区在Spark2.3版本之后的AdaptiveExecute特性之中就能很好的解决Partition个数过多导致小文件过多的问题.
通过动态的评估Shuffle输入的个数(通过设置spark.sql.adaptive.shuffle.targetPostShuffleInputSize实现), 可以聚合多个Task任务, 减少Reduce的个数
使用方式:

1
2
set spark.sql.adaptive.enabled=true
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128MB

优点:

  • 自动根据任务的数据量进行聚合

缺点:

  • 必须存在Shuffle过程, 否则不生效
  • 任务的Shuffle输出比不能太低

Shuffle输出比, 为一个Shuffle任务中最后Output的数据量除以ShuffleRead的数据量的数值. 如果ShuffleRead为100GB, 而输出为1GB, 那么Shuffle输出比为1%. 如果这值比较低, 说明Task之中有很高强度的Filter功能. 这个数值太低会对系统产生比较大影响, 例如每个Shuffle块为128MB, 如果输出比为10%, 那么最后在HDFS之中只有12.8MB, 就如会出现小文件问题. 因此动态执行功能并不会对此产生太大的效果. 现实中, 由于SparkSQL已经有比较高效的FilterPushDown功能, 因此这个比例不太太高, 在在20%以上.

HINT方式

社区在Spark2.4版本之后引入HINT模式SPARK-24940, 可以由用户来指定最后分区的个数, 只要在SQL语句之中加入注释文件
使用方式:

1
2
INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...

优点:

  • 支持简单无Shuffle模式的Reparation

缺点:

  • 需要人工干预, 设计Partition的个数, 而对于变化的场景来说, 难有一个固定的Partition个数
  • 无法处理Shuffle输出比过低的场景

独立的小文件合并

综上所诉两个方案都无法处理Shuffle输出比过低的场景, 因此我们需要一种兜底方案: 直接读取HDFS上的数据, 进行合并操作.
当插入任务完成之后, 新启动一个Job读取所有的数据, 然后根据设置的文件大小, 进行合并并会写到HDFS之中.
由于是直读直写的方式, 因此对于数据大小的评估是非常精确的, 因此可以很好的避免Shuffle输出比的问题.

优点:

  • 基本解决了小文件问题

缺点:

  • 引入新的一次Job过程, 性能会受影响, 特别对中型任务会有一定的影响(10秒左右)

使用方式:

1
2
set spark.sql.merge.enabled=true;
set spark.sql.merge.size.per.task=134217728; --128 * 1024 * 1024 bytes

性能优化:
ORC和Parquet格式支持按行读取和按Stripe读取, Stripe读取可以认为是GroupRead, 由于不需要解析文件里面具体的数值, 因此可以按照Stripe粒度读取文件, 再写入文件之中, 以Stripe粒度合并文件.
1
set spark.sql.merge.mode=fast; -- 默认是pretty, 是逐行读写文件的, 性能较慢

实际上说, 这种方式与启动独立合并的任务, 后台不停的合并是一样的, 只不过将这种插入到每个SQL任务中, 并自动完成了