首先介紹一下 blaze,blaze 是快手自研的基於 Rust 語言和 DataFusion 框架開發的 Spark 向量化執行引擎,旨在通過本機矢量化執行技術來加速 Spark SQL 的查詢處理。為什麼考慮深入了解 blaze,首先是因為當前相對成熟的開源 spark 向量化引擎項目較少(指開箱即用和存在生產運行案例),比較熱門就只有 gluten 和 blaze;此外 blaze 基於 datafusion,採用 rust 實現核心向量化組件,比起 gluten 基於 velox (通過 C++ 實現),上手成本相對較低,最近剛好在學 rust,所以自然而然地對 blaze 的興趣更大了。
本文主要是介紹 blaze 的整體架構和啟動流程,其中主要會結合源碼進行分析。由於本人也還在學習中,如果內容存在纰漏,也歡迎指出並一起討論。
整體架構#
blaze 本質是一個 spark 擴展,實現加速的核心原理是通過替換算子為 native 算子從而利用 datafusion 進行加速,同時通過列式計算和高效的內存通信,使得在不侵入業務作業邏輯的前提下實現性能的較大提升。
以下是 spark 原生和 blaze 的架構實現對比:
原生 spark
spark+ blaze
參考 blaze 的項目代碼,可見 blaze 的主要實現包括
- spark extension:一系列替換執行計劃的規則和適配多版本的 spark shim 包
- jni bridge: 一套用於 java 和 rust 的通信機制,基於 pb 和 arrow 實現高效數據傳輸
- native plan: 一系列基於 datafusion 適配的 native 算子
接下來我們主要了解 spark extension 以及 jni bridge 部分代碼,算子方面留給以後再說吧~同時注意本文中涉及的代碼均來自 branch v4.0.1
啟動流程#
blaze 通過通用的 spark 插件形式注入,啟動的入口自然在繼承自 SparkSessionExtensions 的 BlazeSparkSessionExtension 中,通過擴展註冊列式處理規則,執行算子替換策略。其中核心執行代碼如下:
//org.apache.spark.sql.blaze.BlazeColumnarOverrides#preColumnarTransitions
override def preColumnarTransitions: Rule[SparkPlan] = {
new Rule[SparkPlan] {
override def apply(sparkPlan: SparkPlan): SparkPlan = {
// 檢查Blaze是否啟用
if (!sparkPlan.conf.getConf(blazeEnabledKey)) return sparkPlan
// 生成轉換策略
BlazeConvertStrategy.apply(sparkPlan)
// 遞歸轉換執行計劃
val transformed = BlazeConverters.convertSparkPlanRecursively(sparkPlan)
// 執行特定算子的後處理
Shims.get.postTransform(transformed, sparkSession.sparkContext)
}
}
}
在 BlazeConvertStrategy 中我們可以看到主要的作用是給節點打標籤從而判斷是否需要轉換 native,
目前標籤類型包含:
// 標記節點是否可轉換為Blaze原生執行
// 示例:在轉換測試後更新標記,決定是否保留轉換結果
val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertible")
// 標記是否需要回退原格式
// 示例:當遇到無法完全轉換的操作時設置此標記
val convertToNonNativeTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertToNonNative")
// 核心轉換策略標記,包含三種狀態:
// - Default: 未決狀態
// - AlwaysConvert: 強制轉換(如文件掃描)
// - NeverConvert: 禁止轉換(如低效聚合場景)
val convertStrategyTag: TreeNodeTag[ConvertStrategy] = TreeNodeTag("blaze.convert.strategy")
// 標記子節點是否需要保持排序特性
// 示例:SortExec節點會清除此標記以避免冗餘排序
val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.child.ordering.required")
// 用於Join操作標記較小數據側
// 示例:BroadcastHashJoinExec中決定廣播哪一側數據
val joinSmallerSideTag: TreeNodeTag[BuildSide] = TreeNodeTag("blaze.join.smallerSide")
通過 BlazeConverters.convertSparkPlanRecursively 會把節點盡可能轉成 native plan 並確定需要回退的節點,之後會借由 spark 的分佈式能力執行 native plan。
下面分別展示一下 nativePlan 節點的構造和對應 nativeRDD 的構造,大家可以和 spark 原生的構建做一個參照:
NativePlan 對應 SparkPlan
trait NativeSupports extends SparkPlan {
// 實際執行native的方法,最後會返回nativeRDD
protected def doExecuteNative(): NativeRDD
protected override def doExecute(): RDD[InternalRow] = doExecuteNative()
def executeNative(): NativeRDD = executeQuery {
doExecuteNative()
}
// hack了一個是否全量shuffleRead的函數,目前看默認為true,是一個未完成的feature
def shuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this.doExecuteNative())
}
NativeRDD 對應 RDD
class NativeRDD(
// 保持SparkContext的transient特性防止序列化
@transient private val rddSparkContext: SparkContext,
val metrics: MetricNode,
private val rddPartitions: Array[Partition],
private val rddDependencies: Seq[Dependency[_]],
private val rddShuffleReadFull: Boolean,
//對應的native plan,這裡的pysicalPlanNode實際是用pb定義的類,是datafusion執行計劃算子,是jni數據交互的核心
val nativePlan: (Partition, TaskContext) => PhysicalPlanNode,
val friendlyName: String = null)
extends RDD[InternalRow](rddSparkContext, rddDependencies)
with Logging {
if (friendlyName != null) {
setName(friendlyName)
}
def isShuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this)
Shims.get.setRDDShuffleReadFull(this, rddShuffleReadFull)
override protected def getPartitions: Array[Partition] = rddPartitions
override protected def getDependencies: Seq[Dependency[_]] = rddDependencies
//執行入口,通過調用JNI執行native函數
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val computingNativePlan = nativePlan(split, context)
NativeHelper.executeNativePlan(computingNativePlan, metrics, split, Some(context))
}
}
既然到了 executeNativePlan 這一步,blaze 具體是如何實現 jvm 和 rust runtime 通信的呢?
jni 通信機制#
java 側實現#
我們首先看一下 NativeHelper.executeNativePlan 函數:
def executeNativePlan(
nativePlan: PhysicalPlanNode,
metrics: MetricNode,
partition: Partition,
context: Option[TaskContext]): Iterator[InternalRow] = {
if (partition.index == 0 && metrics != null && context.nonEmpty) {
metrics.foreach(_.add("stage_id", context.get.stageId()))
}
if (nativePlan == null) {
return Iterator.empty
}
BlazeCallNativeWrapper(nativePlan, partition, context, metrics).getRowIterator
}
BlazeCallNativeWrapper 包裝了 jni 交互的細節,其中重要的幾個步驟:
- 第一次初始化時啟動 native runtime,首先確保加載 libblaze.so 從而獲取 native 引擎運行庫,這部分其實就是 blaze 核心的 native-engine 部分;然後通過調用 native 函數 JniBridge.callNative () 來啟動 runtime 並持有對應的指針,用於後續數據訪問
class BlazeCallNativeWrapper(...) {
// 確保本地庫加載
BlazeCallNativeWrapper.initNative()
// 初始化Native運行時
private var nativeRuntimePtr = JniBridge.callNative(...)
// ...
}
- BlazeCallNativeWrapper 會持有一個數據迭代器,當調用 hasNext 的時候才會去 native runtime 拉取就緒的數據。觸發函數是 JniBridge.nextBatch ()。其中 native runtime 會通過調用 scala 函數 importBatch () 將 arrow 格式的數據轉換成 scala ArrayBuffer 類,保存在 batchRows 中。
可以學習一下用 arrow 進行數據交換的實現:
//Arrow 數據元信息導入(native runtime回調)
protected def importSchema(ffiSchemaPtr: Long): Unit = {
Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { schemaAllocator =>
Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema =>
arrowSchema = Data.importSchema(schemaAllocator, ffiSchema, dictionaryProvider)
schema = ArrowUtils.fromArrowSchema(arrowSchema)
toUnsafe = UnsafeProjection.create(schema)
}
}
}
// Arrow數據導入(native runtime回調)
protected def importBatch(ffiArrayPtr: Long): Unit = {
if (nativeRuntimePtr == 0) {
throw new RuntimeException("Native runtime is finalized")
}
Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { batchAllocator =>
Using.resources(
ArrowArray.wrap(ffiArrayPtr),
VectorSchemaRoot.create(arrowSchema, batchAllocator)) { case (ffiArray, root) =>
Data.importIntoVectorSchemaRoot(batchAllocator, ffiArray, root, dictionaryProvider)
val batch = ColumnarHelper.rootAsBatch(root)
batchRows.append(
ColumnarHelper
//列行轉換
.batchAsRowIter(batch)
// toUnsafe復用避免重複計算
.map(row => toUnsafe(row).copy().asInstanceOf[InternalRow])
.toSeq: _*)
}
}
}
rust 側實現#
接下來我們就要查看 native runtime 的細節,主要關注 callNative 和 nextBatch 的實現。
callNative 方法在 rust 中的調用在 execu.rs 的 Java_org_apache_spark_sql_blaze_JniBridge_callNative 里:
核心做了這些工作
- 初始化 jni java classes
- 初始化 datafusion 的 session 上下文
- 初始化執行運行時 NativeExecutionRuntime 用於監聽運行過程和處理執行計劃
可以看一下完整實現,官方有註釋非常好理解:
#[allow(non_snake_case)]
#[no_mangle]
pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
env: JNIEnv,
_: JClass,
executor_memory_overhead: i64,
native_wrapper: JObject,
) -> i64 (
handle_unwinded_scope(|| -> Result<i64> {
static SESSION: OnceCell<SessionContext> = OnceCell::new();
static INIT: OnceCell<()> = OnceCell::new();
// 單例初始化一次
INIT.get_or_try_init(|| {
// logging is not initialized at this moment
eprintln!("------ initializing blaze native environment ------");
init_logging();
// init jni java classes
log::info!("initializing JNI bridge");
JavaClasses::init(&env);
// init datafusion session context
log::info!("initializing datafusion session");
SESSION.get_or_try_init(|| {
let max_memory = executor_memory_overhead as usize;
let memory_fraction = conf::MEMORY_FRACTION.value()?;
let batch_size = conf::BATCH_SIZE.value()? as usize;
MemManager::init((max_memory as f64 * memory_fraction) as usize);
let session_config = SessionConfig::new().with_batch_size(batch_size);
let runtime_config =
RuntimeConfig::new().with_disk_manager(DiskManagerConfig::Disabled);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let session = SessionContext::new_with_config_rt(session_config, runtime);
Ok::<_, DataFusionError>(session)
})?;
Ok::<_, DataFusionError>(())
})?;
let native_wrapper = jni_new_global_ref!(native_wrapper)?;
// create execution runtime
let runtime = Box::new(NativeExecutionRuntime::start(
native_wrapper,
SESSION.get().unwrap().task_ctx(),
)?);
// returns runtime raw pointer
Ok::<_, DataFusionError>(Box::into_raw(runtime) as usize as i64)
})
}
在 rt.rs 中可以看到初始化 NativeExecutionRuntime 時將 protobuf 定義的執行計劃全部反序列化轉換成 datafusion 的 ExecutionPlan 結構,然後使用 datafusion 執行整個執行計劃,datafusion 會使用 tokio 發起異步執行操作,依次迭代觸發各個算子的 compute。
代碼很長,我簡短了一些:
pub fn start(native_wrapper: GlobalRef, context: Arc<TaskContext>) -> Result<Self> {
// 1. 從JNI獲取並解碼任務定義
let raw_task_definition = jni_call!(...);
let task_definition = TaskDefinition::decode(...);
// 2. 創建DataFusion執行計劃
let execution_plan: Arc<dyn ExecutionPlan> = plan.try_into()?;
// 3. 初始化Tokio多線程運行時
let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
.on_thread_start(|| { /* 設置線程本地上下文 */ })
.build()?;
// 4. 創建數據生產通道
let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);
// 5. 異步執行數據流
let consume_stream = async move {
let mut stream = exec_ctx.execute(&execution_plan)?;
jni_call!(...importSchema(...)); // 導出schema到Java
// 6. 流式生產數據批次
while let Some(batch) = stream.next().await {
batch_sender.send(Ok(Some(batch)))?;
}
};
// 7. 啟動異步任務
let join_handle = tokio_runtime.spawn(async move {
consume_stream.await.unwrap_or_else(|err| {
/* 錯誤處理 */
});
});
// 8. 返回運行時實例
Ok(Self { ... })
}
接下來看看 nextBatch 的實現,naxtBatch 實際調用的 NativeExecutionRuntime 的 next_bacht 函數,主要利用 arrow 實現了零拷貝的數據傳輸:
pub fn next_batch(&self) -> bool {
// 1. 定義內部閉包處理實際數據接收
let next_batch = || -> Result<bool> {
match self
.batch_receiver
.recv()
.or_else(|err| df_execution_err!("receive batch error: {err}"))??
{
Some(batch) => {
// 2. 轉換Arrow數據結構
let struct_array = StructArray::from(batch);
let ffi_array = FFI_ArrowArray::new(&struct_array.to_data());
// 3. 通過JNI調用Java層方法導入數據
jni_call!(BlazeCallNativeWrapper(self.native_wrapper.as_obj())
.importBatch(&ffi_array as *const FFI_ArrowArray as i64) -> ()
)?;
Ok(true)
}
None => Ok(false),
}
};
// 4. 執行並處理最終結果
match next_batch() {
Ok(ret) => return ret,
Err(err) => {
// 5. 錯誤回調Java層
let _ = set_error(
&self.native_wrapper,
&format!("poll record batch error: {err}"),
None,
);
return false;
}
}
}
結尾#
以上就是大致的 blaze 啟動流程,接下來就是深入了解各個算子的實現方式了,有空的話後續我也會在更新幾篇源碼閱讀,這裡先占個坑(
這裡也提供一個 ai 的一圖流流程總結:
┌───────────────────────┐
│ Spark Driver │
│ (Java/Scala 端) │
└──────────┬────────────┘
│ 1. 生成Protobuf格式執行計劃
▼
┌───────────────────────┐
│ JNI 接口層 │
│ - 透過FFI跨語言調用 │
│ - 傳遞序列化的二進制數據 │
└──────────┬────────────┘
│ 2. 反序列化為PhysicalPlanNode
▼
┌───────────────────────┐
│ TryInto::try_into() │
│ - 核心轉換入口 │
└──────────┬────────────┘
│ 3. 遞歸處理節點
▼
┌───────────────────────┐
│ ExecutionPlan構建 │
│ - 通過match處理30+節點類型 │
│ - 例如: │
│ ProjectionExec │
│ FilterExec │
│ HashJoinExec │
└──────────┬────────────┘
│ 4. 構建物理運算符樹
▼
┌───────────────────────┐
│ 執行引擎 │
│ - 調用execute()方法 │
│ - 分佈式任務調度 │
└───────────────────────┘