banner
bladedragon

bladedragon

基于velox的列式shuffle介绍

什么是 velox#

velox 是一个基于 C++ 编写的开源数据库执行加速工具库。它的优势是利用 native 方法来优化计算,深度利用系统级的优化手段例如向量化技术来加速执行计划。因此,也可以将其称为是一个 native compute engine。将 velox 集成到传统的数据计算引擎中在不少通用场景中可以显著提升计算性能,提高查询效率,不过将其引入生产依然还会有一定的问题。尽管有类似 gluten 这样的开源项目可以快速将 velox 集成进 spark,从而简化适配传统大数据计算架构的流程,但依然会存在一些适配成本;优化也并非百分百覆盖所有查询条件,因此建议针对自己的使用场景经过较多性能验证和测试评估后再决定是否使用。

列式 shuffle#

在我的使用场景中,主要是将 velox 结合到 spark 的计算流程中,其中一个比较大的适配点是 shuffle。因为在 velox 中的数据结构是列式的,这和传统以行为单位处理 shuffle 数据的 spark 有着比较大的区别。如果使用原生 shuffle,会存在一个行列转换的开销。如果使用 RSS(Remote Shuffle Service),性能的损耗会进一步增大,这在大 shuffle 的任务场景就几乎不可接受。因此我们需要一个原生的列式 shuffle 来解决这类问题。

在开源项目中 gluten 中,为 velox 更好的集成到 spark 做了很多优化,包括实现了原生的列式 shuffle。同时也接入了 apache celeborn 实现了对 RSS 的适配。下面就主要介绍一下 gluten 在这两个场景下的列式 shuffle 的实现。

Local Shuffle#

我们关注 shuffle 主要就是关注 shuffle 的 writer 和 reader,其中 writer 是改造的重点,因为这涉及到如何处理上游准备 map 的数据。

ColumnarShuffleWriter#

如何实现列式 shuffle,本质上是如何解决行列转换。因此我们需要对上游的列式数据进行加工处理,再进行 shuffle 流程。
velox 的列式数据结构为 ColumnarBatch ,其中包含了所有等待处理的数据。其结构类似下图:

image

对该数据结构处理需要考虑这几个问题:

  1. 如何将行数据和列数据对应起来?
  2. 遇到大数据量如何存储(防止 OOM)
  3. 如何保证性能?

第一个问题其实也是最基础的问题。我们知道在 spark 中总共有三种 ShuffleWriter,gluten 的列式 shuffle writer:ColumnarShuffleWriter 主要是基于 HashBasedShuffleWriter 改造。对于行列关系的映射也主要是基于 hash 映射。这样做的好处是实现简单,同时避免 sort 防止大量随机读写影响性能,但是 HashBasedShuffleWriter 也会产生大量文件。另外对内存的开销也是一个问题。不过 gluten 也在这个 writer 上融入了 SortBasedShuffleWriter 的设计,这点我们在后面的流程中可以感受到。

第二个问题,gluten 会在内存中对 ColumnBatch 切分,每次只处理一部分的数据,这样不仅可以在内存中处理更多的数据,也可以在 RSS 的场景中降低网络传输带来的损耗。

第三个问题,gluten 在很多细节进行了优化。例如,利用 arrow 来管理内存,对内存进行复用,从而降低 OOM 的风险,减少 spill 的概率;充分利用 CPU 缓存;避免随机写等等。

下面展示 ColumnarShuffleWriter 的整体流程设计:

列式 shuffle

注意,这里基本所有的流程都在 velox 中实现。

  1. 获取每个 record 映射成 columnBatch
  2. 根据分区器,计算分区和 row 的对应关系
  3. 构建映射表,完成 partition id -> rowid 的映射关系
  4. 预分配内存用于多次装载数据
  5. 调用若干 split function, 切分数据装进缓存
  6. 如果内存不足,将数据 spill 到文件
  7. 最后完成 write,merge 内存数据和 spill 文件,形成 final file

构建 partition2Row 的关系#

其中比较重要的是构建了这两个数组: partition2RowOffset 和 rowOffset2RowId
这里保存了 partition、column 和 row 之间的映射关系,并确定有效的 partition(如果 partition 没有数据进
来,后面就不会给预分配内存)

split function#

split 阶段会遍历 split 函数,负责将 ColumnarBatch 转成的 rowVector 根据 partition 切出来放进每个
partition 预分配好的内存,当内存达到溢出要求后,会将内存中的数据 spill 出来。

rowVector 的格式样例如图
image

  • split function 主要包含四种函数,分别处理不同类型的 column
  • splitFixedWidthValueBuffer 将固定位宽的列切分,一般是 column type(例如 int,boolean) splitValidityBuffer 将有效的字节值切分,处理 null 值
  • splitBinaryArray 将二进制队列的数据切分
  • splitComplexType 将复杂类型(struct, map, list)切分

切分前会首先初始化内存 Buffer:preAllocPartitionBuffer,确保切分后的数据能完整装进内存
因为切分的数据会被多次遍历,因此在实际场景应当控制每次切分的数据大小尽可能装入 CPUL1/L2 缓存,这样可以达到一个比较好的性能

preAllocBuffer#

gluten 基于 arrow 来实现内存管理和数据交换。

基于前面切分后的数据,每个 partition 会预分配一块可重用,可 resize 的内存用于缓存切分数据

预分配的大小计算公式:

#预分配大小
memLimit = max(sparkCurrentOffHeapMemory / taskNum, 128 * 1024 * 1024)
#缓存中最大可以存放的rows num(newSize)
maxRows = memLimit / bytesPerRow / numPartitions_ / 4

buffe 是一块动态伸缩的内存,利用 arrow 做了精细化的控制:通常触发伸缩的条件(实际内存伸缩的情况会更多)是、

#默认伸缩因子THRESHOLD是0.25
newSize < (1 - THRESHOLD) || newSize > (1 + THRESHOLD)

其完整的内存伸缩情况如下图:
image
其中evict and free data就会把内存的数据 spill 出去,每次 spill 的数据会通过 arrow 拷贝输出,初始大小是 4096,如果不够会自行扩大

spill and merge#

ColumnarShuffleWriter 的设计虽然是 hashBased shuffle, 但是 gluten 在基础上融入了一些 unsafe sort 的设计思路。
在本地 spill 的场景中,每次当使用内存达到阈值或者 OOM 时,会将内存中的数据压缩并多次溢写到
磁盘文件中;当经过多轮 columnarBatch 处理,最后会把所有溢写文件合并成一个文件

  1. 每个 spill 文件中数据按照 partition id 的顺序写入
  2. 合并时会按照 partition id 遍历所有文件,实现每个 partition 数据的合并

ColumnarShuffleReader#

reader 端的设计其实比较简单,基本可以复用原生的 ShuffleReader,因为从 map 端拉取的数据基本没变,主要需要将拉取的数据转换成 ColumnarBatch 供下游继续使用。这里只需要重写反序列化器即可实现。

Remote Shuffle#

remote columnar shuffle 的设计思路和本地 shuffle 基本相同,主要问题在于如何将数据正确推送到 RSS。
在 gluten 适配 celeborn 的设计中,重新实现了 writer 和 reader。其基本思路和本地 shuffle 类似,通过 native engine 来实现 columnarBatch 的切分和 push,在 reader 端通过实现反序列化器来获取 columnBatch。

image

  1. 获取每个 record 映射成 columnBatch
  2. 根据分区器,计算分区和 row 的对应关系
  3. 构建映射表,完成 partition id -> rowid 的映射关系
  4. 预分配内存用于多次装载数据
  5. 调用若干 split function, 切分数据装进缓存
  6. 超过缓存上限溢写到 celeborn

前面基本几点在 Local Shuffle 环节均已介绍,推送数据则是通过 JNI 将 celeborn client 传递到 native engine,负责将溢写的数据 push 到 celeborn,然后在 celeborn 上完成数据的 merge。

加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。