What is Velox#
Velox is an open-source library, written in C++, for accelerating database query execution. Its strength is that it runs computations natively and leans heavily on system-level techniques such as vectorization to speed up execution plans, which is why it is also called a native compute engine. Integrating Velox into a traditional data computation engine can significantly improve query performance in many common scenarios, but bringing it into production still poses challenges. Open-source projects such as Gluten make it quick to plug Velox into Spark and simplify adaptation to traditional big data architectures, yet some adaptation cost remains, and the optimizations do not cover every kind of query. Before deciding whether to adopt it, run extensive performance validation and testing against your own workloads.
Columnar Shuffle#
In my usage scenario, the main task is integrating Velox into Spark's computation process, and one significant adaptation point is shuffle. Velox's data structures are columnar, whereas Spark's traditional shuffle processes data row by row. Using Spark's built-in shuffle therefore incurs row-column conversion overhead. With RSS (Remote Shuffle Service) the performance gap widens further, to the point of being almost unacceptable for tasks with large shuffles. We therefore need a native columnar shuffle to address these issues.
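To make the conversion overhead concrete, here is a minimal Python sketch of what a row-based shuffle forces on columnar data: each batch is transposed into rows before the write and back into columns after the read. The function names `columns_to_rows` and `rows_to_columns` are illustrative only and are not Spark or Gluten APIs.

```python
from typing import Dict, List, Tuple


def columns_to_rows(columns: Dict[str, list]) -> List[Tuple]:
    """Transpose a columnar batch into a list of row tuples."""
    return list(zip(*columns.values()))


def rows_to_columns(names: List[str], rows: List[Tuple]) -> Dict[str, list]:
    """Transpose row tuples back into named columns."""
    if not rows:
        return {name: [] for name in names}
    return {name: list(col) for name, col in zip(names, zip(*rows))}


# A tiny columnar batch: every value is touched once per conversion.
batch = {"id": [1, 2, 3], "name": ["a", "b", "c"]}
rows = columns_to_rows(batch)                   # columnar -> row, before the shuffle write
restored = rows_to_columns(list(batch), rows)   # row -> columnar, after the shuffle read
```

Both conversions copy every value, so the cost grows with the size of the shuffled data. A columnar shuffle avoids both steps.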
In the open-source project Gluten, many optimizations have been made for better integration of Velox into Spark, including the implementation of native columnar shuffle. It has also integrated Apache Celeborn to adapt to RSS. Below, I will mainly introduce the implementation of columnar shuffle in Gluten in these two scenarios.
Local Shuffle#
Our focus on shuffle centers on the shuffle writer and reader. The writer is the key area of modification, because it determines how the map-side data prepared upstream is handled.
ColumnarShuffleWriter#
The essence of implementing columnar shuffle is solving the row-column conversion problem, so the upstream columnar data has to be processed before the shuffle itself can proceed. That data arrives as a ColumnarBatch, which contains all the data waiting to be processed. Its structure is similar to the following image:
Processing this data structure requires consideration of several issues:
- How to map row data to column data?
- How to store large data volumes (to prevent OOM)?
- How to ensure performance?
The first question is the most fundamental. Spark has three types of ShuffleWriter, and Gluten's columnar shuffle writer, ColumnarShuffleWriter, is based primarily on HashBasedShuffleWriter, so the row-column mapping is also mainly hash-based. This approach is simple and avoids sorting, which keeps large numbers of random reads and writes from hurting performance. Its drawbacks are that HashBasedShuffleWriter can generate a large number of files and that memory overhead is a concern. However, Gluten has also incorporated parts of the SortBasedShuffleWriter design into this writer, as the later steps show.
For the second question, Gluten splits the ColumnarBatch in memory and processes only a portion of the data at a time. This allows more data to be processed within the available memory and also reduces network transmission overhead in RSS scenarios.
For the third question, Gluten has optimized many details. For example, it manages memory through Arrow so that buffers can be reused, which lowers the risk of OOM and the probability of spills; it makes full use of CPU caches; and it avoids random writes.
Below is the overall process design of ColumnarShuffleWriter:
Note that almost all processes here are implemented in Velox.
- Obtain the incoming records as a ColumnarBatch
- Calculate the correspondence between partitions and rows based on the partitioner
- Construct a mapping table to complete the mapping relationship from partition id to rowid
- Pre-allocate memory for multiple data loads
- Call several split functions to split data into the cache
- If memory is insufficient, spill data to files
- Finally, complete the write, merging memory data and spill files to form the final file
Constructing the partition2Row Relationship#
A key aspect is constructing these two arrays: partition2RowOffset and rowOffset2RowId. These store the mapping relationships between partition, column, and row, and determine valid partitions (if no data comes into a partition, memory will not be pre-allocated for it).
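The construction of these two arrays can be sketched as a counting sort over partition ids. This is a minimal Python illustration, not Gluten's C++ code: the function name `build_partition_mapping` and the input `row2partition` (the partition id computed for each row by the partitioner) are assumptions made for the example.

```python
from typing import List, Tuple


def build_partition_mapping(
    row2partition: List[int], num_partitions: int
) -> Tuple[List[int], List[int]]:
    """Build (partition2RowOffset, rowOffset2RowId) from per-row partition ids."""
    # 1. Count how many rows fall into each partition.
    counts = [0] * num_partitions
    for pid in row2partition:
        counts[pid] += 1

    # 2. Prefix sum: rows of partition p occupy offsets [offset[p], offset[p + 1]).
    partition2_row_offset = [0] * (num_partitions + 1)
    for pid in range(num_partitions):
        partition2_row_offset[pid + 1] = partition2_row_offset[pid] + counts[pid]

    # 3. Scatter each row id into the slice that belongs to its partition.
    cursor = partition2_row_offset[:-1].copy()
    row_offset2_row_id = [0] * len(row2partition)
    for row_id, pid in enumerate(row2partition):
        row_offset2_row_id[cursor[pid]] = row_id
        cursor[pid] += 1
    return partition2_row_offset, row_offset2_row_id


# Six rows assigned to four partitions; partition 3 receives no rows.
row2partition = [2, 0, 2, 1, 0, 2]
offsets, row_ids = build_partition_mapping(row2partition, 4)

# Only partitions with at least one row need a pre-allocated buffer.
valid_partitions = [p for p in range(4) if offsets[p + 1] > offsets[p]]
```

Grouping row ids by partition up front means each later split pass can read one contiguous slice of `row_ids` per partition instead of re-evaluating the partitioner.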
Split Function#
The split phase iterates over the split functions, which slice the rowVector converted from the ColumnarBatch by partition and place the slices into the memory pre-allocated for each partition. When memory usage reaches the spill threshold, the data in memory is spilled.
The format of rowVector is illustrated in the image below:
There are four main split functions, each handling a different kind of column:
- splitFixedWidthValueBuffer splits fixed-width columns (e.g., int, boolean)
- splitValidityBuffer splits the validity buffers, which mark null values
- splitBinaryArray splits binary array data
- splitComplexType splits complex types (struct, map, list)
Before splitting, preAllocPartitionBuffer initializes the memory buffers so that the split data fits completely in memory. Because the split data is traversed multiple times, the amount of data split in each pass should in practice be kept small enough to fit into the CPU L1/L2 cache as far as possible, which gives better performance.
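As a rough illustration of the fixed-width and validity splits, the Python sketch below gathers one column's values and null flags into per-partition buffers using a partition-to-row mapping. It is a simplification under stated assumptions: the real functions work on Arrow buffers and bitmaps in C++, while here plain lists stand in for buffers, and `gather_by_partition`, `split_fixed_width` and `split_validity` are hypothetical names.

```python
from typing import List


def gather_by_partition(
    column: list, partition2_row_offset: List[int], row_offset2_row_id: List[int]
) -> List[list]:
    """Copy a column's entries into one output buffer per partition."""
    num_partitions = len(partition2_row_offset) - 1
    buffers = []
    for pid in range(num_partitions):
        start, end = partition2_row_offset[pid], partition2_row_offset[pid + 1]
        # Row ids of one partition are contiguous in row_offset2_row_id.
        buffers.append([column[row_offset2_row_id[i]] for i in range(start, end)])
    return buffers


def split_fixed_width(values: List[int], offsets: List[int], row_ids: List[int]) -> List[list]:
    """Split a fixed-width value column (for example int) by partition."""
    return gather_by_partition(values, offsets, row_ids)


def split_validity(valid: List[bool], offsets: List[int], row_ids: List[int]) -> List[list]:
    """Split the null flags of the same column by partition."""
    return gather_by_partition(valid, offsets, row_ids)


# Example mapping: six rows over four partitions, partition 3 empty.
offsets = [0, 2, 3, 6, 6]
row_ids = [1, 4, 3, 0, 2, 5]

values = [10, 11, 12, 13, 14, 15]
valid = [True, False, True, True, True, False]  # rows 1 and 5 are null

value_buffers = split_fixed_width(values, offsets, row_ids)
validity_buffers = split_validity(valid, offsets, row_ids)
```

Values and validity are split in separate passes over the same mapping, which is why keeping each batch small enough for the CPU cache matters.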
PreAllocBuffer#
Gluten implements memory management and data exchange based on Arrow.
Based on the previously split data, each partition will pre-allocate a reusable, resizable memory block for caching the split data.
The pre-allocation size calculation formula is:
# Pre-allocation size
memLimit = max(sparkCurrentOffHeapMemory / taskNum, 128 * 1024 * 1024)
# Maximum number of rows that can be stored in the cache (newSize)
maxRows = memLimit / bytesPerRow / numPartitions_ / 4
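A worked example of this formula in Python, assuming integer division throughout (an assumption; the formula above does not specify rounding). The function name `prealloc_rows` and the sample numbers are illustrative.

```python
MIN_MEM_LIMIT = 128 * 1024 * 1024  # 128 MiB floor from the formula


def prealloc_rows(
    off_heap_bytes: int, task_num: int, bytes_per_row: int, num_partitions: int
) -> int:
    """Maximum number of rows a partition buffer is pre-allocated for."""
    mem_limit = max(off_heap_bytes // task_num, MIN_MEM_LIMIT)
    return mem_limit // bytes_per_row // num_partitions // 4


GIB = 1024 ** 3
MIB = 1024 ** 2

# 4 GiB off-heap shared by 8 tasks gives 512 MiB per task.
rows_large = prealloc_rows(4 * GIB, 8, bytes_per_row=64, num_partitions=200)

# 512 MiB off-heap shared by 8 tasks gives 64 MiB, so the 128 MiB floor applies.
rows_floor = prealloc_rows(512 * MIB, 8, bytes_per_row=64, num_partitions=200)
```

With these inputs `rows_large` is 10485 and `rows_floor` is 2621: wider rows or more partitions shrink the per-partition row budget proportionally.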
Each buffer is dynamically resizable memory, managed at fine granularity through Arrow. The typical condition that triggers a resize (the actual set of resizing cases is larger) compares the new size against the buffer's current size:
# Default resizing factor THRESHOLD is 0.25; currentSize is the buffer's current size
newSize < (1 - THRESHOLD) * currentSize || newSize > (1 + THRESHOLD) * currentSize
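A minimal Python sketch of this check, assuming the new size is compared against the buffer's current size scaled by the threshold. The names `needs_resize` and `current_size` are illustrative and not taken from Gluten.

```python
THRESHOLD = 0.25  # default resizing factor


def needs_resize(new_size: int, current_size: int, threshold: float = THRESHOLD) -> bool:
    """Return True when new_size falls outside the tolerated band around current_size."""
    lower = (1 - threshold) * current_size
    upper = (1 + threshold) * current_size
    return new_size < lower or new_size > upper


# With a current size of 1000 rows the tolerated band is [750, 1250].
shrink = needs_resize(700, 1000)   # below the band: shrink
keep = needs_resize(800, 1000)     # inside the band: reuse the buffer as is
grow = needs_resize(1300, 1000)    # above the band: grow
```

The band keeps small fluctuations in batch size from triggering a reallocation on every batch.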
The complete memory resizing situation is illustrated in the image below:
Here, "evict and free data" spills the data from memory. The data spilled each time is copied out via Arrow, with an initial size of 4096, which expands automatically if insufficient.
Spill and Merge#
Although ColumnarShuffleWriter is designed as a hash-based shuffle, Gluten borrows some design ideas from unsafe sort. In the local spill scenario, whenever memory usage reaches the threshold or an OOM occurs, the data in memory is compressed and spilled to disk files, possibly many times; after several rounds of ColumnarBatch processing, all spilled files are merged into one file.
- Data in each spill file is written in the order of partition id.
- During merging, all files are traversed according to partition id to achieve the merging of data for each partition.
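The two rules above can be sketched in Python as follows. This is an in-memory simplification under stated assumptions: a spill "file" is modelled as a list of `(partition_id, payload)` pairs, compression is omitted, and `write_spill` and `merge_spills` are hypothetical names.

```python
from typing import Dict, List, Tuple

Spill = List[Tuple[int, bytes]]


def write_spill(partition_buffers: Dict[int, bytes]) -> Spill:
    """Write one spill: payloads are laid out in ascending partition id order."""
    return [(pid, partition_buffers[pid]) for pid in sorted(partition_buffers)]


def merge_spills(spills: List[Spill], num_partitions: int) -> Tuple[bytes, List[int]]:
    """Merge all spills into one output, partition by partition."""
    output = bytearray()
    partition_lengths = []
    # One read cursor per spill; each spill is only ever read front to back.
    cursors = [0] * len(spills)
    for pid in range(num_partitions):
        start = len(output)
        for i, spill in enumerate(spills):
            pos = cursors[i]
            if pos < len(spill) and spill[pos][0] == pid:
                output += spill[pos][1]
                cursors[i] = pos + 1
        partition_lengths.append(len(output) - start)
    return bytes(output), partition_lengths


# Two spills produced at different times during the task.
spill_a = write_spill({2: b"cc", 0: b"a"})
spill_b = write_spill({1: b"B", 0: b"AA"})

merged, lengths = merge_spills([spill_a, spill_b], num_partitions=3)
```

Because every spill is ordered by partition id, the merge reads each file sequentially and never seeks backwards. The per-partition lengths are what a reader needs to locate its partition in the final file.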
ColumnarShuffleReader#
The reader side is relatively simple: it can largely reuse Spark's built-in ShuffleReader, since the data pulled from the map side has not changed significantly. The main task is converting the pulled data into ColumnarBatch for downstream use, which can be achieved by rewriting the deserializer.
Remote Shuffle#
The design idea of remote columnar shuffle is fundamentally similar to local shuffle, with the main issue being how to correctly push data to RSS. In Gluten's adaptation of Celeborn, the writer and reader have been re-implemented. The basic idea is similar to local shuffle, using the native engine to implement the splitting and pushing of columnarBatch, and on the reader side, obtaining columnarBatch by implementing the deserializer.
- Obtain the incoming records as a ColumnarBatch
- Calculate the correspondence between partitions and rows based on the partitioner
- Construct a mapping table to complete the mapping relationship from partition id to rowid
- Pre-allocate memory for multiple data loads
- Call several split functions to split data into the cache
- Spill to Celeborn when exceeding the cache limit
The basic steps were covered in the Local Shuffle section. Data is pushed through JNI: the Celeborn client is passed to the native engine, which pushes the spilled data to Celeborn, and the data merge is then completed on Celeborn.
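The push-on-overflow behaviour can be sketched in Python as below. Everything here is a hypothetical stand-in: `InMemoryShuffleService` is not the Celeborn client API, `RemoteColumnarWriter` is not Gluten's writer, and the JNI boundary is left out. The sketch only shows the control flow of caching split data, pushing it once the cache limit is exceeded, and letting the service concatenate data per partition.

```python
from collections import defaultdict


class InMemoryShuffleService:
    """Hypothetical stand-in for a remote shuffle service (not the Celeborn API)."""

    def __init__(self) -> None:
        self.partitions = defaultdict(bytearray)
        self.push_count = 0

    def push(self, partition_id: int, data: bytes) -> None:
        # The service appends per partition, so merging happens on its side.
        self.partitions[partition_id] += data
        self.push_count += 1


class RemoteColumnarWriter:
    """Caches split data per partition and pushes it when the cache limit is exceeded."""

    def __init__(self, client: InMemoryShuffleService, cache_limit_bytes: int) -> None:
        self.client = client
        self.cache_limit = cache_limit_bytes
        self.cache = defaultdict(bytearray)
        self.cached_bytes = 0

    def write(self, partition_id: int, data: bytes) -> None:
        self.cache[partition_id] += data
        self.cached_bytes += len(data)
        if self.cached_bytes > self.cache_limit:
            self.flush()

    def flush(self) -> None:
        for pid in sorted(self.cache):
            if self.cache[pid]:
                self.client.push(pid, bytes(self.cache[pid]))
        self.cache.clear()
        self.cached_bytes = 0


service = InMemoryShuffleService()
writer = RemoteColumnarWriter(service, cache_limit_bytes=4)
writer.write(0, b"ab")  # 2 bytes cached
writer.write(1, b"cd")  # 4 bytes cached, limit not exceeded
writer.write(0, b"e")   # 5 bytes cached, exceeds the limit and triggers a push
writer.write(1, b"f")   # cached again
writer.flush()          # final push at the end of the task
```

Unlike the local case, no spill files are merged by the writer: each push is the remote counterpart of a spill, and the service ends up with one contiguous payload per partition.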