いくつかのノートのアーカイブ。
Hadoop CommitProtocol#
hadoop commitProtocol(正式名称 HadoopMapReduceCommitProtocol)は、ファイルをファイルシステムに提出するためのルールのセットです。これは hadoop が抽象化したもので、計算とストレージの分離を実現し、異なる基盤ファイルシステムに対応するメカニズムです。
ここでは、job を並列計算が必要なクエリ作業の一種と仮定します。大規模データ計算フレームワークでは、このような作業は通常、複数のタスクで構成され、DAG の方式で実行されます。job はスケジューラ JobManager によって呼び出され、タスクは各 executor に配布されて作業が実行され、最終的な結果が集約されて返されます。
したがって、commitProtocol の基本的な実行ロジックを以下のメソッドの集合に簡略化できます:
setupJob()
: 実行プログラムが job を初期化する際、必要な一時ディレクトリを作成するなどsetupTask()
: executor がタスクを初期化し、一時ディレクトリを作成するなどneedTaskCommit()
: タスクを提出する必要があるかどうかcommitTask()
: タスクを提出し、タスクが生成した一時出力を最終出力パスに提出するabortTask()
: タスクを中止する必要がある場合、タスクが生成した一時ファイルをクリーンアップするcommitJob()
: すべてのタスクの提出が完了したことを確認した後、job 全体の最終結果を提出するabortJob()
: job を中止し、クリーンアップする
commitProtocol は、Mapreduce 作業がストレージシステム上で具体的に実行されるための標準を指導するものです。MapReduce 作業は一般的に大規模で時間がかかる作業であり、その中でさまざまな例外が発生し、部分的なタスクが失敗することが避けられません。したがって、commitProtocol の実装はデータの正確性とトランザクションの整合性を特に考慮する必要があります。
FileOutputCommitter#
HadoopCommitProtol が作業全体の提出プロセスに焦点を当てているのに対し、committer は具体的な commit 操作(taskCommit と jobCommit)に焦点を当てています。hadoop のネイティブ実装では、主に FileOutputCommitter に実装されており、性能と隔離性のために 2 つのアルゴリズムが実装されています。
commitV1#
commitTask 時に、データファイルはタスクの一時ディレクトリから job の一時ディレクトリに rename され、commitJob 時に job の一時ディレクトリから正式なディレクトリに rename されます。データは job が完了する前に良好な隔離性を保持するため、失敗した作業は前回の一時ディレクトリから迅速にデータを回復できます。
commitV2#
commitTask 時に、データファイルはタスクの一時ディレクトリから job の正式なディレクトリに rename されるため、commitJob 時にはデータの完全性を確認し、成功のマークを付けるだけで済みます。しかし、commiTask 段階の rename 操作は外部から見えるため、隔離性が良くなく、作業が失敗した場合、迅速に回復することができません。
Spark CommitProtocol#
commitProtocol は Hadoop のクラシックな実装で yarn と spark にあります。現在、ほとんどが MR 作業から spark 作業に移行しているため、ここでは spark の commitProtocol プロトコルのみを紹介します。
spark の commitProtocol は実際には hadoopCommitProtocol の基礎の上にラップされており、spark の計算実行モデルに適合させるとともに、hadoopCommitProtocol の一部の欠陥を補完し、拡張性を向上させています。
さらに、hadoopCommitProtocol を継承しているため、spark は自然に FileOutputCommitter を使用してファイルに書き込みます。他の committer を接続したい場合は、commitProtocol を継承してオーバーライドするか、committer を明示的に指定することで実現できます。これらは spark で設定実装されています。
hadoopComitProtocol との違いは以下の通りです:
- task attempt が commitTask を行う際の正確性を保証(OutputCommitCoordinator)
spark は OutputCommitCoodinator を導入し、各ステージ内のタスク attempt の状態を追跡します。もし特定の task attempt が成功した場合、以降のすべての attempt が発起した commitTask は拒否されます。この方法で commitTask の実行の正確性を保証します。
- hive 外部テーブルのデータの読み書きと動的パーティションの上書きをサポート
hive テーブルのクエリでは、データは内部テーブルに書き込まれることもあれば、外部テーブルに書き込まれることもあります。外部テーブルのデータは通常、異なる場所に保存されます。このような異なるパスにデータを出力する必要をサポートするために、spark commitProtocol はファイルの絶対パスを記録することを許可します。ファイルを読み込む際にファイルの絶対パスを保存し、書き込む際にその絶対パスを取得して書き込みます。
動的パーティションの上書きは、spark がデータを書き込む際に特定のパーティションのデータのみを上書きすることを許可します。全体のテーブルを上書きすることを避け、オーバーヘッドを増やします。
動的パーティションとは:
SQL でパーティションフィールドの具体的な値が指定されていない場合、そのパーティションは計算中に自動的に推測されることが動的パーティションです。
例:
insert overwrite table ${table_name} partition (dayno=${v_day}, hour=${v_dayhour_hh}, region)
select * from ${query_table_name}
where dayno = ${v_day};
動的パーティションの上書きを有効にする条件:
spark.sql.sources.partitionOverwriteMode=dynamic
SaveMode=overwrite
- SQL に動的パーティションのフィールドが存在する
動的パーティションの上書きの基本的なロジック
- 動的パーティションの上書きを有効にすると、spark は job ディレクトリ下に生成された一時データファイルを staging ディレクトリに生成します。
- 外部テーブルのデータパスに対して、絶対パスのパーティションファイルを rename する前に、親ディレクトリをクリーンアップして再作成し、rename 時に存在しないことを避けます(overwrite)。
- 内部テーブルのデータパスに対しては、commitJob 後もパスが staging ディレクトリに残るため、すべてのファイルとディレクトリを staging ディレクトリから正式なディレクトリに rename する必要があります。
参考 issue:[SPARK-35106]
存在する問題#
以前の記事に基づいて、Hadoop MapReduce または Spark を使用してデータを S3 に書き込むと、かなり深刻なパフォーマンスの問題が発生します。
S3 では、以下の 6 種類の基本操作のみが提供されているため、他の操作はこれらの操作の派生に過ぎません。例えば、rename = list + copy + delete
。これにより、rename 操作は非原子的になります。従来のファイル rename 操作は S3 上で CREATE + DELETE 操作を実行する必要があり、ディレクトリの rename にはすべてのファイルを LIST する追加の操作が必要です。これにより、rename 操作は原子的ではなく、ファイル数が増えるにつれて rename 速度が遅くなります。
操作名 | 説明 |
---|---|
PUT | オブジェクトの原子的な書き込み操作 |
GET | オブジェクトを取得 |
HEAD | オブジェクトのメタデータを取得 |
LIST | プレフィックスに基づいてすべてのオブジェクトを列挙 |
COPY | 単一オブジェクトをコピー |
DELETE | オブジェクトを削除 |
rename は従来の commit アルゴリズムにおいて不可欠な要素であり、これにより commit アルゴリズムの性能と正確性が直接的に影響を受けます。
さらに、spark にも staging 操作が存在します。staging は spark 作業の重要なステップであり、動的パーティションの上書き時にはデータの整合性を保証するために staging を通過する必要があります。しかし、staging にもファイルの rename 操作が存在し、S3 のシナリオでは、このような操作が大きなオーバーヘッドをもたらします。
S3A Committer#
S3 上でのデータ処理規模の増大に伴い、上記の 6 種類の基本操作に加えて、AWS は大規模なデータ転送に対応するために 2 つの追加操作を提供しています。
- Bulk Delete
- Multipart Upload
後者は S3 におけるファイル提出の性能問題を解決する鍵となります。
Multipart Upload メカニズム(以下 MPU と略します)を簡単に紹介します:
multiPartUpload は主に 3 つの段階に分かれます
- 初期化
- ブロックのアップロード
- 最終 POST 操作の完了
参考 aws 公式文書
アップロードの過程で、アップロードされたブロックは S3 では見えませんが、実際のストレージを占有します。S3 の最終 POST 操作が完了したときにのみ、S3 のマニフェストがそのファイルに書き込まれ、そのファイルが S3 上で見えるようになります。
したがって、hadoop の開発者はこのメカニズムを最大限に活用し、2 つの S3A Committer を設計しました: Staging Committer と Magic Committer
Staging Committer#
stagingCommitter では、データは taskAttempt 時にローカルにロードされて計算され、commitTask 時に S3 にアップロードされます。このとき、ファイルは最終的な job ディレクトリに直接 MPU されますが、ファイル情報は HDFS にアップロードされ、従来の FileOutputCommitter の V1 アルゴリズム操作が行われます。commitJob 段階では、最終的に HDFS 上で rename が完了した後、最終 POST 操作が実行され、アップロードされたファイルが S3 上で最終的に見えるようになります。
Magic Committer#
magicCommitter では、データは最初から直接 S3 に書き込むことが許可され、ローカルに落とす必要はありません。magicCommitter は job ディレクトリ下に_magic ディレクトリを作成し、taskAttempt の最終出力ファイルは magic ファイルストリームの形式でこのディレクトリに MPU されます。この時点では MPU は完了していません。この MPU のマニフェストは、ファイルと同名の.pending ファイルに書き込まれます(同様に_magic ディレクトリ内)。commitTask 時には、ファイルデータがすでに S3 にアップロードされているため、taskAttempt パス下のすべての.pending ファイルをロードし、.pendingset ファイルに集約してアップロードします。commitJob 段階では、すべての.pendingset ファイルを取得し、ここですでにコミットする必要のあるファイルのマニフェストを取得して、順次最終 POST 操作を完了させます。
詳細については、Hadoop における committer 実装のソースコードを読んで深く学ぶことができます。
総じて、S3A Committer はファイルの最終的な提出操作を遅延させて rename 操作を回避します。現在の S3 のバージョンに基づいて、2 つの committer は基本的に性能問題を解決できます。ただし、hadoop の committer の実装は一般性を考慮しており、一部の特化したビジネスシーンに対しては十分なサポートが提供されていません。(簡単に言えば、まだ即座に使用できる状態にはなっていません)例えば、spark の動的パーティションの上書きメカニズムや hive 外部テーブルへの書き込みメカニズムでは、依然として自分で実装する必要があります。