velox とは何か#
velox は C++ で書かれたオープンソースのデータベース実行加速ツールライブラリです。その利点は、ネイティブメソッドを利用して計算を最適化し、ベクトル化技術などのシステムレベルの最適化手段を深く活用して実行計画を加速することです。したがって、ネイティブコンピュートエンジンとも呼ばれます。velox を従来のデータ計算エンジンに統合することで、多くの一般的なシナリオで計算性能を大幅に向上させ、クエリ効率を高めることができますが、実際の運用に導入する際には一定の問題が残ります。gluten のようなオープンソースプロジェクトを利用すれば、velox を迅速に spark に統合し、従来のビッグデータ計算アーキテクチャへの適応プロセスを簡素化できますが、依然として適応コストが発生します。また、最適化はすべてのクエリ条件を 100% カバーするわけではないため、自分の使用シナリオに対して十分な性能検証とテスト評価を行った後に使用を決定することをお勧めします。
列指向シャッフル#
私の使用シナリオでは、主に velox を spark の計算プロセスに組み合わせていますが、その中で大きな適応ポイントの一つがシャッフルです。velox のデータ構造は列指向であり、従来の行単位でシャッフルデータを処理する spark とは大きな違いがあります。ネイティブシャッフルを使用すると、行列変換のオーバーヘッドが発生します。RSS(Remote Shuffle Service)を使用すると、性能の差はさらに大きくなり、大規模なシャッフルタスクのシナリオではほぼ受け入れられません。したがって、こうした問題を解決するためにネイティブの列指向シャッフルが必要です。
オープンソースプロジェクトの gluten では、velox を spark により良く統合するために多くの最適化が行われており、ネイティブの列指向シャッフルが実装されています。また、apache celeborn を接続して RSS への適応も実現しています。以下では、gluten がこれらの 2 つのシナリオでの列指向シャッフルの実装について主に紹介します。
ローカルシャッフル#
私たちがシャッフルに注目するのは、主にシャッフルのライターとリーダーに関心があるからです。特にライターは改造の重点であり、上流で準備された map データの処理方法に関わります。
ColumnarShuffleWriter#
列指向シャッフルを実現する方法は、本質的には行列変換をどのように解決するかです。したがって、上流の列指向データを加工処理し、シャッフルプロセスを行う必要があります。
velox の列指向データ構造は ColumnarBatch であり、すべての処理待ちのデータを含んでいます。その構造は以下の図に似ています:
このデータ構造を処理する際には、以下のいくつかの問題を考慮する必要があります:
- 行データと列データをどのように対応させるか?
- 大量のデータに直面した場合、どのように保存するか(OOM を防ぐため)
- パフォーマンスをどのように保証するか?
最初の問題は実際には最も基本的な問題です。spark には合計 3 種類の ShuffleWriter があり、gluten の列指向シャッフルライターである ColumnarShuffleWriter は主に HashBasedShuffleWriter を基に改造されています。行列関係のマッピングも主にハッシュマッピングに基づいています。このようにすることで、実装が簡単になり、ソートを避けて大量のランダムな読み書きがパフォーマンスに影響を与えるのを防ぎますが、HashBasedShuffleWriter は大量のファイルも生成します。また、メモリのオーバーヘッドも問題です。しかし、gluten はこのライターに SortBasedShuffleWriter の設計を取り入れており、この点は後のプロセスで感じることができます。
2 つ目の問題について、gluten はメモリ内で ColumnBatch を分割し、毎回一部のデータのみを処理します。これにより、メモリ内でより多くのデータを処理できるだけでなく、RSS のシナリオでネットワーク伝送による損失を低減できます。
3 つ目の問題について、gluten は多くの詳細で最適化を行っています。例えば、arrow を利用してメモリを管理し、メモリの再利用を行うことで OOM のリスクを低減し、spill の確率を減少させます。CPU キャッシュを十分に活用し、ランダム書き込みを避けるなどです。
以下に ColumnarShuffleWriter の全体的なプロセス設計を示します:
注意:ここでのほとんどすべてのプロセスは velox 内で実装されています。
- 各レコードを columnBatch にマッピングする
- パーティショナーに基づいて、パーティションと行の対応関係を計算する
- マッピングテーブルを構築し、partition id -> rowid のマッピング関係を完成させる
- データを複数回ロードするためのメモリを事前に割り当てる
- 複数の split function を呼び出し、データを分割してキャッシュに格納する
- メモリが不足した場合、データをファイルに spill する
- 最後に write を完了し、メモリデータと spill ファイルをマージして最終ファイルを形成する
partition2Row の関係を構築する#
ここで重要なのは、これらの 2 つの配列を構築することです:partition2RowOffset と rowOffset2RowId
ここには、パーティション、列、および行の間のマッピング関係が保存され、有効なパーティションを特定します(パーティションにデータが来ない場合、後でメモリを事前に割り当てることはありません)。
split function#
split フェーズでは、split 関数を遍歴し、ColumnarBatch から生成された rowVector をパーティションに基づいて切り出し、各パーティションに事前に割り当てられたメモリに格納します。メモリが溢れる条件に達した場合、メモリ内のデータを spill します。
rowVector のフォーマット例は以下の通りです:
- split function は主に 4 種類の関数を含み、それぞれ異なるタイプの列を処理します。
- splitFixedWidthValueBuffer は固定幅の列を切り分けます。一般的には列の型(例えば int、boolean)です。
- splitValidityBuffer は有効なバイト値を切り分け、null 値を処理します。
- splitBinaryArray はバイナリ配列のデータを切り分けます。
- splitComplexType は複雑な型(struct、map、list)を切り分けます。
切り分ける前に、まずメモリバッファ:preAllocPartitionBuffer を初期化し、切り分けたデータがメモリに完全に収まるようにします。切り分けたデータは複数回遍歴されるため、実際のシナリオでは、各回の切り分けデータのサイズを可能な限り CPUL1/L2 キャッシュに収まるように制御する必要があります。これにより、比較的良好なパフォーマンスを達成できます。
preAllocBuffer#
gluten は arrow を基にメモリ管理とデータ交換を実現しています。
前述の切り分けたデータに基づき、各パーティションは切り分けデータをキャッシュするために再利用可能で、サイズ変更可能なメモリを事前に割り当てます。
事前に割り当てるサイズの計算式:
# 事前割り当てサイズ
memLimit = max(sparkCurrentOffHeapMemory / taskNum, 128 * 1024 * 1024)
# キャッシュ内に最大で格納できる行数(numRows)
maxRows = memLimit / bytesPerRow / numPartitions_ / 4
buffe は動的にサイズ変更可能なメモリで、arrow を利用して精密な制御を行っています。通常、サイズ変更のトリガー条件(実際のメモリサイズ変更の状況はもっと多様です)は、
# デフォルトのサイズ変更因子THRESHOLDは0.25
newSize < (1 - THRESHOLD) || newSize > (1 + THRESHOLD)
その完全なメモリサイズ変更の状況は以下の図の通りです:
ここでevict and free data
はメモリ内のデータを spill し、spill されるデータは arrow を通じてコピー出力され、初期サイズは 4096 で、不足する場合は自動的に拡大します。
spill and merge#
ColumnarShuffleWriter の設計は hashBased shuffle ですが、gluten はその基盤に unsafe sort の設計思想を取り入れています。
ローカル spill のシナリオでは、メモリが閾値に達するか OOM になるたびに、メモリ内のデータを圧縮し、複数回ディスクファイルに溢れ書きします。複数回の columnarBatch 処理を経て、最終的にすべての溢れ書きファイルを 1 つのファイルに統合します。
- 各 spill ファイル内のデータはパーティション ID の順序で書き込まれます。
- 統合時には、パーティション ID に基づいてすべてのファイルを遍歴し、各パーティションのデータを統合します。
ColumnarShuffleReader#
リーダー側の設計は比較的シンプルで、基本的にネイティブの ShuffleReader を再利用できます。map 側から取得するデータは基本的に変わらず、主に取得したデータを ColumnarBatch に変換して下流で使用できるようにする必要があります。ここでは、デシリアライザーを再実装するだけで実現できます。
リモートシャッフル#
リモート列指向シャッフルの設計思想はローカルシャッフルと基本的に同じですが、主な問題はデータを正しく RSS にプッシュする方法です。
gluten が celeborn に適応する設計では、ライターとリーダーを再実装しました。その基本的な考え方はローカルシャッフルと似ており、ネイティブエンジンを通じて columnarBatch の切り分けとプッシュを実現し、リーダー側ではデシリアライザーを実装して columnarBatch を取得します。
- 各レコードを columnBatch にマッピングする
- パーティショナーに基づいて、パーティションと行の対応関係を計算する
- マッピングテーブルを構築し、partition id -> rowid のマッピング関係を完成させる
- データを複数回ロードするためのメモリを事前に割り当てる
- 複数の split function を呼び出し、データを分割してキャッシュに格納する
- キャッシュ上限を超えた場合、celeborn に溢れ書きする
前述の基本的なポイントはローカルシャッフルの段階で既に紹介されており、データをプッシュする際は JNI を通じて celeborn クライアントをネイティブエンジンに渡し、溢れ書きされたデータを celeborn にプッシュし、その後 celeborn 上でデータの統合を完了します。