最近、k8s 上で spark をテストしているときに、いくつかのパフォーマンス問題に直面したため、トラブルシューティングのプロセスを記録し、ケーススタディを振り返ります。
ケース再現#
私たちが使用している基盤クラスターは AWS の eks クラスターで、その上に従来の hadoop yarn+ spark 環境を構築しました。簡単に言うと、yarn の resourceManager、nodeManager などの具体的なコンポーネントを k8s の pod に変換しましたが、上層のスケジューリングロジックは変わらず、二層のスケジューリングシステムを構築しました。このシステムの具体的な実装方案とスケジューリングロジックについてはここでは触れませんが、これは従来の yarnspark であり、基盤が k8s であることを理解していれば大丈夫です。
私が使用したタスクは、私たちの生産環境からの小さなタスクで、期待される実行時間は 5 分、データ量は中程度で、出力テーブルのデータは約 2kw 行です。元々の生産環境では AWS の EMR on EC2 クラスターを使用しており、従来のコンテナホストクラスターと理解できます。現在、このタスクを eks クラスターに移行した後、所要時間は 43 分に達しましたので、その history ui を観察しました。
SQL 実行の段階の所要時間は生産環境の所要時間と基本的に似ており、段階実行の所要時間は数分しかありませんが、全体の spark ジョブの完了に 43 分かかりました。明らかにジョブ実行の末尾に所要時間の問題が存在し、長尾現象が発生しています。
初歩的な分析#
生産タスクと同じ SQL を使用し、読み書きデータ量も完全に一致しているため、ビジネスロジックによる影響は排除されました;
ドライバーログを調査したところ、段階終了後ほとんど有効なログがなく、この時点ですべてのタスクが実行を完了しており、クラスターを観察すると、executor の利用率も非常に低いことがわかりました。
所要時間が存在する以上、バックグラウンドで必ず所要時間のかかるスレッドが実行されているはずなので、spark ドライバーのスレッドダンプを確認しました。
本当の原因は、s3 の rename 操作で詰まっていることがわかりました。
s3 上の rename 操作によるパフォーマンス問題#
まず rename について話しましょう。 spark の rename は、spark がジョブを提出する過程でデータの一貫性を保つために、一時ファイルを生成してデータを読み書きします。タスクが実行を完了すると、一時ファイルを正式なファイルに rename します;ジョブが実行を完了すると、そのジョブの一時ディレクトリ下のすべてのファイルを正式なファイルに rename します。
そのディレクトリ形式は大体以下のようになります。
ドライバーは FileFormatWriter を通じて適切な output committer を選択し、writer ジョブを開始します。committer はジョブとタスクをどのように提出するかを決定し、提出の流れは以下の図のようになります。
rename はジョブとタスクの提出の間に発生し、具体的な rename 戦略は committer の戦略によって決まります。committer の詳細については後で触れます。
なぜ s3 上の rename にパフォーマンス問題があるのか
AWS の s3 を含むほとんどのオブジェクトストレージでは、ディレクトリ自体がオブジェクトであるため、そのディレクトリの rename は list-copy-delete の操作を経る必要があります。ファイルシステム、例えば HDFS の単純な rename に比べて、そのコストは非常に大きくなります。spark の実行中に非常に多くの小ファイルが生成される可能性があり、HDFS で数万の小ファイルを rename する場合でも、そのパフォーマンスは最適化が必要です。s3 の場合は言うまでもありません。
spark のファイル提出プロトコル#
最適化方法について話す前に、まず関連する Spark ファイル提出プロセスを振り返りましょう。前の図からわかるように、Spark はジョブ提出の過程で、実際には Hadoop の committer を呼び出して具体的な commit 戦略を採用しています。committer が解決すべき問題は主に以下の点です:
- ファイルの失敗再読み込みによるデータの一貫性問題の処理
- タスクの推測実行下で同じファイルの多重書き込み時のデータの正確性を保証する
- 大量のファイルの読み書き、結合の効率を向上させる
現在、Hadoop が提供する 2 つのファイル提出方式は、mapreduce.fileoutputcommitter.algorithm.version
を通じて切り替えられます。
FileOutputCommitter V1#
commit プロセス
- まず TaskAttempt は TaskAttempt Data を一時ディレクトリに書き込みます:
${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
- タスクデータの書き込みが完了したら、タスクを提出できるようになり、commitTask を実行して上記のディレクトリを正式なディレクトリに移動します:
${target_dir}/_temporary/${appAttempt}/${taskAttempt}
- すべてのタスクが完了したら、commitJob を実行し、
${target_dir}/_temporary
下のすべてのファイルとディレクトリを${target_dir}
の正式なディレクトリに移動し、提出が完了した後に現在のディレクトリに識別子_SUCCESS
を追加して提出成功を示します。
データの一貫性問題
- TaskAttempt の書き込み段階で、タスクの書き込みが失敗して再試行が必要な場合、
${taskAttempt}
ディレクトリ下の/_temporary/
下のすべてのファイルを再書き込みすればよく、元の正式な Attempt ディレクトリを保持できます。 - アプリケーションの再試行が発生した場合、recoverTask を通じて元の
${appAttempt}
ディレクトリ下の正式なディレクトリファイルを直接復元し、現在の${appAttempt}
ディレクトリに直接 rename できます。 - 2 回の rename が存在するため、V1 は実際には 2 段階の提出であり、rename 前後のデータの一貫性は保証され、データが不一致になるのは rename の過程でのみ可能です。
パフォーマンス問題
V1 の強い一貫性がもたらす負の影響は、膨大なファイルが生成される状況で 2 回の rename 操作が所要時間の問題を引き起こす可能性があることです。特に commitJob 段階では、Driver が単一スレッドでシリアルに commit を実行するため、この時点で大量のファイルを rename する必要がある場合、所要時間が非常に長くなる可能性があります。
FileOutputCommitter V2#
- まず TaskAttempt は TaskAttempt Data を一時ディレクトリに書き込みます:
${target_dir}/_temporary/${appAttempt}/_temporary/${taskAttempt}
- タスクデータの書き込みが完了したら、タスクを提出できるようになり、commitTask を実行して上記のディレクトリを
${target_dir}
に移動します。ここでは正式なディレクトリに直接移動することに注意してください。 - すべてのタスクが完了したら、すべてのデータが正式なディレクトリに保存されているため、commitJob は単に識別子
_SUCCESS
を追加して提出成功を示します。
データの一貫性問題
- taskAttempt の書き込み段階で、タスクの書き込みが失敗して再試行が必要な場合、この時点でタスクデータが正式なディレクトリに移動されている可能性があるため、汚れたデータが発生します。
- アプリケーションの再試行が発生した場合、以前のアプリケーションが提出したデータはすでに正式なディレクトリに存在するため、追加の rename 操作は不要で、他のデータの再試行を直接続行できます。ただし、この時点で提出されたデータが完全に正しいとは限らず、汚れたデータが存在する可能性があります。
- V2 プロセスは一定のデータの一貫性を犠牲にし、最終的一貫性の方案を選択しました。中間過程でデータの正確性を保証するものが欠如しているため、データが正しいかどうかを判断するためには最後の
_SUCCESS
識別子に依存する必要があります。また、汚れたデータが存在するため、タスクが長期間実行される中で、これらの汚れたデータが正しくクリーンアップされない可能性があり、ストレージスペースを占有し、追加のコストをもたらすことになります。
パフォーマンス問題
V2 が最終的一貫性の方案を採用する理由は、ディレクトリが V1 の rename 操作による過剰な所要時間のオーバーヘッドを減少させるためです。V1 と比較して、V2 はタスク完了後に正式なディレクトリに rename するだけで済み、タスクスレッドの並行操作によって実行時間が大幅に短縮されます。
小ファイルの最適化#
上述の committer の異なるアルゴリズムは、一貫性とパフォーマンスの選択肢を提供しますが、結局のところそれぞれに利点と欠点があります。しかし、実際のシーンでは、皆が「私は全てを手に入れたい」と望むことが多いです。
rename 段階での最適化に加えて、パフォーマンスの殺し屋の源である大量の小ファイルの最適化も効果的な方法となっています。
Spark の既存の最適化:#
spark には小ファイルの最適化が内蔵されており、ファイル生成の観点から:
spark.files.openCostInBytes
このパラメータを利用してファイルを開く際のサイズを予測し、高めに設定することで小ファイルのパーティション速度を向上させることができます。
ビジネス側の観点からは、大まかな考え方はパーティション数を減らして小ファイルを大ファイルに統合することです。
- coalesce または repartition 操作を使用してパーティションを統合します。
- 動的パーティションの使用を減らすか、
distribute by
を使用して reducer の数を制御します。 - パーティションテーブルを多く使用して、クエリ時に生成されるパーティション数を減少させます。
- より先進的なファイル圧縮形式を使用して小ファイルの処理性能を向上させます。
AWS の特別な最適化:#
私たちが生産環境で AWS の EMR を使用しているため、AWS チームが s3 上で小ファイルの最適化に関して取っている措置についても少し理解しています。
- マルチアップロード:その原理は、ファイルの断片を並行して読み書きすることで s3 の読み書き性能を向上させることです。これに基づいて、EMRFS S3-optimized Committer と s3a Committer(オープンソース)が派生しました。この committer はデフォルトで FileOutputCommitter V2 方式を採用しているため、V2 の問題はこれらの committer にも存在します。
- hdfs を利用した加速:EMR では、ファイルシステムが rename などの操作に対してより良い性能を持つことを考慮して、ファイルシステム上で先に rename し、その後 s3 に提出することができるのではないでしょうか?EMR では、ファイルが s3 に提出される前に、hdfs のようなファイルシステムに先にアップロードして rename またはファイル結合操作を行った後に s3 にアップロードします。これにより、純粋な s3 の読み書きに比べて、性能上の明らかな利益があります。ただし、欠点は、単独でファイルシステムを維持することが高コストであることです。
その他の最適化思考:#
私たちのチームも小ファイルの統合に関して最適化を行っており、その最適化の考え方はジョブ実行の最後に小ファイルを統合するための新しいジョブを作成することです。Spark のSqlHadoopMapReduceCommitProtocol
を継承してプラグイン式の拡張を実現します。
統合の考え方は、commitTask の後にデータのパーティション情報を取得し、その後分組して統合し、最後に commitJob の際に統合されたファイルを正式なディレクトリに移動することです。その基本的な考え方は以下の図のようになります。
このように小ファイルを統合する利点は
-
この機能はプラグイン式で、原生コードへの侵入性が低いこと
-
大量の小ファイルのシーンでの優位性が明らかであること
欠点
- 新たにジョブを起動して最適化を行うため、タスクの最後に小ファイル統合のための 2 つの段階が追加され、より多くのタスクが導入され、一定の所要時間がかかります。
終わりに#
この機能を起動することで、タスクを再度実行したところ、最終的な所要時間が明らかに短縮されました。
もちろん、最適化は完全には終了していません。eks 上のタスクの所要時間は全体的に元の EMR タスクよりも高いですが、この問題の深掘りは、次回の時間があるときに共有します。