banner
bladedragon

bladedragon

blaze 執行流程源碼閱讀

首先介紹一下 blaze,blaze 是快手自研的基於 Rust 語言和 DataFusion 框架開發的 Spark 向量化執行引擎,旨在通過本機矢量化執行技術來加速 Spark SQL 的查詢處理。為什麼考慮深入了解 blaze,首先是因為當前相對成熟的開源 spark 向量化引擎項目較少(指開箱即用和存在生產運行案例),比較熱門就只有 gluten 和 blaze;此外 blaze 基於 datafusion,採用 rust 實現核心向量化組件,比起 gluten 基於 velox (通過 C++ 實現),上手成本相對較低,最近剛好在學 rust,所以自然而然地對 blaze 的興趣更大了。

本文主要是介紹 blaze 的整體架構和啟動流程,其中主要會結合源碼進行分析。由於本人也還在學習中,如果內容存在纰漏,也歡迎指出並一起討論。

整體架構#

blaze 本質是一個 spark 擴展,實現加速的核心原理是通過替換算子為 native 算子從而利用 datafusion 進行加速,同時通過列式計算和高效的內存通信,使得在不侵入業務作業邏輯的前提下實現性能的較大提升。

以下是 spark 原生和 blaze 的架構實現對比:

原生 spark

image

spark+ blaze

image

參考 blaze 的項目代碼,可見 blaze 的主要實現包括

  • spark extension:一系列替換執行計劃的規則和適配多版本的 spark shim 包
  • jni bridge: 一套用於 java 和 rust 的通信機制,基於 pb 和 arrow 實現高效數據傳輸
  • native plan: 一系列基於 datafusion 適配的 native 算子

接下來我們主要了解 spark extension 以及 jni bridge 部分代碼,算子方面留給以後再說吧~同時注意本文中涉及的代碼均來自 branch v4.0.1

啟動流程#

blaze 通過通用的 spark 插件形式注入,啟動的入口自然在繼承自 SparkSessionExtensions 的 BlazeSparkSessionExtension 中,通過擴展註冊列式處理規則,執行算子替換策略。其中核心執行代碼如下:

//org.apache.spark.sql.blaze.BlazeColumnarOverrides#preColumnarTransitions

override def preColumnarTransitions: Rule[SparkPlan] = {
  new Rule[SparkPlan] {
    override def apply(sparkPlan: SparkPlan): SparkPlan = {
      // 檢查Blaze是否啟用
      if (!sparkPlan.conf.getConf(blazeEnabledKey)) return sparkPlan
      
      // 生成轉換策略
      BlazeConvertStrategy.apply(sparkPlan)
      
      // 遞歸轉換執行計劃
      val transformed = BlazeConverters.convertSparkPlanRecursively(sparkPlan)
      
      // 執行特定算子的後處理
      Shims.get.postTransform(transformed, sparkSession.sparkContext)
    }
  }
}

在 BlazeConvertStrategy 中我們可以看到主要的作用是給節點打標籤從而判斷是否需要轉換 native,

目前標籤類型包含:

// 標記節點是否可轉換為Blaze原生執行
// 示例:在轉換測試後更新標記,決定是否保留轉換結果
val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertible")

// 標記是否需要回退原格式
// 示例:當遇到無法完全轉換的操作時設置此標記
val convertToNonNativeTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertToNonNative")

// 核心轉換策略標記,包含三種狀態:
// - Default: 未決狀態
// - AlwaysConvert: 強制轉換(如文件掃描)
// - NeverConvert: 禁止轉換(如低效聚合場景)
val convertStrategyTag: TreeNodeTag[ConvertStrategy] = TreeNodeTag("blaze.convert.strategy")

// 標記子節點是否需要保持排序特性
// 示例:SortExec節點會清除此標記以避免冗餘排序
val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.child.ordering.required")

// 用於Join操作標記較小數據側
// 示例:BroadcastHashJoinExec中決定廣播哪一側數據
val joinSmallerSideTag: TreeNodeTag[BuildSide] = TreeNodeTag("blaze.join.smallerSide")

通過 BlazeConverters.convertSparkPlanRecursively 會把節點盡可能轉成 native plan 並確定需要回退的節點,之後會借由 spark 的分佈式能力執行 native plan。

下面分別展示一下 nativePlan 節點的構造和對應 nativeRDD 的構造,大家可以和 spark 原生的構建做一個參照:
NativePlan 對應 SparkPlan

trait NativeSupports extends SparkPlan {
  // 實際執行native的方法,最後會返回nativeRDD
  protected def doExecuteNative(): NativeRDD
  
  protected override def doExecute(): RDD[InternalRow] = doExecuteNative()

  def executeNative(): NativeRDD = executeQuery {
    doExecuteNative()
  }

  // hack了一個是否全量shuffleRead的函數,目前看默認為true,是一個未完成的feature
  def shuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this.doExecuteNative())
}

NativeRDD 對應 RDD

class NativeRDD(
    // 保持SparkContext的transient特性防止序列化
    @transient private val rddSparkContext: SparkContext,
    val metrics: MetricNode,
    private val rddPartitions: Array[Partition],
    private val rddDependencies: Seq[Dependency[_]],
    private val rddShuffleReadFull: Boolean,
    //對應的native plan,這裡的pysicalPlanNode實際是用pb定義的類,是datafusion執行計劃算子,是jni數據交互的核心
    val nativePlan: (Partition, TaskContext) => PhysicalPlanNode,
    val friendlyName: String = null)
    extends RDD[InternalRow](rddSparkContext, rddDependencies)
    with Logging {

  if (friendlyName != null) {
    setName(friendlyName)
  }

  def isShuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this)
  Shims.get.setRDDShuffleReadFull(this, rddShuffleReadFull)

  override protected def getPartitions: Array[Partition] = rddPartitions
  override protected def getDependencies: Seq[Dependency[_]] = rddDependencies

  //執行入口,通過調用JNI執行native函數
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val computingNativePlan = nativePlan(split, context)
    NativeHelper.executeNativePlan(computingNativePlan, metrics, split, Some(context))
  }
}

既然到了 executeNativePlan 這一步,blaze 具體是如何實現 jvm 和 rust runtime 通信的呢?

jni 通信機制#

java 側實現#

我們首先看一下 NativeHelper.executeNativePlan 函數:

  def executeNativePlan(
      nativePlan: PhysicalPlanNode,
      metrics: MetricNode,
      partition: Partition,
      context: Option[TaskContext]): Iterator[InternalRow] = {

    if (partition.index == 0 && metrics != null && context.nonEmpty) {
      metrics.foreach(_.add("stage_id", context.get.stageId()))
    }
    if (nativePlan == null) {
      return Iterator.empty
    }
    BlazeCallNativeWrapper(nativePlan, partition, context, metrics).getRowIterator
  }

BlazeCallNativeWrapper 包裝了 jni 交互的細節,其中重要的幾個步驟:

  1. 第一次初始化時啟動 native runtime,首先確保加載 libblaze.so 從而獲取 native 引擎運行庫,這部分其實就是 blaze 核心的 native-engine 部分;然後通過調用 native 函數 JniBridge.callNative () 來啟動 runtime 並持有對應的指針,用於後續數據訪問
class BlazeCallNativeWrapper(...) {
  // 確保本地庫加載  
  BlazeCallNativeWrapper.initNative() 
  // 初始化Native運行時  
  private var nativeRuntimePtr = JniBridge.callNative(...) 
  // ...
}
  1. BlazeCallNativeWrapper 會持有一個數據迭代器,當調用 hasNext 的時候才會去 native runtime 拉取就緒的數據。觸發函數是 JniBridge.nextBatch ()。其中 native runtime 會通過調用 scala 函數 importBatch () 將 arrow 格式的數據轉換成 scala ArrayBuffer 類,保存在 batchRows 中。

可以學習一下用 arrow 進行數據交換的實現:

//Arrow 數據元信息導入(native runtime回調)
protected def importSchema(ffiSchemaPtr: Long): Unit = {
  Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { schemaAllocator =>
    Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema =>
      arrowSchema = Data.importSchema(schemaAllocator, ffiSchema, dictionaryProvider)
      schema = ArrowUtils.fromArrowSchema(arrowSchema)
      toUnsafe = UnsafeProjection.create(schema)
    }
  }
}
// Arrow數據導入(native runtime回調)
 protected def importBatch(ffiArrayPtr: Long): Unit = {
    if (nativeRuntimePtr == 0) {
      throw new RuntimeException("Native runtime is finalized")
    }

    Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { batchAllocator =>
      Using.resources(
        ArrowArray.wrap(ffiArrayPtr),
        VectorSchemaRoot.create(arrowSchema, batchAllocator)) { case (ffiArray, root) =>
        Data.importIntoVectorSchemaRoot(batchAllocator, ffiArray, root, dictionaryProvider)
        val batch = ColumnarHelper.rootAsBatch(root)

        batchRows.append(
          ColumnarHelper
            //列行轉換
            .batchAsRowIter(batch)
            // toUnsafe復用避免重複計算
            .map(row => toUnsafe(row).copy().asInstanceOf[InternalRow])
            .toSeq: _*)
      }
    }
  }

rust 側實現#

接下來我們就要查看 native runtime 的細節,主要關注 callNative 和 nextBatch 的實現。

callNative 方法在 rust 中的調用在 execu.rs 的 Java_org_apache_spark_sql_blaze_JniBridge_callNative 里:
核心做了這些工作

  1. 初始化 jni java classes
  2. 初始化 datafusion 的 session 上下文
  3. 初始化執行運行時 NativeExecutionRuntime 用於監聽運行過程和處理執行計劃

可以看一下完整實現,官方有註釋非常好理解:

#[allow(non_snake_case)]
#[no_mangle]
pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
    env: JNIEnv,
    _: JClass,
    executor_memory_overhead: i64,
    native_wrapper: JObject,
) -> i64 (
    handle_unwinded_scope(|| -> Result<i64> {
        static SESSION: OnceCell<SessionContext> = OnceCell::new();
        static INIT: OnceCell<()> = OnceCell::new();
       // 單例初始化一次 
        INIT.get_or_try_init(|| {
            // logging is not initialized at this moment
            eprintln!("------ initializing blaze native environment ------");
            init_logging();

            // init jni java classes
            log::info!("initializing JNI bridge");
            JavaClasses::init(&env);

            // init datafusion session context
            log::info!("initializing datafusion session");
            SESSION.get_or_try_init(|| {
                let max_memory = executor_memory_overhead as usize;
                let memory_fraction = conf::MEMORY_FRACTION.value()?;
                let batch_size = conf::BATCH_SIZE.value()? as usize;
                MemManager::init((max_memory as f64 * memory_fraction) as usize);

                let session_config = SessionConfig::new().with_batch_size(batch_size);
                let runtime_config =
                    RuntimeConfig::new().with_disk_manager(DiskManagerConfig::Disabled);
                let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
                let session = SessionContext::new_with_config_rt(session_config, runtime);
                Ok::<_, DataFusionError>(session)
            })?;
            Ok::<_, DataFusionError>(())
        })?;
        let native_wrapper = jni_new_global_ref!(native_wrapper)?;

        // create execution runtime
        let runtime = Box::new(NativeExecutionRuntime::start(
            native_wrapper,
            SESSION.get().unwrap().task_ctx(),
        )?);

        // returns runtime raw pointer
        Ok::<_, DataFusionError>(Box::into_raw(runtime) as usize as i64)
    })
}

在 rt.rs 中可以看到初始化 NativeExecutionRuntime 時將 protobuf 定義的執行計劃全部反序列化轉換成 datafusion 的 ExecutionPlan 結構,然後使用 datafusion 執行整個執行計劃,datafusion 會使用 tokio 發起異步執行操作,依次迭代觸發各個算子的 compute。
代碼很長,我簡短了一些:

pub fn start(native_wrapper: GlobalRef, context: Arc<TaskContext>) -> Result<Self> {
    // 1. 從JNI獲取並解碼任務定義
    let raw_task_definition = jni_call!(...);
    let task_definition = TaskDefinition::decode(...);

    // 2. 創建DataFusion執行計劃
    let execution_plan: Arc<dyn ExecutionPlan> = plan.try_into()?;

    // 3. 初始化Tokio多線程運行時
    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .on_thread_start(|| { /* 設置線程本地上下文 */ })
        .build()?;

    // 4. 創建數據生產通道
    let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);

    // 5. 異步執行數據流
    let consume_stream = async move {
        let mut stream = exec_ctx.execute(&execution_plan)?;
        jni_call!(...importSchema(...)); // 導出schema到Java
        
        // 6. 流式生產數據批次
        while let Some(batch) = stream.next().await {
            batch_sender.send(Ok(Some(batch)))?;
        }
    };

    // 7. 啟動異步任務
    let join_handle = tokio_runtime.spawn(async move {
        consume_stream.await.unwrap_or_else(|err| { 
            /* 錯誤處理 */
        });
    });

    // 8. 返回運行時實例
    Ok(Self { ... })
}

接下來看看 nextBatch 的實現,naxtBatch 實際調用的 NativeExecutionRuntime 的 next_bacht 函數,主要利用 arrow 實現了零拷貝的數據傳輸:

pub fn next_batch(&self) -> bool {
    // 1. 定義內部閉包處理實際數據接收
        let next_batch = || -> Result<bool> {
            match self
                .batch_receiver
                .recv()
                .or_else(|err| df_execution_err!("receive batch error: {err}"))??
            {
                Some(batch) => {
                    // 2. 轉換Arrow數據結構
                    let struct_array = StructArray::from(batch);
                    let ffi_array = FFI_ArrowArray::new(&struct_array.to_data());
                    // 3. 通過JNI調用Java層方法導入數據
                    jni_call!(BlazeCallNativeWrapper(self.native_wrapper.as_obj())
                        .importBatch(&ffi_array as *const FFI_ArrowArray as i64) -> ()
                    )?;
                    Ok(true)
                }
                None => Ok(false),
            }
        };
         // 4. 執行並處理最終結果
        match next_batch() {
            Ok(ret) => return ret,
            Err(err) => {
                // 5. 錯誤回調Java層
                let _ = set_error(
                    &self.native_wrapper,
                    &format!("poll record batch error: {err}"),
                    None,
                );
                return false;
            }
        }
    }

結尾#

以上就是大致的 blaze 啟動流程,接下來就是深入了解各個算子的實現方式了,有空的話後續我也會在更新幾篇源碼閱讀,這裡先占個坑(
這裡也提供一個 ai 的一圖流流程總結:

┌───────────────────────┐
Spark Driver
│  (Java/Scala 端)       │
└──────────┬────────────┘
1. 生成Protobuf格式執行計劃

┌───────────────────────┐
JNI 接口層             │
- 透過FFI跨語言調用     │
- 傳遞序列化的二進制數據 │
└──────────┬────────────┘
2. 反序列化為PhysicalPlanNode

┌───────────────────────┐
TryInto::try_into()   │
- 核心轉換入口         │  
└──────────┬────────────┘ 
3. 遞歸處理節點

┌───────────────────────┐
│ ExecutionPlan構建      │
- 通過match處理30+節點類型 │
- 例如:               │
ProjectionExec
FilterExec
HashJoinExec
└──────────┬────────────┘
4. 構建物理運算符樹

┌───────────────────────┐
│ 執行引擎               │
- 調用execute()方法    │
- 分佈式任務調度        │
└───────────────────────┘
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。