首先介绍一下 blaze,blaze 是快手自研的基于 Rust 语言和 DataFusion 框架开发的 Spark 向量化执行引擎,旨在通过本机矢量化执行技术来加速 Spark SQL 的查询处理。为什么考虑深入了解 blaze,首先是因为当前相对成熟的开源 spark 向量化引擎项目较少(指开箱即用和存在生产运行案例),比较热门就只有 gluten 和 blaze;此外 blaze 基于 datafusion,采用 rust 实现核心向量化组件,比起 gluten 基于 velox (通过 C++ 实现),上手成本相对较低,最近刚好在学 rust,所以自然而然地对 blaze 的兴趣更大了。
本文主要是介绍 blaze 的整体架构和启动流程,其中主要会结合源码进行分析。由于本人也还在学习中,如果内容存在纰漏,也欢迎指出并一起讨论。
整体架构#
blaze 本质是一个 spark 扩展,实现加速的核心原理是通过替换算子为 native 算子从而利用 datafusion 进行加速,同时通过列式计算和高效的内存通信,使得在不侵入业务作业逻辑的前提下实现性能的较大提升。
以下是 spark 原生和 blaze 的架构实现对比:
原生 spark
spark+ blaze
参考 blaze 的项目代码,可见 blaze 的主要实现包括
- spark extension:一系列替换执行计划的规则和适配多版本的 spark shim 包
- jni bridge: 一套用于 java 和 rust 的通信机制,基于 pb 和 arrow 实现高效数据传输
- native plan: 一系列基于 datafusion 适配的 native 算子
接下来我们主要了解 spark extension 以及 jni bridge 部分代码,算子方面留给以后再说吧~同时注意本文中涉及的代码均来自 branch v4.0.1
启动流程#
blaze 通过通用的 spark 插件形式注入,启动的入口自然在继承自 SparkSessionExtensions 的 BlazeSparkSessionExtension 中,通过扩展注册列式处理规则,执行算子替换策略。其中核心执行代码如下:
//org.apache.spark.sql.blaze.BlazeColumnarOverrides#preColumnarTransitions
override def preColumnarTransitions: Rule[SparkPlan] = {
new Rule[SparkPlan] {
override def apply(sparkPlan: SparkPlan): SparkPlan = {
// 检查Blaze是否启用
if (!sparkPlan.conf.getConf(blazeEnabledKey)) return sparkPlan
// 生成转换策略
BlazeConvertStrategy.apply(sparkPlan)
// 递归转换执行计划
val transformed = BlazeConverters.convertSparkPlanRecursively(sparkPlan)
// 执行特定算子的后处理
Shims.get.postTransform(transformed, sparkSession.sparkContext)
}
}
}
在 BlazeConvertStrategy 中我们可以看到主要的作用是给节点打标签从而判断是否需要转换 native,
当前标签类型包含:
// 标记节点是否可转换为Blaze原生执行
// 示例:在转换测试后更新标记,决定是否保留转换结果
val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertible")
// 标记是否需要回退原格式
// 示例:当遇到无法完全转换的操作时设置此标记
val convertToNonNativeTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertToNonNative")
// 核心转换策略标记,包含三种状态:
// - Default: 未决状态
// - AlwaysConvert: 强制转换(如文件扫描)
// - NeverConvert: 禁止转换(如低效聚合场景)
val convertStrategyTag: TreeNodeTag[ConvertStrategy] = TreeNodeTag("blaze.convert.strategy")
// 标记子节点是否需要保持排序特性
// 示例:SortExec节点会清除此标记以避免冗余排序
val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.child.ordering.required")
// 用于Join操作标记较小数据侧
// 示例:BroadcastHashJoinExec中决定广播哪一侧数据
val joinSmallerSideTag: TreeNodeTag[BuildSide] = TreeNodeTag("blaze.join.smallerSide")
通过 BlazeConverters.convertSparkPlanRecursively 会把节点尽可能转成 native plan 并确定需要回退的节点,之后会借由 spark 的分布式能力执行 native plan。
下面分别展示一下 nativePlan 节点的构造和对应 nativeRDD 的构造,大家可以和 spark 原生的构建做一个参照:
NativePlan 对应 SparkPlan
trait NativeSupports extends SparkPlan {
// 实际执行native的方法,最后会返回nativeRDD
protected def doExecuteNative(): NativeRDD
protected override def doExecute(): RDD[InternalRow] = doExecuteNative()
def executeNative(): NativeRDD = executeQuery {
doExecuteNative()
}
// hack了一个是否全量shuffleRead的函数,目前看默认为true,是一个未完成的feature
def shuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this.doExecuteNative())
}
NativeRDD 对应 RDD
class NativeRDD(
// 保持SparkContext的transient特性防止序列化
@transient private val rddSparkContext: SparkContext,
val metrics: MetricNode,
private val rddPartitions: Array[Partition],
private val rddDependencies: Seq[Dependency[_]],
private val rddShuffleReadFull: Boolean,
//对应的native plan,这里的pysicalPlanNode实际是用pb定义的类,是datafusion执行计划算子,是jni数据交互的核心
val nativePlan: (Partition, TaskContext) => PhysicalPlanNode,
val friendlyName: String = null)
extends RDD[InternalRow](rddSparkContext, rddDependencies)
with Logging {
if (friendlyName != null) {
setName(friendlyName)
}
def isShuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this)
Shims.get.setRDDShuffleReadFull(this, rddShuffleReadFull)
override protected def getPartitions: Array[Partition] = rddPartitions
override protected def getDependencies: Seq[Dependency[_]] = rddDependencies
//执行入口,通过调用JNI执行native函数
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val computingNativePlan = nativePlan(split, context)
NativeHelper.executeNativePlan(computingNativePlan, metrics, split, Some(context))
}
}
既然到了 executeNativePlan 这一步,blaze 具体是如何实现 jvm 和 rust runtime 通信的呢?
jni 通信机制#
java 侧实现#
我们首先看一下 NativeHelper.executeNativePlan 函数:
def executeNativePlan(
nativePlan: PhysicalPlanNode,
metrics: MetricNode,
partition: Partition,
context: Option[TaskContext]): Iterator[InternalRow] = {
if (partition.index == 0 && metrics != null && context.nonEmpty) {
metrics.foreach(_.add("stage_id", context.get.stageId()))
}
if (nativePlan == null) {
return Iterator.empty
}
BlazeCallNativeWrapper(nativePlan, partition, context, metrics).getRowIterator
}
BlazeCallNativeWrapper 包装了 jni 交互的细节,其中重要的几个步骤:
- 第一次初始化时启动 native runtime,首先确保加载 libblaze.so 从而获取 native 引擎运行库,这部分其实就是 blaze 核心的 native-engine 部分;然后通过调用 native 函数 JniBridge.callNative () 来启动 runtime 并持有对应的指针,用于后续数据访问
class BlazeCallNativeWrapper(...) {
// 确保本地库加载
BlazeCallNativeWrapper.initNative()
// 初始化Native运行时
private var nativeRuntimePtr = JniBridge.callNative(...)
// ...
}
- BlazeCallNativeWrapper 会持有一个数据迭代器,当调用 hasNext 的时候才会去 native runtime 拉取就绪的数据。触发函数是 JniBridge.nextBatch ()。其中 native runtime 会通过调用 scala 函数 importBatch () 将 arrow 格式的数据转换成 scala ArrayBuffer 类,保存在 batchRows 中。
可以学习一下用 arrow 进行数据交换的实现:
//Arrow 数据元信息导入(native runtime回调)
protected def importSchema(ffiSchemaPtr: Long): Unit = {
Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { schemaAllocator =>
Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema =>
arrowSchema = Data.importSchema(schemaAllocator, ffiSchema, dictionaryProvider)
schema = ArrowUtils.fromArrowSchema(arrowSchema)
toUnsafe = UnsafeProjection.create(schema)
}
}
}
// Arrow数据导入(native runtime回调)
protected def importBatch(ffiArrayPtr: Long): Unit = {
if (nativeRuntimePtr == 0) {
throw new RuntimeException("Native runtime is finalized")
}
Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { batchAllocator =>
Using.resources(
ArrowArray.wrap(ffiArrayPtr),
VectorSchemaRoot.create(arrowSchema, batchAllocator)) { case (ffiArray, root) =>
Data.importIntoVectorSchemaRoot(batchAllocator, ffiArray, root, dictionaryProvider)
val batch = ColumnarHelper.rootAsBatch(root)
batchRows.append(
ColumnarHelper
//列行转换
.batchAsRowIter(batch)
// toUnsafe复用避免重复计算
.map(row => toUnsafe(row).copy().asInstanceOf[InternalRow])
.toSeq: _*)
}
}
}
rust 侧实现#
接下来我们就要查看 native runtime 的细节,主要关注 callNative 和 nextBatch 的实现。
callNative 方法在 rust 中的调用在 execu.rs 的 Java_org_apache_spark_sql_blaze_JniBridge_callNative 里:
核心做了这些工作
- 初始化 jni java classes
- 初始化 datafusion 的 session 上下文
- 初始化执行运行时 NativeExecutionRuntime 用于监听运行过程和处理执行计划
可以看一下完整实现,官方有注释非常好理解:
#[allow(non_snake_case)]
#[no_mangle]
pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
env: JNIEnv,
_: JClass,
executor_memory_overhead: i64,
native_wrapper: JObject,
) -> i64 (
handle_unwinded_scope(|| -> Result<i64> {
static SESSION: OnceCell<SessionContext> = OnceCell::new();
static INIT: OnceCell<()> = OnceCell::new();
// 单例初始化一次
INIT.get_or_try_init(|| {
// logging is not initialized at this moment
eprintln!("------ initializing blaze native environment ------");
init_logging();
// init jni java classes
log::info!("initializing JNI bridge");
JavaClasses::init(&env);
// init datafusion session context
log::info!("initializing datafusion session");
SESSION.get_or_try_init(|| {
let max_memory = executor_memory_overhead as usize;
let memory_fraction = conf::MEMORY_FRACTION.value()?;
let batch_size = conf::BATCH_SIZE.value()? as usize;
MemManager::init((max_memory as f64 * memory_fraction) as usize);
let session_config = SessionConfig::new().with_batch_size(batch_size);
let runtime_config =
RuntimeConfig::new().with_disk_manager(DiskManagerConfig::Disabled);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let session = SessionContext::new_with_config_rt(session_config, runtime);
Ok::<_, DataFusionError>(session)
})?;
Ok::<_, DataFusionError>(())
})?;
let native_wrapper = jni_new_global_ref!(native_wrapper)?;
// create execution runtime
let runtime = Box::new(NativeExecutionRuntime::start(
native_wrapper,
SESSION.get().unwrap().task_ctx(),
)?);
// returns runtime raw pointer
Ok::<_, DataFusionError>(Box::into_raw(runtime) as usize as i64)
})
}
在 rt.rs 中可以看到初始化 NativeExecutionRuntime 时将 protobuf 定义的执行计划全部反序列化转换成 datafusion 的 ExecutionPlan 结构,然后使用 datafusion 执行整个执行计划,datafusion 会使用 tokio 发起异步执行操作,依次迭代触发各个算子的 compute。
代码很长,我简短了一些:
pub fn start(native_wrapper: GlobalRef, context: Arc<TaskContext>) -> Result<Self> {
// 1. 从JNI获取并解码任务定义
let raw_task_definition = jni_call!(...);
let task_definition = TaskDefinition::decode(...);
// 2. 创建DataFusion执行计划
let execution_plan: Arc<dyn ExecutionPlan> = plan.try_into()?;
// 3. 初始化Tokio多线程运行时
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.on_thread_start(|| { /* 设置线程本地上下文 */ })
.build()?;
// 4. 创建数据生产通道
let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);
// 5. 异步执行数据流
let consume_stream = async move {
let mut stream = exec_ctx.execute(&execution_plan)?;
jni_call!(...importSchema(...)); // 导出schema到Java
// 6. 流式生产数据批次
while let Some(batch) = stream.next().await {
batch_sender.send(Ok(Some(batch)))?;
}
};
// 7. 启动异步任务
let join_handle = tokio_runtime.spawn(async move {
consume_stream.await.unwrap_or_else(|err| {
/* 错误处理 */
});
});
// 8. 返回运行时实例
Ok(Self { ... })
}
接下来看看 nextBatch 的实现,naxtBatch 实际调用的 NativeExecutionRuntime 的 next_bacht 函数,主要利用 arrow 实现了零拷贝的数据传输:
pub fn next_batch(&self) -> bool {
// 1. 定义内部闭包处理实际数据接收
let next_batch = || -> Result<bool> {
match self
.batch_receiver
.recv()
.or_else(|err| df_execution_err!("receive batch error: {err}"))??
{
Some(batch) => {
// 2. 转换Arrow数据结构
let struct_array = StructArray::from(batch);
let ffi_array = FFI_ArrowArray::new(&struct_array.to_data());
// 3. 通过JNI调用Java层方法导入数据
jni_call!(BlazeCallNativeWrapper(self.native_wrapper.as_obj())
.importBatch(&ffi_array as *const FFI_ArrowArray as i64) -> ()
)?;
Ok(true)
}
None => Ok(false),
}
};
// 4. 执行并处理最终结果
match next_batch() {
Ok(ret) => return ret,
Err(err) => {
// 5. 错误回调Java层
let _ = set_error(
&self.native_wrapper,
&format!("poll record batch error: {err}"),
None,
);
return false;
}
}
}
结尾#
以上就是大致的 blaze 启动流程,接下来就是深入了解各个算子的实现方式了,有空的话后续我也会在更新几篇源码阅读,这里先占个坑(
这里也提供一个 ai 的一图流流程总结:
┌───────────────────────┐
│ Spark Driver │
│ (Java/Scala 端) │
└──────────┬────────────┘
│ 1. 生成Protobuf格式执行计划
▼
┌───────────────────────┐
│ JNI 接口层 │
│ - 通过FFI跨语言调用 │
│ - 传递序列化的二进制数据 │
└──────────┬────────────┘
│ 2. 反序列化为PhysicalPlanNode
▼
┌───────────────────────┐
│ TryInto::try_into() │
│ - 核心转换入口 │
└──────────┬────────────┘
│ 3. 递归处理节点
▼
┌───────────────────────┐
│ ExecutionPlan构建 │
│ - 通过match处理30+节点类型 │
│ - 例如: │
│ ProjectionExec │
│ FilterExec │
│ HashJoinExec │
└──────────┬────────────┘
│ 4. 构建物理运算符树
▼
┌───────────────────────┐
│ 执行引擎 │
│ - 调用execute()方法 │
│ - 分布式任务调度 │
└───────────────────────┘