banner
bladedragon

bladedragon

Hadoop CommitProtocol介紹

一些筆記的歸檔。

Hadoop CommitProtocol#

hadoop commitProtocol(全稱 HadoopMapReduceCommitProtocol)是一套用於提交文件到文件系統的規則。是 hadoop 抽象出來的,為了實現存算分離,兼容不同底層文件系統的機制。

這裡我們假定:job 為某種需要並行計算的查詢作業;在大數據計算框架中,這種作業往往由多個 task 組成,以 DAG 的方式觸發執行;job 由調度程序 JobManager 調起,並將 task 分發到各個 executor 中執行作業,最後聚合並返回最終結果。

那麼我們可以將 commitProtocol 的基本執行邏輯簡化成如下方法的集合:

  1. setupJob(): 在執行程序初始化 job,例如創建必要的臨時目錄等
  2. setupTask(): 在 executor 初始化 task,創建臨時目錄等
  3. needTaskCommit(): 是否需要提交 task
  4. commitTask(): 提交 task,將 task 產生的臨時輸出提交到最終的輸出路徑
  5. abortTask(): 當需要終止 task 時,需要清理 task 產生的臨時文件
  6. commitJob(): 當確保所有 task 提交完成後,將提交整個 job 的最終結果
  7. abortJob(): 終止並清理 job

image (1)

commitProtocol 相當於是指導 Mapreduce 作業具體運行在存儲系統上的一套標準。考慮到在 MapReduce 作業一般都是大規模的耗時作業,其中難免會存在各種異常導致部分 task 失敗。因此 commitProtocol 的實現也需要格外考慮數據正確性和事務一致性。

FileOutputCommitter#

和 HadoopCommitProtol 專注於作業整體的提交流程不同,committer 專注於具體的 commit 操作(taskCommit 和 jobCommit)。在 hadoop 原生實現中,主要實現在 FileOutputCommitter 中,並針對性能和隔離性實現了兩種算法

commitV1#

在 commitTask 時,數據文件會從 task 臨時目錄 rename 到 job 臨時目錄中,並在 commitJob 時,從 job 臨時目錄 rename 到正式目錄。由於數據在 job 完成前保持良好的隔離性,失敗作業可快速從上次臨時目錄中恢復數據。

commitV2#

在 commitTask 時,數據文件會從 task 臨時目錄 rename 到 job 的正式目錄,從而在 commitJob 時只需要確認數據完整並進行成功標記即可。但是由於 commiTask 階段的 rename 操作對外可見,因此隔離性不佳,當作業失敗時,也無法快速恢復。

Spark CommitProtocol#

commitProtocol 在 Hadoop 的經典實現是 yarn 和 spark 上。由於現在基本上都從 MR 作業轉向了 spark 作業,所以這裡也只介紹 spark 的 commitProtocol 協議。

spark 的 commitProtocol 實際上在 hadoopCommitProtocol 基礎上進行了封裝,使其適配 spark 的計算執行模型,同時補齊了 hadoopCommitProtocol 的部分缺陷,提升了擴展性。

除此之外,因為繼承了 hadoopCommitProtocol,spark 自然也使用 FileOutputCommitter 用於寫入文件。如果想要接入其他 committer,也可以通過繼承並重寫 commitProtocol 的方式去實現,或者顯式指定 committer,這在 spark 中均有配置實現。

image-20231014130948900

其與 hadoopComitProtocol 的差異在於:

  1. 確保了 task attempt 進行 commitTask 時的正確性(OutputCommitCoordinator)

spark 引入了 OutputCommitCoodinator 跟踪每個 stage 裡 task attempt 的狀態,如果某個 task attempt 成功,後續的所有 attempt 發起的 commitTask 都會被拒絕。通過這種方式保證 commitTask 執行的正確性

image (2)

  1. 支持 hive 外表數據讀寫和動態分區重寫

在 hive 表查詢中,數據可能寫到內部表,也可能寫到外部表。外部表的數據往往存儲在不同的位置。為了支持這種需要輸出數據到不同路徑的操作,spark commitProtocol 允許記錄文件的絕對路徑 newTaskTempFileAbsPath ()。在讀取文件時將文件的絕對路徑保存下來,並在寫入時獲取絕對路徑寫入。

動態分區重寫指的是 spark 在寫入數據的時候允許只重寫指定分區的數據。避免重寫整張表,增大開銷。
什麼是動態分區:

若 sql 中未指定分區字段的具體值,使得該分區可以在計算中自動推斷出,即為動態分區。

例子:

insert overwrite table ${table_name} partition (dayno=${v_day}, hour=${v_dayhour_hh}, region)
select * from ${query_table_name}
where dayno = ${v_day};

開啟動態分區重寫的條件:

  1. spark.sql.sources.partitionOverwriteMode=dynamic
  2. SaveMode=overwrite
  3. sql 中存在動態分區的字段

動態分區重寫的基本邏輯

  1. 如果開啟動態分區重寫,spark 會將 job 目錄下生成的臨時數據文件生成在 staging 目錄下
  2. 針對外部表的數據路徑,在每次 rename 絕對路徑的分區文件前,需要先清理並重建其父級目錄避免 rename 時不存在(overwrite)
  3. 針對內部表的數據路徑,由於 commitJob 後路徑仍然在 staging 目錄下,所以需要將所有文件和目錄從 staging 目錄 rename 到正式目錄

參考 issue:[SPARK-35106]

存在的問題#

基於之前一篇文章,當我們通過 Hadoop MapReduce 或者 Spark 將數據寫入到 S3 時,就會存在相當嚴重的性能問題。

在 s3 中,只提供了以下六種基礎操作,所以其他操作只是這幾種操作的衍生。例如rename =list + copy + delete。這導致 rename 操作是非原子性的。傳統文件 rename 操作需要在 s3 上實現 CREATE + DELETE 操作,目錄的 rename 則還需要額外 LIST 所有文件的操作,這導致 rename 操作不是原子的,而且隨著文件數的增加,rename 速度會越來越慢。

操作名解釋
PUT對象的原子寫操作
GET獲取對象
HEAD獲取對象的元數據
LIST基於前綴列出所有對象
COPY複製單個對象
DELETE刪除對象

而 rename 在傳統的 commit 算法中是不可或缺的一環,這將直接導致 commit 算法的性能和正確性受到了挑戰。

另外,在 spark 中也存在 staging 操作。staging 是 spark 作業中重要的步驟,在動態分區重寫的時候,必須要通過 staging 來保障數據的一致性。但是 staging 中同樣存在文件的 rename 操作,在 s3 的場景,這樣的操作會帶來很大的開銷。

S3A Committer#

隨著在 s3 上處理數據規模的增大,除了上述提到的 6 種基礎操作,AWS 還額外提供了兩種操作用來應對大規模的數據傳輸。

  • Bulk Delete
  • Multipart Upload
    後者將是解決 s3 在提交文件性能問題上的關鍵。

簡單介紹一下 Multipart Upload 機制(後面簡稱 MPU):
multiPartUpload 主要分為三個階段

  1. 初始化
  2. 上傳塊
  3. 完成最終 POST 操作

參考 aws 官方文檔

在上傳過程中,已上傳的塊在 s3 中是不可見的,但是會佔用實際存儲。只有當 s3 最後 POST 操作完成後,s3 的 manifest 才會寫入該文件,該文件才會在 s3 上可見。

因此,hadoop 的開發人員充分利用了該機制,設計了兩種 S3A Committer: Staging CommitterMagic Committer

Staging Committer#

在 stagingCommitter 上,數據在 taskAttempt 時被加載到本地進行計算,並在 commitTask 的時候上傳到 s3 上,此時文件被直接 MPU 到最終的 job 目錄上,但是文件信息被上傳到 HDFS 上進行傳統的 FileOutputCommitter 的 V1 算法操作。commitJob 階段,最終在 hdfs 上完成最後的 rename 後,執行最終 POST 操作,上傳的文件在 s3 上才最終可見

Magic Committer#

在 magicCommitter 上,數據從一開始就允許直接寫到 s3 上,不需要落到本地。magicCommitter 會在 job 目錄下創建一個_magic 目錄,taskAttempt 最後輸出的文件都已 magic 文件流的形式 MPU 到這個目錄下。注意此時並沒有完成 MPU,該次 MPU 的 manifest 會寫到文件同名的一個.pending 文件中(同樣在_magic 目錄下)。在 commitTask 時,由於文件數據已經上傳到 s3 上,只需要加載 taskAttempt 路徑下所有.pending 文件,聚合成.pendingset 文件上傳。在 commitJob 階段,則是獲取所有.pendingset 文件,這裡就已經獲取到所有需要 commit 的文件的 manifest,因此依次完成最終 POST 操作即可。


更多詳細內容,可以去閱讀 Hadoop 中 committer 實現的源碼來深入學習。

總的來說,S3A Committer 都是通過延遲文件最後提交操作來避免 rename 操作。基於當前 s3 的版本,兩種 committer 基本都能解決性能問題。不過,hadoop 對 committer 的實現更多考慮了通用性,而針對部分特化的業務場景,則沒有給到足夠的支持。(簡單講就是還無法做到開箱即用)例如在 spark 的動態分區重寫機制以及寫入 hive 外部表的機制,仍然需要自己實現。

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。