banner
bladedragon

bladedragon

blaze 执行流程源码阅读

首先介绍一下 blaze,blaze 是快手自研的基于 Rust 语言和 DataFusion 框架开发的 Spark 向量化执行引擎,旨在通过本机矢量化执行技术来加速 Spark SQL 的查询处理。为什么考虑深入了解 blaze,首先是因为当前相对成熟的开源 spark 向量化引擎项目较少(指开箱即用和存在生产运行案例),比较热门就只有 gluten 和 blaze;此外 blaze 基于 datafusion,采用 rust 实现核心向量化组件,比起 gluten 基于 velox (通过 C++ 实现),上手成本相对较低,最近刚好在学 rust,所以自然而然地对 blaze 的兴趣更大了。

本文主要是介绍 blaze 的整体架构和启动流程,其中主要会结合源码进行分析。由于本人也还在学习中,如果内容存在纰漏,也欢迎指出并一起讨论。

整体架构#

blaze 本质是一个 spark 扩展,实现加速的核心原理是通过替换算子为 native 算子从而利用 datafusion 进行加速,同时通过列式计算和高效的内存通信,使得在不侵入业务作业逻辑的前提下实现性能的较大提升。

以下是 spark 原生和 blaze 的架构实现对比:

原生 spark

image

spark+ blaze

image

参考 blaze 的项目代码,可见 blaze 的主要实现包括

  • spark extension:一系列替换执行计划的规则和适配多版本的 spark shim 包
  • jni bridge: 一套用于 java 和 rust 的通信机制,基于 pb 和 arrow 实现高效数据传输
  • native plan: 一系列基于 datafusion 适配的 native 算子

接下来我们主要了解 spark extension 以及 jni bridge 部分代码,算子方面留给以后再说吧~同时注意本文中涉及的代码均来自 branch v4.0.1

启动流程#

blaze 通过通用的 spark 插件形式注入,启动的入口自然在继承自 SparkSessionExtensions 的 BlazeSparkSessionExtension 中,通过扩展注册列式处理规则,执行算子替换策略。其中核心执行代码如下:

//org.apache.spark.sql.blaze.BlazeColumnarOverrides#preColumnarTransitions

override def preColumnarTransitions: Rule[SparkPlan] = {
  new Rule[SparkPlan] {
    override def apply(sparkPlan: SparkPlan): SparkPlan = {
      // 检查Blaze是否启用
      if (!sparkPlan.conf.getConf(blazeEnabledKey)) return sparkPlan
      
      // 生成转换策略
      BlazeConvertStrategy.apply(sparkPlan)
      
      // 递归转换执行计划
      val transformed = BlazeConverters.convertSparkPlanRecursively(sparkPlan)
      
      // 执行特定算子的后处理
      Shims.get.postTransform(transformed, sparkSession.sparkContext)
    }
  }
}

在 BlazeConvertStrategy 中我们可以看到主要的作用是给节点打标签从而判断是否需要转换 native,

当前标签类型包含:

// 标记节点是否可转换为Blaze原生执行
// 示例:在转换测试后更新标记,决定是否保留转换结果
val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertible")

// 标记是否需要回退原格式
// 示例:当遇到无法完全转换的操作时设置此标记
val convertToNonNativeTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertToNonNative")

// 核心转换策略标记,包含三种状态:
// - Default: 未决状态
// - AlwaysConvert: 强制转换(如文件扫描)
// - NeverConvert: 禁止转换(如低效聚合场景)
val convertStrategyTag: TreeNodeTag[ConvertStrategy] = TreeNodeTag("blaze.convert.strategy")

// 标记子节点是否需要保持排序特性
// 示例:SortExec节点会清除此标记以避免冗余排序
val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.child.ordering.required")

// 用于Join操作标记较小数据侧
// 示例:BroadcastHashJoinExec中决定广播哪一侧数据
val joinSmallerSideTag: TreeNodeTag[BuildSide] = TreeNodeTag("blaze.join.smallerSide")

通过 BlazeConverters.convertSparkPlanRecursively 会把节点尽可能转成 native plan 并确定需要回退的节点,之后会借由 spark 的分布式能力执行 native plan。

下面分别展示一下 nativePlan 节点的构造和对应 nativeRDD 的构造,大家可以和 spark 原生的构建做一个参照:
NativePlan 对应 SparkPlan

trait NativeSupports extends SparkPlan {
  // 实际执行native的方法,最后会返回nativeRDD
  protected def doExecuteNative(): NativeRDD
  
  protected override def doExecute(): RDD[InternalRow] = doExecuteNative()

  def executeNative(): NativeRDD = executeQuery {
    doExecuteNative()
  }

  // hack了一个是否全量shuffleRead的函数,目前看默认为true,是一个未完成的feature
  def shuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this.doExecuteNative())
}

NativeRDD 对应 RDD

class NativeRDD(
    // 保持SparkContext的transient特性防止序列化
    @transient private val rddSparkContext: SparkContext,
    val metrics: MetricNode,
    private val rddPartitions: Array[Partition],
    private val rddDependencies: Seq[Dependency[_]],
    private val rddShuffleReadFull: Boolean,
    //对应的native plan,这里的pysicalPlanNode实际是用pb定义的类,是datafusion执行计划算子,是jni数据交互的核心
    val nativePlan: (Partition, TaskContext) => PhysicalPlanNode,
    val friendlyName: String = null)
    extends RDD[InternalRow](rddSparkContext, rddDependencies)
    with Logging {

  if (friendlyName != null) {
    setName(friendlyName)
  }

  def isShuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this)
  Shims.get.setRDDShuffleReadFull(this, rddShuffleReadFull)

  override protected def getPartitions: Array[Partition] = rddPartitions
  override protected def getDependencies: Seq[Dependency[_]] = rddDependencies

  //执行入口,通过调用JNI执行native函数
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val computingNativePlan = nativePlan(split, context)
    NativeHelper.executeNativePlan(computingNativePlan, metrics, split, Some(context))
  }
}

既然到了 executeNativePlan 这一步,blaze 具体是如何实现 jvm 和 rust runtime 通信的呢?

jni 通信机制#

java 侧实现#

我们首先看一下 NativeHelper.executeNativePlan 函数:

  def executeNativePlan(
      nativePlan: PhysicalPlanNode,
      metrics: MetricNode,
      partition: Partition,
      context: Option[TaskContext]): Iterator[InternalRow] = {

    if (partition.index == 0 && metrics != null && context.nonEmpty) {
      metrics.foreach(_.add("stage_id", context.get.stageId()))
    }
    if (nativePlan == null) {
      return Iterator.empty
    }
    BlazeCallNativeWrapper(nativePlan, partition, context, metrics).getRowIterator
  }

BlazeCallNativeWrapper 包装了 jni 交互的细节,其中重要的几个步骤:

  1. 第一次初始化时启动 native runtime,首先确保加载 libblaze.so 从而获取 native 引擎运行库,这部分其实就是 blaze 核心的 native-engine 部分;然后通过调用 native 函数 JniBridge.callNative () 来启动 runtime 并持有对应的指针,用于后续数据访问
class BlazeCallNativeWrapper(...) {
  // 确保本地库加载  
  BlazeCallNativeWrapper.initNative() 
  // 初始化Native运行时  
  private var nativeRuntimePtr = JniBridge.callNative(...) 
  // ...
}
  1. BlazeCallNativeWrapper 会持有一个数据迭代器,当调用 hasNext 的时候才会去 native runtime 拉取就绪的数据。触发函数是 JniBridge.nextBatch ()。其中 native runtime 会通过调用 scala 函数 importBatch () 将 arrow 格式的数据转换成 scala ArrayBuffer 类,保存在 batchRows 中。

可以学习一下用 arrow 进行数据交换的实现:

//Arrow 数据元信息导入(native runtime回调)
protected def importSchema(ffiSchemaPtr: Long): Unit = {
  Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { schemaAllocator =>
    Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema =>
      arrowSchema = Data.importSchema(schemaAllocator, ffiSchema, dictionaryProvider)
      schema = ArrowUtils.fromArrowSchema(arrowSchema)
      toUnsafe = UnsafeProjection.create(schema)
    }
  }
}
// Arrow数据导入(native runtime回调)
 protected def importBatch(ffiArrayPtr: Long): Unit = {
    if (nativeRuntimePtr == 0) {
      throw new RuntimeException("Native runtime is finalized")
    }

    Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { batchAllocator =>
      Using.resources(
        ArrowArray.wrap(ffiArrayPtr),
        VectorSchemaRoot.create(arrowSchema, batchAllocator)) { case (ffiArray, root) =>
        Data.importIntoVectorSchemaRoot(batchAllocator, ffiArray, root, dictionaryProvider)
        val batch = ColumnarHelper.rootAsBatch(root)

        batchRows.append(
          ColumnarHelper
            //列行转换
            .batchAsRowIter(batch)
            // toUnsafe复用避免重复计算
            .map(row => toUnsafe(row).copy().asInstanceOf[InternalRow])
            .toSeq: _*)
      }
    }
  }

rust 侧实现#

接下来我们就要查看 native runtime 的细节,主要关注 callNative 和 nextBatch 的实现。

callNative 方法在 rust 中的调用在 execu.rs 的 Java_org_apache_spark_sql_blaze_JniBridge_callNative 里:
核心做了这些工作

  1. 初始化 jni java classes
  2. 初始化 datafusion 的 session 上下文
  3. 初始化执行运行时 NativeExecutionRuntime 用于监听运行过程和处理执行计划

可以看一下完整实现,官方有注释非常好理解:

#[allow(non_snake_case)]
#[no_mangle]
pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
    env: JNIEnv,
    _: JClass,
    executor_memory_overhead: i64,
    native_wrapper: JObject,
) -> i64 (
    handle_unwinded_scope(|| -> Result<i64> {
        static SESSION: OnceCell<SessionContext> = OnceCell::new();
        static INIT: OnceCell<()> = OnceCell::new();
       // 单例初始化一次 
        INIT.get_or_try_init(|| {
            // logging is not initialized at this moment
            eprintln!("------ initializing blaze native environment ------");
            init_logging();

            // init jni java classes
            log::info!("initializing JNI bridge");
            JavaClasses::init(&env);

            // init datafusion session context
            log::info!("initializing datafusion session");
            SESSION.get_or_try_init(|| {
                let max_memory = executor_memory_overhead as usize;
                let memory_fraction = conf::MEMORY_FRACTION.value()?;
                let batch_size = conf::BATCH_SIZE.value()? as usize;
                MemManager::init((max_memory as f64 * memory_fraction) as usize);

                let session_config = SessionConfig::new().with_batch_size(batch_size);
                let runtime_config =
                    RuntimeConfig::new().with_disk_manager(DiskManagerConfig::Disabled);
                let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
                let session = SessionContext::new_with_config_rt(session_config, runtime);
                Ok::<_, DataFusionError>(session)
            })?;
            Ok::<_, DataFusionError>(())
        })?;
        let native_wrapper = jni_new_global_ref!(native_wrapper)?;

        // create execution runtime
        let runtime = Box::new(NativeExecutionRuntime::start(
            native_wrapper,
            SESSION.get().unwrap().task_ctx(),
        )?);

        // returns runtime raw pointer
        Ok::<_, DataFusionError>(Box::into_raw(runtime) as usize as i64)
    })
}

在 rt.rs 中可以看到初始化 NativeExecutionRuntime 时将 protobuf 定义的执行计划全部反序列化转换成 datafusion 的 ExecutionPlan 结构,然后使用 datafusion 执行整个执行计划,datafusion 会使用 tokio 发起异步执行操作,依次迭代触发各个算子的 compute。
代码很长,我简短了一些:

pub fn start(native_wrapper: GlobalRef, context: Arc<TaskContext>) -> Result<Self> {
    // 1. 从JNI获取并解码任务定义
    let raw_task_definition = jni_call!(...);
    let task_definition = TaskDefinition::decode(...);

    // 2. 创建DataFusion执行计划
    let execution_plan: Arc<dyn ExecutionPlan> = plan.try_into()?;

    // 3. 初始化Tokio多线程运行时
    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .on_thread_start(|| { /* 设置线程本地上下文 */ })
        .build()?;

    // 4. 创建数据生产通道
    let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);

    // 5. 异步执行数据流
    let consume_stream = async move {
        let mut stream = exec_ctx.execute(&execution_plan)?;
        jni_call!(...importSchema(...)); // 导出schema到Java
        
        // 6. 流式生产数据批次
        while let Some(batch) = stream.next().await {
            batch_sender.send(Ok(Some(batch)))?;
        }
    };

    // 7. 启动异步任务
    let join_handle = tokio_runtime.spawn(async move {
        consume_stream.await.unwrap_or_else(|err| { 
            /* 错误处理 */
        });
    });

    // 8. 返回运行时实例
    Ok(Self { ... })
}

接下来看看 nextBatch 的实现,naxtBatch 实际调用的 NativeExecutionRuntime 的 next_bacht 函数,主要利用 arrow 实现了零拷贝的数据传输:

pub fn next_batch(&self) -> bool {
    // 1. 定义内部闭包处理实际数据接收
        let next_batch = || -> Result<bool> {
            match self
                .batch_receiver
                .recv()
                .or_else(|err| df_execution_err!("receive batch error: {err}"))??
            {
                Some(batch) => {
                    // 2. 转换Arrow数据结构
                    let struct_array = StructArray::from(batch);
                    let ffi_array = FFI_ArrowArray::new(&struct_array.to_data());
                    // 3. 通过JNI调用Java层方法导入数据
                    jni_call!(BlazeCallNativeWrapper(self.native_wrapper.as_obj())
                        .importBatch(&ffi_array as *const FFI_ArrowArray as i64) -> ()
                    )?;
                    Ok(true)
                }
                None => Ok(false),
            }
        };
         // 4. 执行并处理最终结果
        match next_batch() {
            Ok(ret) => return ret,
            Err(err) => {
                // 5. 错误回调Java层
                let _ = set_error(
                    &self.native_wrapper,
                    &format!("poll record batch error: {err}"),
                    None,
                );
                return false;
            }
        }
    }

结尾#

以上就是大致的 blaze 启动流程,接下来就是深入了解各个算子的实现方式了,有空的话后续我也会在更新几篇源码阅读,这里先占个坑(
这里也提供一个 ai 的一图流流程总结:

┌───────────────────────┐
Spark Driver
│  (Java/Scala 端)       │
└──────────┬────────────┘
1. 生成Protobuf格式执行计划

┌───────────────────────┐
JNI 接口层             │
- 通过FFI跨语言调用     │
- 传递序列化的二进制数据 │
└──────────┬────────────┘
2. 反序列化为PhysicalPlanNode

┌───────────────────────┐
TryInto::try_into()   │
- 核心转换入口         │  
└──────────┬────────────┘ 
3. 递归处理节点

┌───────────────────────┐
│ ExecutionPlan构建      │
- 通过match处理30+节点类型 │
- 例如:               │
ProjectionExec
FilterExec
HashJoinExec
└──────────┬────────────┘
4. 构建物理运算符树

┌───────────────────────┐
│ 执行引擎               │
- 调用execute()方法    │
- 分布式任务调度        │
└───────────────────────┘
加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。