ベロックスとは何ですか#
ベロックスは、C++ で書かれたオープンソースのデータベース実行アクセラレーションツールライブラリです。その利点は、ネイティブメソッドを使用して計算を最適化し、ベクトル化技術などのシステムレベルの最適化手法を深く利用することです。したがって、それはネイティブコンピュートエンジンとも呼ばれることができます。ベロックスを従来のデータ計算エンジンに統合することで、一般的なシナリオでは計算性能を大幅に向上させ、クエリの効率を高めることができますが、それを本番環境に導入するとまだいくつかの適応コストが発生する可能性があります。ベロックスをスパークに迅速に統合するための gluten などのオープンソースプロジェクトがありますが、それでもいくつかの適応コストが発生します。また、最適化はすべてのクエリ条件を 100%カバーするわけではないため、使用するかどうかを決定する前に、自分の使用シナリオに対して多くのパフォーマンス検証とテスト評価を行うことをお勧めします。
列指向シャッフル#
私の使用シナリオでは、主にベロックスをスパークの計算フローに組み込んでいますが、その中でも大きな適応ポイントはシャッフルです。なぜなら、ベロックスではデータ構造が列指向であり、これは従来の行単位でシャッフルデータを処理するスパークとは大きく異なるからです。ネイティブのシャッフルを使用すると、行列の変換のオーバーヘッドが発生します。RSS(リモートシャッフルサービス)を使用すると、パフォーマンスの低下がさらに増加し、大規模なシャッフルのタスクシナリオではほぼ受け入れられません。したがって、このような問題を解決するために、ネイティブの列指向シャッフルが必要です。
オープンソースプロジェクトである gluten では、ベロックスをスパークに統合するために多くの最適化が行われており、列指向シャッフルも実装されています。また、apache celeborn にも統合されており、RSS に対応しています。以下では、gluten がこれらの 2 つのシナリオで列指向シャッフルをどのように実装しているかについて説明します。
ローカルシャッフル#
私たちはシャッフルに関しては、主にシャッフルのライターとリーダーに注目しています。そのうちのライターは改造が重要です。なぜなら、これは上流のマップのデータをどのように処理するかに関わるからです。
ColumnarShuffleWriter#
列指向のシャッフルを実現する方法は、本質的には行列の変換をどのように解決するかということです。したがって、上流の列指向データを加工して処理し、その後シャッフルプロセスを行う必要があります。
ベロックスの列指向データ構造は ColumnarBatch であり、すべての処理待ちデータが含まれています。その構造は次の図のようになっています。
このデータ構造を処理するには、次のいくつかの問題を考慮する必要があります:
- 行データと列データをどのように対応付けるか?
- 大量のデータに対処する方法(OOM を防ぐ)
- パフォーマンスを確保する方法?
最初の問題は、基本的な問題でもあります。私たちは、スパークには 3 種類のシャッフルライターがあることを知っていますが、gluten の列指向シャッフルライターである ColumnarShuffleWriter は、HashBasedShuffleWriter を改造したものです。行列の関係のマッピングも主にハッシュマッピングに基づいています。これにより、実装が簡単になり、同時に大量のランダムな読み書きが性能に影響を与えることを避けることができますが、HashBasedShuffleWriter は大量のファイルを生成することもあります。また、メモリのオーバーヘッドも問題です。ただし、gluten はこのライターに SortBasedShuffleWriter の設計も組み込んでおり、後のプロセスでそれを感じることができます。
2 番目の問題では、gluten は ColumnBatch をメモリ内で分割し、一度に処理するデータの一部のみを処理することで、メモリ内でより多くのデータを処理できるだけでなく、ネットワーク転送によるパフォーマンスの低下を軽減することができます。
3 番目の問題では、gluten は多くの最適化を行っています。たとえば、メモリを管理するために arrow を使用し、メモリを再利用することで OOM のリスクを低減し、スピルの確率を減らすことができます。CPU キャッシュを最大限に活用し、ランダムな書き込みを回避するなどです。
以下に ColumnarShuffleWriter の全体的なフロー設計を示します:
注意:ここでのほとんどのフローは velox で実装されています。
- 各レコードを取得し、columnBatch にマッピングします。
- パーティショナーに基づいて、パーティションと行の対応関係を計算します。
- マッピングテーブルを構築し、パーティション ID -> 行 ID のマッピング関係を完成させます。
- データを複数回ロードするためにメモリを事前に割り当てます。
- 複数の分割関数を呼び出して、データを分割し、キャッシュにロードします。
- メモリが不足している場合は、データをファイルにスピルします。
- 最後に、書き込みを完了し、メモリデータとスピルファイルをマージして最終ファイルを作成します。
パーティション 2 行の関係を構築する#
重要なのは、partition2RowOffset と rowOffset2RowId という 2 つの配列を構築することです。これにより、パーティション、列、行の間のマッピング関係が保存され、有効なパーティション(パーティションにデータが入っていない場合、後続のメモリ割り当ては行われません)が決定されます。
分割関数#
分割の段階では、分割関数を複数回呼び出し、ColumnarBatch から作成された rowVector をパーティションごとに切り出して、各パーティションの事前に割り当てられたメモリにロードします。メモリが溢れる要件を満たすと、メモリ内のデータがスピルされます。
rowVector の形式の例は次の図のようになります。
- 分割関数には、異なるタイプの列を処理する 4 つの関数が含まれています。
- splitFixedWidthValueBuffer は、固定幅の列を分割します。通常は列のタイプ(例:int、boolean)です。
- splitValidityBuffer は、有効なバイト値を分割し、null 値を処理します。
- splitBinaryArray は、バイナリ配列のデータを分割します。
- splitComplexType は、複雑なタイプ(struct、map、list)を分割します。
分割前には、メモリバッファ preAllocPartitionBuffer を初期化する必要があります。これにより、分割後のデータがメモリに完全に収まるようになります。分割されたデータは複数回トラバースされるため、実際のシナリオでは、各分割のデータサイズを制御して、CPU L1/L2 キャッシュにできるだけ収まるようにする必要があります。これにより、パフォーマンスが向上します。
preAllocBuffer#
gluten は、メモリ管理とデータ交換に arrow を使用しています。
分割後のデータに基づいて、各パーティションには再利用可能でリサイズ可能なバッファが事前に割り当てられ、分割データをキャッシュするために使用されます。
割り当てられたサイズの計算式は次のとおりです:
#割り当てサイズ
memLimit = max(sparkCurrentOffHeapMemory / taskNum, 128 * 1024 * 1024)
#キャッシュに格納できる最大の行数(newSize)
maxRows = memLimit / bytesPerRow / numPartitions_ / 4
バッファは、arrow を使用して細かく制御されています。通常、メモリの拡張がトリガーされる条件(実際のメモリの拡張状況はさらに多様です)は次のとおりです。
#デフォルトの拡張係数THRESHOLDは0.25です
newSize < (1 - THRESHOLD) || newSize > (1 + THRESHOLD)
完全なメモリの拡張状況は次の図のようになります。
evict and free data
では、データがメモリからスピルされ、スピルされたデータは arrow を介してコピーされます。初期サイズは 4096 であり、不足している場合は自動的に拡大されます。
スピルとマージ#
ColumnarShuffleWriter の設計は、HashBased シャッフルではありますが、gluten はいくつかの unsafe sort の設計思想も取り入れています。ローカルスピルのシナリオでは、メモリがしきい値に達するか OOM が発生するたびに、データが圧縮され、複数回ディスクファイルにスピルされます。複数の ColumnarBatch の処理を経て、最後にすべてのスピルファイルをマージして 1 つのファイルに結合します。
- 各スピルファイルのデータは、パーティション ID の順序で書き込まれます。
- マージ時には、パーティション ID を順にすべてのファイルでトラバースし、各パーティションのデータをマージします。
ColumnarShuffleReader#
リーダー側の設計は実際には非常にシンプルであり、基本的にはネイティブの ShuffleReader を再利用することができます。なぜなら、マップ側から取得するデータはほとんど変わらず、主に取得したデータを ColumnarBatch に変換して、下流で使用できるようにする必要があるからです。ここでは、デシリアライザを上書きするだけで実現できます。