但是最近在测试 spark on k8s 的时候,遇到了一些性能问题,于是记录一下排查过程,做一下案例的复盘。
案例再现#
我们使用的底层集群是 AWS 的 eks 集群,在此之上搭建了一套传统的 hadoop yarn+ spark 环境。通俗来说就是将 yarn 的 resourceManager,nodeManager 等具体的组件转化成 k8s 的 pod,但是上层调度逻辑不变,做了一套两层调度的系统。该系统的具体实现方案和调度逻辑这里按下不表,因为这和今天讲述的性能优化案例关系不大,我们只要了解到这是传统的 yarnspark,但是底层是 k8s 就好。
我采用的任务是来自我们生产环境的一个小任务,期望运行时间 5min,数据量中等,输出表数据大约在 2kw 行。原先生产环境采用的是 AWS 的 EMR on EC2 集群,可以理解是传统的容器主机集群。现在将该任务迁移到 eks 集群后,时长达到了 43min,于是观察其 history ui:
发现其 sql 执行的阶段耗时与生产环境的耗时基本类似,阶段执行时长只有几分钟,但是整个 spark job 完成耗时却用了 43min,明显在 job 执行尾部存在耗时问题,导致长尾现象发生。
初步分析#
由于采用生产任务一样的 sql,读写数据量也完全一致,排除业务逻辑导致的影响;
排查 driver 日志,发现阶段结束后几乎没有有效日志,此时所有 task 已经执行完成,观察集群,executor 的利用率也非常低。
既然存在耗时,后台一定有耗时的线程在运行,于是查看 spark driver 的 thread dump
找到的真正的原因发现卡在 s3 的 rename 操作上
s3 上 rename 操作带来的性能问题#
首先谈一下 rename。 spark 的 rename 是指在 spark 提交作业过程中,为了保持数据的一致性,会生成临时文件来读写数据,当 task 执行完毕,会将临时文件 rename 为正式文件;当 job 执行完毕,会把该 job 的临时目录下的所有文件 rename 为正式文件
其目录格式大概如图所示
driver 会通过 FileFormatWriter 选择合适的 output committer 并启动 writer job,committer 会决定如何提交 job 和 task, 提交流程如下图
rename 就发生在其中提交 job 和提交 task 的环节,具体的 rename 策略根据 committer 的策略而定。关于 committer 的细节等会再提。
为什么 s3 上 rename 会有性能问题
AWS 的 s3 包括大多数对象存储,目录本身就是一个对象,因此,其目录 rename 需要经历 list-copy-delete 的操作,相对与文件系统例如 HDFS 简单的 rename,其开销自然非常大。在 spark 运行中,可能会生成非常多的小文件,即使是 HDFS,要进行数万计小文件的 rename,其性能尚且需要优化,更不要说 s3 了。
spark 的文件提交协议#
在谈及如何优化之前,我们先回顾一下与之相关的 Spark 文件提交过程。从上一张图可以看到,Spark 在 job 提交过程中,实际上依然是调用的 Hadoop 的 committer 来采取具体的 commit 的策略。而 committer 要解决的问题,主要有以下几点:
- 处理文件失败重读导致的数据一致性问题
- 保证 task 推测执行下相同文件多写时的数据正确性
- 提高海量文件读写,合并的效率
目前 Hadoo 提供的两种文件提交方式,通过mapreduce.fileoutputcommitter.algorithm.version
进行切换
FileOutputCommitter V1#
commit 过程
- 首先 TaskAttempt 会将 TaskAttempt Data 写入一个临时目录:
${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
- 当 task data 写完,可以提交 task 后,执行 commitTask,将上述目录转移到正式目录:
${target_dir}/_temporary/${appAttempt}/${taskAttempt}
- 当所有 task 完成,执行 commitJob, 将
${target_dir}/_temporary
下所有文件及目录转移到${target_dir}
正式目录,并在提交完成后当前目录添加标识符_SUCCESS
来表示标识提交成功
数据一致性问题
- 在 TaskAttempt 写入的阶段,如果发生 task 写失败需要重试,只需要重写
${taskAttempt}
目录下/_temporary/
下的所有文件就行,可以保留原先正式的 Attempt 目录 - 如果发生 application 重试,也可以通过 recoverTask 直接恢复原先
${appAttempt}
目录下的正式目录文件,直接重命名到当前${appAttempt}
目录下 - 由于存在两次 rename,V1 实际上是两阶段提交,rename 前后数据的一致性都能得到保证,数据不一致的情况只有可能发生在 rename 的过程中
性能问题
V1 的强一致性带来的负面作用就是两次 rename 的操作在海量文件生成的情景中可能导致耗时问题,尤其是 commitJob 阶段,由于是 Driver 单线程串行执行 commit,此时如果需要 rename 大量文件, 其耗时可能会非常长
FileOutputCommitter V2#
-
首先 TaskAttempt 会将 TaskAttempt Data 写入一个临时目录:
${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
-
当 task data 写完,可以提交 task 后,执行 commitTask,将上述目录转移到
${target_dir}
. 注意这里是直接移动到正式目录 -
当所有 task 完成,由于所有 data 已经保存在正式目录下,所以 commitJob 就是单纯添加标识符
_SUCCESS
来表示标识提交成功
数据一致性问题
1. 在 taskAttempt 写入的阶段,如果发生 task 写失败重试,由于此时可能 task data 已经被移动到正式目录,因此此时会出现脏数据
2. 如果发生 application 重试,由于之前 application 已提交的数据已经存在在正式目录,因此无需额外的重命名操作,直接继续进行其他数据的重试即可,当然,此时已提交的数据不一定完全正确,其中可能存在脏数据。
3. 可见 V2 过程牺牲一定的数据一致性,选择了最终一致性的方案,由于缺乏中间过程对数据正确性的保证,所以只能通过最后的_SUCCESS 标识符来决定数据是否正确。同时,这还会带来另一个问题,由于存在脏数据,在任务长期运行中,这些脏数据可能无法被正确清理,从而占据存储空间,带来额外的开销
性能问题
V2 之所以采取最终一致性的方案,目录就是减少 V1 rename 操作过多带来的耗时开销。相比 V1,V2 只需要在 task 完成后 rename 到正式目录,而且可以通过 task 线程的并行操作,其执行的时长会被大大降低
小文件优化#
虽然上述的 Committer 的不同算法在一致性和性能上给了大家选择,但毕竟各有利弊。但在实际场景下,大家的选择总是希望 “我全都要”
除了在 rename 阶段进行优化外,性能杀手的源头:对海量小文件的优化也成为了一个行之有效的方法。
Spark 现有的优化:#
在 spark 中内置有对小文件的优化,从文件生成角度:
spark.files.openCostInBytes
利用这个参数设置预估打开文件的大小,设置高一点可以提高小文件分区的速度
从业务侧考虑,大致思路是减少分区数来将小文件合并成大文件
- 使用 coalesce 或 repartition 操作合并分区。
- 减少使用动态分区或者使用
distribute by
来控制 reducer 个数 - 多使用分区表来降低查询时产生的分区数量
- 或者使用更先进的文件压缩格式来提高小文件处理性能
AWS 的特殊优化:#
由于我们在生产环境中使用了 AWS 的 EMR,也稍微了解了一下 AWS 团队在 s3 上小文件优化上的措施
- multi upload:其原理就是利用并发读写文件片段来提高处理 s3 读写的性能,基于此,衍生出 EMRFS S3-optimized Committer 和 s3a Committer(开源),注意,该 committer 默认采用 FileOutputCommitter V2 方式提交,因此 V2 存在的问题在这些 committer 上也都会存在。
- 利用 hdfs 加速,在 EMR 中,考虑到文件系统对于 rename 等操作具有更好的性能,那我们不是可以在文件系统上先 rename,再提交到 s3 上?在 EMR 中,就是在文件提交到 s3 前,先上传到类 hdfs 的文件系统上进行 rename 或者文件合并操作后再上传到 s3 上,这样比起纯 s3 读写,在性能上会有明显收益。当然坏处就是单独维护一个文件系统具有较高成本。
其他优化思路:#
我们团队也在小文件合并上进行了优化,其优化思路就是在 Job 执行的最后,新建一个 job 用于合并小文件,通过继承 Spark 的SqlHadoopMapReduceCommitProtocol
来实现插件式的扩展
合并的思路是在 commitTask 之后,获取数据的分区信息,然后进行分组合并,最后在 commitJob 的时候直接将合并完的文件转移到正式目录中。其基本思路如下图
这样合并小文件的优点
-
该功能是插拔式的,对原生代码的侵入性较低
-
在海量小文件场景下优势明显
缺点
- 新起一个 job 进行优化,在任务最后会新增两个阶段用于小文件合并,会引入更多的 task,带来一定的耗时
尾声#
通过启动该功能,我重新跑一遍任务,最终耗时下降明显降低:
当然优化并未完全结束,在 eks 上的任务耗时总体还是要比原 EMR 任务高,但这块问题的深入排查,等待下次有时间再分享吧