banner
bladedragon

bladedragon

Spark Job長尾問題排查及小文件優化的思考

但是最近在測試 spark on k8s 的時候,遇到了一些性能問題,於是記錄一下排查過程,做一下案例的復盤。

20220202163506_2db51

案例再現#

我們使用的底層集群是 AWS 的 eks 集群,在此之上搭建了一套傳統的 hadoop yarn+ spark 環境。通俗來說就是將 yarn 的 resourceManager,nodeManager 等具體的組件轉化成 k8s 的 pod,但是上層調度邏輯不變,做了一套兩層調度的系統。該系統的具體實現方案和調度邏輯這裡按下不表,因為這和今天講述的性能優化案例關係不大,我們只要了解到這是傳統的 yarnspark,但是底層是 k8s 就好。
我採用的任務是來自我們生產環境的一個小任務,期望運行時間 5min,數據量中等,輸出表數據大約在 2kw 行。原先生產環境採用的是 AWS 的 EMR on EC2 集群,可以理解是傳統的容器主機集群。現在將該任務遷移到 eks 集群後,時長達到了 43min,於是觀察其 history ui:

IMG_export_20230415_163937706

IMG_export_20230415_163947995

發現其 sql 執行的階段耗時與生產環境的耗時基本類似,階段執行時長只有幾分鐘,但是整個 spark job 完成耗時卻用了 43min,明顯在 job 執行尾部存在耗時問題,導致長尾現象發生。

初步分析#

由於採用生產任務一樣的 sql,讀寫數據量也完全一致,排除業務邏輯導致的影響;
排查 driver 日誌,發現階段結束後幾乎沒有有效日誌,此時所有 task 已經執行完成,觀察集群,executor 的利用率也非常低。
既然存在耗時,後台一定有耗時的線程在運行,於是查看 spark driver 的 thread dump。

找到的真正的原因發現卡在 s3 的 rename 操作上。

s3 上 rename 操作帶來的性能問題#

首先談一下 rename。 spark 的 rename 是指在 spark 提交作業過程中,為了保持數據的一致性,會生成臨時文件來讀寫數據,當 task 執行完畢,會將臨時文件 rename 為正式文件;當 job 執行完畢,會把該 job 的臨時目錄下的所有文件 rename 為正式文件。
其目錄格式大概如圖所示。

image

driver 會通過 FileFormatWriter 選擇合適的 output committer 並啟動 writer job,committer 會決定如何提交 job 和 task,提交流程如下圖。

image

rename 就發生在其中提交 job 和提交 task 的環節,具體的 rename 策略根據 committer 的策略而定。關於 committer 的細節等會再提。

為什麼 s3 上 rename 會有性能問題
AWS 的 s3 包括大多數對象存儲,目錄本身就是一個對象,因此,其目錄 rename 需要經歷 list-copy-delete 的操作,相對於文件系統例如 HDFS 簡單的 rename,其開銷自然非常大。在 spark 運行中,可能會生成非常多的小文件,即使是 HDFS,要進行數萬計小文件的 rename,其性能尚且需要優化,更不要說 s3 了。

spark 的文件提交協議#

在談及如何優化之前,我們先回顧一下與之相關的 Spark 文件提交過程。從上一張圖可以看到,Spark 在 job 提交過程中,實際上依然是調用的 Hadoop 的 committer 來採取具體的 commit 的策略。而 committer 要解決的問題,主要有以下幾點:

  1. 處理文件失敗重讀導致的數據一致性問題
  2. 保證 task 推測執行下相同文件多寫時的數據正確性
  3. 提高海量文件讀寫,合併的效率

目前 Hadoo 提供的兩種文件提交方式,通過mapreduce.fileoutputcommitter.algorithm.version進行切換。

FileOutputCommitter V1#

commit 過程

  1. 首先 TaskAttempt 會將 TaskAttempt Data 寫入一個臨時目錄: ${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  2. 當 task data 寫完,可以提交 task 後,執行 commitTask,將上述目錄轉移到正式目錄:${target_dir}/_temporary/${appAttempt}/${taskAttempt}
  3. 當所有 task 完成,執行 commitJob, 將${target_dir}/_temporary下所有文件及目錄轉移到${target_dir}正式目錄,並在提交完成後當前目錄添加標識符 _SUCCESS來表示標識提交成功。

數據一致性問題

  1. 在 TaskAttempt 寫入的階段,如果發生 task 寫失敗需要重試,只需要重寫${taskAttempt}目錄下/_temporary/下的所有文件就行,可以保留原先正式的 Attempt 目錄。
  2. 如果發生 application 重試,也可以通過 recoverTask 直接恢復原先${appAttempt}目錄下的正式目錄文件,直接重命名到當前${appAttempt}目錄下。
  3. 由於存在兩次 rename,V1 實際上是兩階段提交,rename 前後數據的一致性都能得到保證,數據不一致的情況只有可能發生在 rename 的過程中。

性能問題
V1 的強一致性帶來的負面作用就是兩次 rename 的操作在海量文件生成的情景中可能導致耗時問題,尤其是 commitJob 階段,由於是 Driver 單線程串行執行 commit,此時如果需要 rename 大量文件,其耗時可能會非常長。

FileOutputCommitter V2#

  1. 首先 TaskAttempt 會將 TaskAttempt Data 寫入一個臨時目錄: ${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
  2. 當 task data 寫完,可以提交 task 後,執行 commitTask,將上述目錄轉移到 ${target_dir}. 注意這裡是直接移動到正式目錄。
  3. 當所有 task 完成,由於所有 data 已經保存在正式目錄下,所以 commitJob 就是單純添加標識符 _SUCCESS來表示標識提交成功。

數據一致性問題
1. 在 taskAttempt 寫入的階段,如果發生 task 寫失敗重試,由於此時可能 task data 已經被移動到正式目錄,因此此時會出現髒數據。
2. 如果發生 application 重試,由於之前 application 已提交的數據已經存在在正式目錄,因此無需額外的重命名操作,直接繼續進行其他數據的重試即可,當然,此時已提交的數據不一定完全正確,其中可能存在髒數據。
3. 可見 V2 過程犧牲一定的數據一致性,選擇了最終一致性的方案,由於缺乏中間過程對數據正確性的保證,所以只能通過最後的_SUCCESS 標識符來決定數據是否正確。同時,這還會帶來另一個問題,由於存在髒數據,在任務長期運行中,這些髒數據可能無法被正確清理,從而佔據存儲空間,帶來額外的開銷。

性能問題
V2 之所以採取最終一致性的方案,目錄就是減少 V1 rename 操作過多帶來的耗時開銷。相比 V1,V2 只需要在 task 完成後 rename 到正式目錄,而且可以通過 task 線程的並行操作,其執行的時長會被大大降低。

小文件優化#

雖然上述的 Committer 的不同算法在一致性和性能上給了大家選擇,但畢竟各有利弊。但在實際場景下,大家的選擇總是希望 “我全都要”。

fm=173&fmt=auto&h=212&img_JPEG=&s=198008D41E4200570CB830AA0300E012&u=1102103420,837649793&w=393

除了在 rename 階段進行優化外,性能殺手的源頭:對海量小文件的優化也成為了一個行之有效的方法。

Spark 現有的優化:#

在 spark 中內置有對小文件的優化,從文件生成角度:

  • spark.files.openCostInBytes 利用這個參數設置預估打開文件的大小,設置高一點可以提高小文件分區的速度。

從業務側考慮,大致思路是減少分區數來將小文件合併成大文件。

  • 使用 coalesce 或 repartition 操作合併分區。
  • 減少使用動態分區或者使用 distribute by來控制 reducer 個數。
  • 多使用分區表來降低查詢時產生的分區數量。
  • 或者使用更先進的文件壓縮格式來提高小文件處理性能。

AWS 的特殊優化:#

由於我們在生產環境中使用了 AWS 的 EMR,也稍微了解了一下 AWS 團隊在 s3 上小文件優化上的措施。

  1. multi upload:其原理就是利用並發讀寫文件片段來提高處理 s3 讀寫的性能,基於此,衍生出 EMRFS S3-optimized Committer 和 s3a Committer(開源),注意,該 committer 默認採用 FileOutputCommitter V2 方式提交,因此 V2 存在的問題在這些 committer 上也都會存在。
  2. 利用 hdfs 加速,在 EMR 中,考慮到文件系統對於 rename 等操作具有更好的性能,那我們不是可以在文件系統上先 rename,再提交到 s3 上?在 EMR 中,就是在文件提交到 s3 前,先上傳到類 hdfs 的文件系統上進行 rename 或者文件合併操作後再上傳到 s3 上,這樣比起純 s3 讀寫,在性能上會有明顯收益。當然壞處就是單獨維護一個文件系統具有較高成本。

其他優化思路:#

我們團隊也在小文件合併上進行了優化,其優化思路就是在 Job 執行的最後,新建一個 job 用於合併小文件,通過繼承 Spark 的SqlHadoopMapReduceCommitProtocol來實現插件式的擴展。
合併的思路是在 commitTask 之後,獲取數據的分區信息,然後進行分組合併,最後在 commitJob 的時候直接將合併完的文件轉移到正式目錄中。其基本思路如下圖。

application-and-practice-of-spark-small-file-mergi1

這樣合併小文件的優點:

  • 該功能是插拔式的,對原生代碼的侵入性較低。
  • 在海量小文件場景下優勢明顯。

缺點:

  • 新起一個 job 進行優化,在任務最後會新增兩個階段用於小文件合併,會引入更多的 task,帶來一定的耗時。

尾聲#

通過啟動該功能,我重新跑一遍任務,最終耗時下降明顯降低:

IMG_export_20230416_174701801

當然優化並未完全結束,在 eks 上的任務耗時總體還是要比原 EMR 任務高,但這塊問題的深入排查,等待下次有時間再分享吧。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。