但是最近在測試 spark on k8s 的時候,遇到了一些性能問題,於是記錄一下排查過程,做一下案例的復盤。
案例再現#
我們使用的底層集群是 AWS 的 eks 集群,在此之上搭建了一套傳統的 hadoop yarn+ spark 環境。通俗來說就是將 yarn 的 resourceManager,nodeManager 等具體的組件轉化成 k8s 的 pod,但是上層調度邏輯不變,做了一套兩層調度的系統。該系統的具體實現方案和調度邏輯這裡按下不表,因為這和今天講述的性能優化案例關係不大,我們只要了解到這是傳統的 yarnspark,但是底層是 k8s 就好。
我採用的任務是來自我們生產環境的一個小任務,期望運行時間 5min,數據量中等,輸出表數據大約在 2kw 行。原先生產環境採用的是 AWS 的 EMR on EC2 集群,可以理解是傳統的容器主機集群。現在將該任務遷移到 eks 集群後,時長達到了 43min,於是觀察其 history ui:
發現其 sql 執行的階段耗時與生產環境的耗時基本類似,階段執行時長只有幾分鐘,但是整個 spark job 完成耗時卻用了 43min,明顯在 job 執行尾部存在耗時問題,導致長尾現象發生。
初步分析#
由於採用生產任務一樣的 sql,讀寫數據量也完全一致,排除業務邏輯導致的影響;
排查 driver 日誌,發現階段結束後幾乎沒有有效日誌,此時所有 task 已經執行完成,觀察集群,executor 的利用率也非常低。
既然存在耗時,後台一定有耗時的線程在運行,於是查看 spark driver 的 thread dump。
找到的真正的原因發現卡在 s3 的 rename 操作上。
s3 上 rename 操作帶來的性能問題#
首先談一下 rename。 spark 的 rename 是指在 spark 提交作業過程中,為了保持數據的一致性,會生成臨時文件來讀寫數據,當 task 執行完畢,會將臨時文件 rename 為正式文件;當 job 執行完畢,會把該 job 的臨時目錄下的所有文件 rename 為正式文件。
其目錄格式大概如圖所示。
driver 會通過 FileFormatWriter 選擇合適的 output committer 並啟動 writer job,committer 會決定如何提交 job 和 task,提交流程如下圖。
rename 就發生在其中提交 job 和提交 task 的環節,具體的 rename 策略根據 committer 的策略而定。關於 committer 的細節等會再提。
為什麼 s3 上 rename 會有性能問題
AWS 的 s3 包括大多數對象存儲,目錄本身就是一個對象,因此,其目錄 rename 需要經歷 list-copy-delete 的操作,相對於文件系統例如 HDFS 簡單的 rename,其開銷自然非常大。在 spark 運行中,可能會生成非常多的小文件,即使是 HDFS,要進行數萬計小文件的 rename,其性能尚且需要優化,更不要說 s3 了。
spark 的文件提交協議#
在談及如何優化之前,我們先回顧一下與之相關的 Spark 文件提交過程。從上一張圖可以看到,Spark 在 job 提交過程中,實際上依然是調用的 Hadoop 的 committer 來採取具體的 commit 的策略。而 committer 要解決的問題,主要有以下幾點:
- 處理文件失敗重讀導致的數據一致性問題
- 保證 task 推測執行下相同文件多寫時的數據正確性
- 提高海量文件讀寫,合併的效率
目前 Hadoo 提供的兩種文件提交方式,通過mapreduce.fileoutputcommitter.algorithm.version
進行切換。
FileOutputCommitter V1#
commit 過程
- 首先 TaskAttempt 會將 TaskAttempt Data 寫入一個臨時目錄:
${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
- 當 task data 寫完,可以提交 task 後,執行 commitTask,將上述目錄轉移到正式目錄:
${target_dir}/_temporary/${appAttempt}/${taskAttempt}
- 當所有 task 完成,執行 commitJob, 將
${target_dir}/_temporary
下所有文件及目錄轉移到${target_dir}
正式目錄,並在提交完成後當前目錄添加標識符_SUCCESS
來表示標識提交成功。
數據一致性問題
- 在 TaskAttempt 寫入的階段,如果發生 task 寫失敗需要重試,只需要重寫
${taskAttempt}
目錄下/_temporary/
下的所有文件就行,可以保留原先正式的 Attempt 目錄。 - 如果發生 application 重試,也可以通過 recoverTask 直接恢復原先
${appAttempt}
目錄下的正式目錄文件,直接重命名到當前${appAttempt}
目錄下。 - 由於存在兩次 rename,V1 實際上是兩階段提交,rename 前後數據的一致性都能得到保證,數據不一致的情況只有可能發生在 rename 的過程中。
性能問題
V1 的強一致性帶來的負面作用就是兩次 rename 的操作在海量文件生成的情景中可能導致耗時問題,尤其是 commitJob 階段,由於是 Driver 單線程串行執行 commit,此時如果需要 rename 大量文件,其耗時可能會非常長。
FileOutputCommitter V2#
- 首先 TaskAttempt 會將 TaskAttempt Data 寫入一個臨時目錄:
${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
- 當 task data 寫完,可以提交 task 後,執行 commitTask,將上述目錄轉移到
${target_dir}
. 注意這裡是直接移動到正式目錄。 - 當所有 task 完成,由於所有 data 已經保存在正式目錄下,所以 commitJob 就是單純添加標識符
_SUCCESS
來表示標識提交成功。
數據一致性問題
1. 在 taskAttempt 寫入的階段,如果發生 task 寫失敗重試,由於此時可能 task data 已經被移動到正式目錄,因此此時會出現髒數據。
2. 如果發生 application 重試,由於之前 application 已提交的數據已經存在在正式目錄,因此無需額外的重命名操作,直接繼續進行其他數據的重試即可,當然,此時已提交的數據不一定完全正確,其中可能存在髒數據。
3. 可見 V2 過程犧牲一定的數據一致性,選擇了最終一致性的方案,由於缺乏中間過程對數據正確性的保證,所以只能通過最後的_SUCCESS 標識符來決定數據是否正確。同時,這還會帶來另一個問題,由於存在髒數據,在任務長期運行中,這些髒數據可能無法被正確清理,從而佔據存儲空間,帶來額外的開銷。
性能問題
V2 之所以採取最終一致性的方案,目錄就是減少 V1 rename 操作過多帶來的耗時開銷。相比 V1,V2 只需要在 task 完成後 rename 到正式目錄,而且可以通過 task 線程的並行操作,其執行的時長會被大大降低。
小文件優化#
雖然上述的 Committer 的不同算法在一致性和性能上給了大家選擇,但畢竟各有利弊。但在實際場景下,大家的選擇總是希望 “我全都要”。
除了在 rename 階段進行優化外,性能殺手的源頭:對海量小文件的優化也成為了一個行之有效的方法。
Spark 現有的優化:#
在 spark 中內置有對小文件的優化,從文件生成角度:
spark.files.openCostInBytes
利用這個參數設置預估打開文件的大小,設置高一點可以提高小文件分區的速度。
從業務側考慮,大致思路是減少分區數來將小文件合併成大文件。
- 使用 coalesce 或 repartition 操作合併分區。
- 減少使用動態分區或者使用
distribute by
來控制 reducer 個數。 - 多使用分區表來降低查詢時產生的分區數量。
- 或者使用更先進的文件壓縮格式來提高小文件處理性能。
AWS 的特殊優化:#
由於我們在生產環境中使用了 AWS 的 EMR,也稍微了解了一下 AWS 團隊在 s3 上小文件優化上的措施。
- multi upload:其原理就是利用並發讀寫文件片段來提高處理 s3 讀寫的性能,基於此,衍生出 EMRFS S3-optimized Committer 和 s3a Committer(開源),注意,該 committer 默認採用 FileOutputCommitter V2 方式提交,因此 V2 存在的問題在這些 committer 上也都會存在。
- 利用 hdfs 加速,在 EMR 中,考慮到文件系統對於 rename 等操作具有更好的性能,那我們不是可以在文件系統上先 rename,再提交到 s3 上?在 EMR 中,就是在文件提交到 s3 前,先上傳到類 hdfs 的文件系統上進行 rename 或者文件合併操作後再上傳到 s3 上,這樣比起純 s3 讀寫,在性能上會有明顯收益。當然壞處就是單獨維護一個文件系統具有較高成本。
其他優化思路:#
我們團隊也在小文件合併上進行了優化,其優化思路就是在 Job 執行的最後,新建一個 job 用於合併小文件,通過繼承 Spark 的SqlHadoopMapReduceCommitProtocol
來實現插件式的擴展。
合併的思路是在 commitTask 之後,獲取數據的分區信息,然後進行分組合併,最後在 commitJob 的時候直接將合併完的文件轉移到正式目錄中。其基本思路如下圖。
這樣合併小文件的優點:
- 該功能是插拔式的,對原生代碼的侵入性較低。
- 在海量小文件場景下優勢明顯。
缺點:
- 新起一個 job 進行優化,在任務最後會新增兩個階段用於小文件合併,會引入更多的 task,帶來一定的耗時。
尾聲#
通過啟動該功能,我重新跑一遍任務,最終耗時下降明顯降低:
當然優化並未完全結束,在 eks 上的任務耗時總體還是要比原 EMR 任務高,但這塊問題的深入排查,等待下次有時間再分享吧。