一些筆記的歸檔。
Hadoop CommitProtocol#
hadoop commitProtocol(全稱 HadoopMapReduceCommitProtocol)是一套用於提交文件到文件系統的規則。是 hadoop 抽象出來的,為了實現存算分離,兼容不同底層文件系統的機制。
這裡我們假定:job 為某種需要並行計算的查詢作業;在大數據計算框架中,這種作業往往由多個 task 組成,以 DAG 的方式觸發執行;job 由調度程序 JobManager 調起,並將 task 分發到各個 executor 中執行作業,最後聚合並返回最終結果。
那麼我們可以將 commitProtocol 的基本執行邏輯簡化成如下方法的集合:
setupJob()
: 在執行程序初始化 job,例如創建必要的臨時目錄等setupTask()
: 在 executor 初始化 task,創建臨時目錄等needTaskCommit()
: 是否需要提交 taskcommitTask()
: 提交 task,將 task 產生的臨時輸出提交到最終的輸出路徑abortTask()
: 當需要終止 task 時,需要清理 task 產生的臨時文件commitJob()
: 當確保所有 task 提交完成後,將提交整個 job 的最終結果abortJob()
: 終止並清理 job
commitProtocol 相當於是指導 Mapreduce 作業具體運行在存儲系統上的一套標準。考慮到在 MapReduce 作業一般都是大規模的耗時作業,其中難免會存在各種異常導致部分 task 失敗。因此 commitProtocol 的實現也需要格外考慮數據正確性和事務一致性。
FileOutputCommitter#
和 HadoopCommitProtol 專注於作業整體的提交流程不同,committer 專注於具體的 commit 操作(taskCommit 和 jobCommit)。在 hadoop 原生實現中,主要實現在 FileOutputCommitter 中,並針對性能和隔離性實現了兩種算法
commitV1#
在 commitTask 時,數據文件會從 task 臨時目錄 rename 到 job 臨時目錄中,並在 commitJob 時,從 job 臨時目錄 rename 到正式目錄。由於數據在 job 完成前保持良好的隔離性,失敗作業可快速從上次臨時目錄中恢復數據。
commitV2#
在 commitTask 時,數據文件會從 task 臨時目錄 rename 到 job 的正式目錄,從而在 commitJob 時只需要確認數據完整並進行成功標記即可。但是由於 commiTask 階段的 rename 操作對外可見,因此隔離性不佳,當作業失敗時,也無法快速恢復。
Spark CommitProtocol#
commitProtocol 在 Hadoop 的經典實現是 yarn 和 spark 上。由於現在基本上都從 MR 作業轉向了 spark 作業,所以這裡也只介紹 spark 的 commitProtocol 協議。
spark 的 commitProtocol 實際上在 hadoopCommitProtocol 基礎上進行了封裝,使其適配 spark 的計算執行模型,同時補齊了 hadoopCommitProtocol 的部分缺陷,提升了擴展性。
除此之外,因為繼承了 hadoopCommitProtocol,spark 自然也使用 FileOutputCommitter 用於寫入文件。如果想要接入其他 committer,也可以通過繼承並重寫 commitProtocol 的方式去實現,或者顯式指定 committer,這在 spark 中均有配置實現。
其與 hadoopComitProtocol 的差異在於:
- 確保了 task attempt 進行 commitTask 時的正確性(OutputCommitCoordinator)
spark 引入了 OutputCommitCoodinator 跟踪每個 stage 裡 task attempt 的狀態,如果某個 task attempt 成功,後續的所有 attempt 發起的 commitTask 都會被拒絕。通過這種方式保證 commitTask 執行的正確性
- 支持 hive 外表數據讀寫和動態分區重寫
在 hive 表查詢中,數據可能寫到內部表,也可能寫到外部表。外部表的數據往往存儲在不同的位置。為了支持這種需要輸出數據到不同路徑的操作,spark commitProtocol 允許記錄文件的絕對路徑 newTaskTempFileAbsPath ()。在讀取文件時將文件的絕對路徑保存下來,並在寫入時獲取絕對路徑寫入。
動態分區重寫指的是 spark 在寫入數據的時候允許只重寫指定分區的數據。避免重寫整張表,增大開銷。
什麼是動態分區:
若 sql 中未指定分區字段的具體值,使得該分區可以在計算中自動推斷出,即為動態分區。
例子:
insert overwrite table ${table_name} partition (dayno=${v_day}, hour=${v_dayhour_hh}, region)
select * from ${query_table_name}
where dayno = ${v_day};
開啟動態分區重寫的條件:
spark.sql.sources.partitionOverwriteMode=dynamic
SaveMode=overwrite
- sql 中存在動態分區的字段
動態分區重寫的基本邏輯
- 如果開啟動態分區重寫,spark 會將 job 目錄下生成的臨時數據文件生成在 staging 目錄下
- 針對外部表的數據路徑,在每次 rename 絕對路徑的分區文件前,需要先清理並重建其父級目錄避免 rename 時不存在(overwrite)
- 針對內部表的數據路徑,由於 commitJob 後路徑仍然在 staging 目錄下,所以需要將所有文件和目錄從 staging 目錄 rename 到正式目錄
參考 issue:[SPARK-35106]
存在的問題#
基於之前一篇文章,當我們通過 Hadoop MapReduce 或者 Spark 將數據寫入到 S3 時,就會存在相當嚴重的性能問題。
在 s3 中,只提供了以下六種基礎操作,所以其他操作只是這幾種操作的衍生。例如rename =list + copy + delete
。這導致 rename 操作是非原子性的。傳統文件 rename 操作需要在 s3 上實現 CREATE + DELETE 操作,目錄的 rename 則還需要額外 LIST 所有文件的操作,這導致 rename 操作不是原子的,而且隨著文件數的增加,rename 速度會越來越慢。
操作名 | 解釋 |
---|---|
PUT | 對象的原子寫操作 |
GET | 獲取對象 |
HEAD | 獲取對象的元數據 |
LIST | 基於前綴列出所有對象 |
COPY | 複製單個對象 |
DELETE | 刪除對象 |
而 rename 在傳統的 commit 算法中是不可或缺的一環,這將直接導致 commit 算法的性能和正確性受到了挑戰。
另外,在 spark 中也存在 staging 操作。staging 是 spark 作業中重要的步驟,在動態分區重寫的時候,必須要通過 staging 來保障數據的一致性。但是 staging 中同樣存在文件的 rename 操作,在 s3 的場景,這樣的操作會帶來很大的開銷。
S3A Committer#
隨著在 s3 上處理數據規模的增大,除了上述提到的 6 種基礎操作,AWS 還額外提供了兩種操作用來應對大規模的數據傳輸。
- Bulk Delete
- Multipart Upload
後者將是解決 s3 在提交文件性能問題上的關鍵。
簡單介紹一下 Multipart Upload 機制(後面簡稱 MPU):
multiPartUpload 主要分為三個階段
- 初始化
- 上傳塊
- 完成最終 POST 操作
參考 aws 官方文檔
在上傳過程中,已上傳的塊在 s3 中是不可見的,但是會佔用實際存儲。只有當 s3 最後 POST 操作完成後,s3 的 manifest 才會寫入該文件,該文件才會在 s3 上可見。
因此,hadoop 的開發人員充分利用了該機制,設計了兩種 S3A Committer: Staging Committer 和Magic Committer
Staging Committer#
在 stagingCommitter 上,數據在 taskAttempt 時被加載到本地進行計算,並在 commitTask 的時候上傳到 s3 上,此時文件被直接 MPU 到最終的 job 目錄上,但是文件信息被上傳到 HDFS 上進行傳統的 FileOutputCommitter 的 V1 算法操作。commitJob 階段,最終在 hdfs 上完成最後的 rename 後,執行最終 POST 操作,上傳的文件在 s3 上才最終可見
Magic Committer#
在 magicCommitter 上,數據從一開始就允許直接寫到 s3 上,不需要落到本地。magicCommitter 會在 job 目錄下創建一個_magic 目錄,taskAttempt 最後輸出的文件都已 magic 文件流的形式 MPU 到這個目錄下。注意此時並沒有完成 MPU,該次 MPU 的 manifest 會寫到文件同名的一個.pending 文件中(同樣在_magic 目錄下)。在 commitTask 時,由於文件數據已經上傳到 s3 上,只需要加載 taskAttempt 路徑下所有.pending 文件,聚合成.pendingset 文件上傳。在 commitJob 階段,則是獲取所有.pendingset 文件,這裡就已經獲取到所有需要 commit 的文件的 manifest,因此依次完成最終 POST 操作即可。
更多詳細內容,可以去閱讀 Hadoop 中 committer 實現的源碼來深入學習。
總的來說,S3A Committer 都是通過延遲文件最後提交操作來避免 rename 操作。基於當前 s3 的版本,兩種 committer 基本都能解決性能問題。不過,hadoop 對 committer 的實現更多考慮了通用性,而針對部分特化的業務場景,則沒有給到足夠的支持。(簡單講就是還無法做到開箱即用)例如在 spark 的動態分區重寫機制以及寫入 hive 外部表的機制,仍然需要自己實現。