banner
bladedragon

bladedragon

Spark Job长尾问题排查及小文件优化的思考

但是最近在测试 spark on k8s 的时候,遇到了一些性能问题,于是记录一下排查过程,做一下案例的复盘。

20220202163506_2db51

案例再现#

我们使用的底层集群是 AWS 的 eks 集群,在此之上搭建了一套传统的 hadoop yarn+ spark 环境。通俗来说就是将 yarn 的 resourceManager,nodeManager 等具体的组件转化成 k8s 的 pod,但是上层调度逻辑不变,做了一套两层调度的系统。该系统的具体实现方案和调度逻辑这里按下不表,因为这和今天讲述的性能优化案例关系不大,我们只要了解到这是传统的 yarnspark,但是底层是 k8s 就好。
我采用的任务是来自我们生产环境的一个小任务,期望运行时间 5min,数据量中等,输出表数据大约在 2kw 行。原先生产环境采用的是 AWS 的 EMR on EC2 集群,可以理解是传统的容器主机集群。现在将该任务迁移到 eks 集群后,时长达到了 43min,于是观察其 history ui:

IMG_export_20230415_163937706

IMG_export_20230415_163947995

发现其 sql 执行的阶段耗时与生产环境的耗时基本类似,阶段执行时长只有几分钟,但是整个 spark job 完成耗时却用了 43min,明显在 job 执行尾部存在耗时问题,导致长尾现象发生。

初步分析#

由于采用生产任务一样的 sql,读写数据量也完全一致,排除业务逻辑导致的影响;
排查 driver 日志,发现阶段结束后几乎没有有效日志,此时所有 task 已经执行完成,观察集群,executor 的利用率也非常低。
既然存在耗时,后台一定有耗时的线程在运行,于是查看 spark driver 的 thread dump

找到的真正的原因发现卡在 s3 的 rename 操作上

s3 上 rename 操作带来的性能问题#

首先谈一下 rename。 spark 的 rename 是指在 spark 提交作业过程中,为了保持数据的一致性,会生成临时文件来读写数据,当 task 执行完毕,会将临时文件 rename 为正式文件;当 job 执行完毕,会把该 job 的临时目录下的所有文件 rename 为正式文件
其目录格式大概如图所示

image

driver 会通过 FileFormatWriter 选择合适的 output committer 并启动 writer job,committer 会决定如何提交 job 和 task, 提交流程如下图

image

rename 就发生在其中提交 job 和提交 task 的环节,具体的 rename 策略根据 committer 的策略而定。关于 committer 的细节等会再提。

为什么 s3 上 rename 会有性能问题
AWS 的 s3 包括大多数对象存储,目录本身就是一个对象,因此,其目录 rename 需要经历 list-copy-delete 的操作,相对与文件系统例如 HDFS 简单的 rename,其开销自然非常大。在 spark 运行中,可能会生成非常多的小文件,即使是 HDFS,要进行数万计小文件的 rename,其性能尚且需要优化,更不要说 s3 了。

spark 的文件提交协议#

在谈及如何优化之前,我们先回顾一下与之相关的 Spark 文件提交过程。从上一张图可以看到,Spark 在 job 提交过程中,实际上依然是调用的 Hadoop 的 committer 来采取具体的 commit 的策略。而 committer 要解决的问题,主要有以下几点:

  1. 处理文件失败重读导致的数据一致性问题
  2. 保证 task 推测执行下相同文件多写时的数据正确性
  3. 提高海量文件读写,合并的效率

目前 Hadoo 提供的两种文件提交方式,通过mapreduce.fileoutputcommitter.algorithm.version进行切换

FileOutputCommitter V1#

commit 过程

  1. 首先 TaskAttempt 会将 TaskAttempt Data 写入一个临时目录: ${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  2. 当 task data 写完,可以提交 task 后,执行 commitTask,将上述目录转移到正式目录:${target_dir}/_temporary/${appAttempt}/${taskAttempt}
  3. 当所有 task 完成,执行 commitJob, 将${target_dir}/_temporary下所有文件及目录转移到${target_dir}正式目录,并在提交完成后当前目录添加标识符 _SUCCESS来表示标识提交成功

数据一致性问题

  1. 在 TaskAttempt 写入的阶段,如果发生 task 写失败需要重试,只需要重写${taskAttempt}目录下/_temporary/下的所有文件就行,可以保留原先正式的 Attempt 目录
  2. 如果发生 application 重试,也可以通过 recoverTask 直接恢复原先${appAttempt}目录下的正式目录文件,直接重命名到当前${appAttempt}目录下
  3. 由于存在两次 rename,V1 实际上是两阶段提交,rename 前后数据的一致性都能得到保证,数据不一致的情况只有可能发生在 rename 的过程中

性能问题
V1 的强一致性带来的负面作用就是两次 rename 的操作在海量文件生成的情景中可能导致耗时问题,尤其是 commitJob 阶段,由于是 Driver 单线程串行执行 commit,此时如果需要 rename 大量文件, 其耗时可能会非常长

FileOutputCommitter V2#

  1. 首先 TaskAttempt 会将 TaskAttempt Data 写入一个临时目录: ${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}

  2. 当 task data 写完,可以提交 task 后,执行 commitTask,将上述目录转移到 ${target_dir}. 注意这里是直接移动到正式目录

  3. 当所有 task 完成,由于所有 data 已经保存在正式目录下,所以 commitJob 就是单纯添加标识符 _SUCCESS来表示标识提交成功

数据一致性问题
1. 在 taskAttempt 写入的阶段,如果发生 task 写失败重试,由于此时可能 task data 已经被移动到正式目录,因此此时会出现脏数据
2. 如果发生 application 重试,由于之前 application 已提交的数据已经存在在正式目录,因此无需额外的重命名操作,直接继续进行其他数据的重试即可,当然,此时已提交的数据不一定完全正确,其中可能存在脏数据。
3. 可见 V2 过程牺牲一定的数据一致性,选择了最终一致性的方案,由于缺乏中间过程对数据正确性的保证,所以只能通过最后的_SUCCESS 标识符来决定数据是否正确。同时,这还会带来另一个问题,由于存在脏数据,在任务长期运行中,这些脏数据可能无法被正确清理,从而占据存储空间,带来额外的开销

性能问题
V2 之所以采取最终一致性的方案,目录就是减少 V1 rename 操作过多带来的耗时开销。相比 V1,V2 只需要在 task 完成后 rename 到正式目录,而且可以通过 task 线程的并行操作,其执行的时长会被大大降低

小文件优化#

虽然上述的 Committer 的不同算法在一致性和性能上给了大家选择,但毕竟各有利弊。但在实际场景下,大家的选择总是希望 “我全都要”

fm=173&fmt=auto&h=212&img_JPEG=&s=198008D41E4200570CB830AA0300E012&u=1102103420,837649793&w=393

除了在 rename 阶段进行优化外,性能杀手的源头:对海量小文件的优化也成为了一个行之有效的方法。

Spark 现有的优化:#

在 spark 中内置有对小文件的优化,从文件生成角度:

  • spark.files.openCostInBytes 利用这个参数设置预估打开文件的大小,设置高一点可以提高小文件分区的速度

从业务侧考虑,大致思路是减少分区数来将小文件合并成大文件

  • 使用 coalesce 或 repartition 操作合并分区。
  • 减少使用动态分区或者使用 distribute by来控制 reducer 个数
  • 多使用分区表来降低查询时产生的分区数量
  • 或者使用更先进的文件压缩格式来提高小文件处理性能

AWS 的特殊优化:#

由于我们在生产环境中使用了 AWS 的 EMR,也稍微了解了一下 AWS 团队在 s3 上小文件优化上的措施

  1. multi upload:其原理就是利用并发读写文件片段来提高处理 s3 读写的性能,基于此,衍生出 EMRFS S3-optimized Committer 和 s3a Committer(开源),注意,该 committer 默认采用 FileOutputCommitter V2 方式提交,因此 V2 存在的问题在这些 committer 上也都会存在。
  2. 利用 hdfs 加速,在 EMR 中,考虑到文件系统对于 rename 等操作具有更好的性能,那我们不是可以在文件系统上先 rename,再提交到 s3 上?在 EMR 中,就是在文件提交到 s3 前,先上传到类 hdfs 的文件系统上进行 rename 或者文件合并操作后再上传到 s3 上,这样比起纯 s3 读写,在性能上会有明显收益。当然坏处就是单独维护一个文件系统具有较高成本。

其他优化思路:#

我们团队也在小文件合并上进行了优化,其优化思路就是在 Job 执行的最后,新建一个 job 用于合并小文件,通过继承 Spark 的SqlHadoopMapReduceCommitProtocol来实现插件式的扩展
合并的思路是在 commitTask 之后,获取数据的分区信息,然后进行分组合并,最后在 commitJob 的时候直接将合并完的文件转移到正式目录中。其基本思路如下图

application-and-practice-of-spark-small-file-mergi1

这样合并小文件的优点

  • 该功能是插拔式的,对原生代码的侵入性较低

  • 在海量小文件场景下优势明显

    缺点

    • 新起一个 job 进行优化,在任务最后会新增两个阶段用于小文件合并,会引入更多的 task,带来一定的耗时

尾声#

通过启动该功能,我重新跑一遍任务,最终耗时下降明显降低:

IMG_export_20230416_174701801

当然优化并未完全结束,在 eks 上的任务耗时总体还是要比原 EMR 任务高,但这块问题的深入排查,等待下次有时间再分享吧

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。