什麼是 velox#
velox 是一個基於 C++ 編寫的開源數據庫執行加速工具庫。它的優勢是利用 native 方法來優化計算,深度利用系統級的優化手段例如向量化技術來加速執行計劃。因此,也可以將其稱為是一個 native compute engine。將 velox 集成到傳統的數據計算引擎中在不少通用場景中可以顯著提升計算性能,提高查詢效率,不過將其引入生產依然還會有一定的問題。儘管有類似 gluten 這樣的開源項目可以快速將 velox 集成進 spark,從而簡化適配傳統大數據計算架構的流程,但依然會存在一些適配成本;優化也並非百分百覆蓋所有查詢條件,因此建議針對自己的使用場景經過較多性能驗證和測試評估後再決定是否使用。
列式 shuffle#
在我的使用場景中,主要是將 velox 結合到 spark 的計算流程中,其中一個比較大的適配點是 shuffle。因為在 velox 中的數據結構是列式的,這和傳統以行為單位處理 shuffle 數據的 spark 有著比較大的區別。如果使用原生 shuffle,會存在一個行列轉換的開銷。如果使用 RSS(Remote Shuffle Service),性能的損耗會進一步增大,這在大 shuffle 的任務場景就幾乎不可接受。因此我們需要一個原生的列式 shuffle 來解決這類問題。
在開源項目中 gluten 中,為 velox 更好的集成到 spark 做了很多優化,包括實現了原生的列式 shuffle。同時也接入了 apache celeborn 實現了對 RSS 的適配。下面就主要介紹一下 gluten 在這兩個場景下的列式 shuffle 的實現。
Local Shuffle#
我們關注 shuffle 主要就是關注 shuffle 的 writer 和 reader,其中 writer 是改造的重點,因為這涉及到如何處理上游準備 map 的數據。
ColumnarShuffleWriter#
如何實現列式 shuffle,本質上是如何解決行列轉換。因此我們需要對上游的列式數據進行加工處理,再進行 shuffle 流程。
velox 的列式數據結構為 ColumnarBatch,其中包含了所有等待處理的數據。其結構類似下圖:
對該數據結構處理需要考慮這幾個問題:
- 如何將行數據和列數據對應起來?
- 遇到大數據量如何存儲(防止 OOM)
- 如何保證性能?
第一個問題其實也是最基礎的問題。我們知道在 spark 中總共有三種 ShuffleWriter,gluten 的列式 shuffle writer:ColumnarShuffleWriter 主要是基於 HashBasedShuffleWriter 改造。對於行列關係的映射也主要是基於 hash 映射。這樣做的好處是實現簡單,同時避免 sort 防止大量隨機讀寫影響性能,但是 HashBasedShuffleWriter 也會產生大量文件。另外對內存的開銷也是一個問題。不過 gluten 也在這個 writer 上融入了 SortBasedShuffleWriter 的設計,這點我們在後面的流程中可以感受到。
第二個問題,gluten 會在內存中對 ColumnBatch 切分,每次只處理一部分的數據,這樣不僅可以在內存中處理更多的數據,也可以在 RSS 的場景中降低網絡傳輸帶來的損耗。
第三個問題,gluten 在很多細節進行了優化。例如,利用 arrow 來管理內存,對內存進行復用,從而降低 OOM 的風險,減少 spill 的概率;充分利用 CPU 緩存;避免隨機寫等等。
下面展示 ColumnarShuffleWriter 的整體流程設計:
注意,這裡基本所有的流程都在 velox 中實現。
- 獲取每個 record 映射成 columnBatch
- 根據分區器,計算分區和 row 的對應關係
- 構建映射表,完成 partition id -> rowid 的映射關係
- 預分配內存用於多次裝載數據
- 調用若干 split function,切分數據裝進緩存
- 如果內存不足,將數據 spill 到文件
- 最後完成 write,merge 內存數據和 spill 文件,形成 final file
構建 partition2Row 的關係#
其中比較重要的是構建了這兩個數組:partition2RowOffset 和 rowOffset2RowId
這裡保存了 partition、column 和 row 之間的映射關係,並確定有效的 partition(如果 partition 沒有數據進來,後面就不會給預分配內存)
split function#
split 階段會遍歷 split 函數,負責將 ColumnarBatch 轉成的 rowVector 根據 partition 切出來放進每個
partition 預分配好的內存,當內存達到溢出要求後,會將內存中的數據 spill 出來。
rowVector 的格式樣例如圖
- split function 主要包含四種函數,分別處理不同類型的 column
- splitFixedWidthValueBuffer 將固定位寬的列切分,一般是 column type(例如 int,boolean)
- splitValidityBuffer 將有效的字節值切分,處理 null 值
- splitBinaryArray 將二進制隊列的數據切分
- splitComplexType 將複雜類型(struct, map, list)切分
切分前會首先初始化內存 Buffer:preAllocPartitionBuffer,確保切分後的數據能完整裝進內存
因為切分的數據會被多次遍歷,因此在實際場景應當控制每次切分的數據大小盡可能裝入 CPUL1/L2 緩存,這樣可以達到一個比較好的性能
preAllocBuffer#
gluten 基於 arrow 來實現內存管理和數據交換。
基於前面切分後的數據,每個 partition 會預分配一塊可重用、可 resize 的內存用於緩存切分數據
預分配的大小計算公式:
#預分配大小
memLimit = max(sparkCurrentOffHeapMemory / taskNum, 128 * 1024 * 1024)
#緩存中最大可以存放的rows num(newSize)
maxRows = memLimit / bytesPerRow / numPartitions_ / 4
buffer 是一塊動態伸縮的內存,利用 arrow 做了精細化的控制:通常觸發伸縮的條件(實際內存伸縮的情況會更多)是、
#默認伸縮因子THRESHOLD是0.25
newSize < (1 - THRESHOLD) || newSize > (1 + THRESHOLD)
其完整的內存伸縮情況如下圖:
其中evict and free data
就會把內存的數據 spill 出去,每次 spill 的數據會通過 arrow 拷貝輸出,初始大小是 4096,如果不夠會自行擴大
spill and merge#
ColumnarShuffleWriter 的設計雖然是 hashBased shuffle,但是 gluten 在基礎上融入了一些 unsafe sort 的設計思路。
在本地 spill 的場景中,每次當使用內存達到閾值或者 OOM 時,會將內存中的數據壓縮並多次溢寫到
磁碟文件中;當經過多輪 columnarBatch 處理,最後會把所有溢寫文件合併成一個文件
- 每個 spill 文件中數據按照 partition id 的順序寫入
- 合併時會按照 partition id 遍歷所有文件,實現每個 partition 數據的合併
ColumnarShuffleReader#
reader 端的設計其實比較簡單,基本可以復用原生的 ShuffleReader,因為從 map 端拉取的數據基本沒變,主要需要將拉取的數據轉換成 ColumnarBatch 供下游繼續使用。這裡只需要重寫反序列化器即可實現。
Remote Shuffle#
remote columnar shuffle 的設計思路和本地 shuffle 基本相同,主要問題在於如何將數據正確推送到 RSS。
在 gluten 適配 celeborn 的設計中,重新實現了 writer 和 reader。其基本思路和本地 shuffle 類似,通過 native engine 來實現 columnarBatch 的切分和 push,在 reader 端通過實現反序列化器來獲取 columnBatch。
- 獲取每個 record 映射成 columnBatch
- 根據分區器,計算分區和 row 的對應關係
- 構建映射表,完成 partition id -> rowid 的映射關係
- 預分配內存用於多次裝載數據
- 調用若干 split function,切分數據裝進緩存
- 超過緩存上限溢寫到 celeborn
前面基本幾點在 Local Shuffle 環節均已介紹,推送數據則是通過 JNI 將 celeborn client 傳遞到 native engine,負責將溢寫的數據 push 到 celeborn,然後在 celeborn 上完成數據的 merge。