First, a brief introduction. Blaze is a Spark vectorized execution engine developed by Kuaishou, built in Rust on top of the DataFusion framework, and aimed at accelerating Spark SQL query processing through native vectorized execution. Why dig into Blaze? For one, there are still relatively few mature open-source Spark vectorized engines (meaning ready to use, with production deployments); Gluten and Blaze are the two popular ones. Moreover, Blaze is based on DataFusion and implements its core vectorized components in Rust, which has a lower entry cost than Gluten, which builds on Velox (implemented in C++). I also happen to be learning Rust recently, so my interest in Blaze naturally grew.
This article mainly introduces the overall architecture and startup process of Blaze, with a focus on analyzing the source code. Since I am still learning, if there are any inaccuracies in the content, I welcome feedback and discussion.
Overall Architecture#
Blaze is essentially a Spark extension. The core idea behind the acceleration is to replace Spark operators with native operators and hand execution off to DataFusion; combined with columnar computation and efficient memory communication, this yields significant performance improvements without intruding on business job logic.
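To make "operator replacement" concrete, here is a minimal sketch of the idea, not Blaze's actual code: walk the physical plan and swap supported operators for native equivalents. The helpers isSupported and toNativeProject are hypothetical placeholders; the real logic lives in BlazeConvertStrategy and BlazeConverters, discussed below.

import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}

// Hypothetical helpers for illustration only
def isSupported(plan: SparkPlan): Boolean = true        // placeholder capability check
def toNativeProject(p: ProjectExec): SparkPlan = p      // placeholder native conversion

// Swap supported operators for native equivalents, bottom-up
def replaceWithNative(plan: SparkPlan): SparkPlan =
  plan.transformUp {
    case p: ProjectExec if isSupported(p) => toNativeProject(p)
  }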
Below is a comparison of the architecture of native Spark and Spark with Blaze:
(Figure: native Spark architecture)
(Figure: Spark + Blaze architecture)
Referring to the Blaze project code, we can see that the main implementations of Blaze include:
- Spark extension: a set of rules for rewriting execution plans, plus shim packages for adapting to multiple Spark versions.
- JNI bridge: A set of communication mechanisms for Java and Rust, based on Protocol Buffers and Arrow for efficient data transfer.
- Native plan: A series of native operators adapted based on DataFusion.
Next, we will walk through the Spark extension and JNI bridge parts of the code, leaving the operators for a later article. Note that the code referenced in this article comes from the v4.0.1 branch.
Startup Process#
Blaze is injected as a standard Spark extension, so the entry point is naturally BlazeSparkSessionExtension, which hooks into SparkSessionExtensions to register columnar processing rules and carry out the operator replacement strategy. The core execution code is as follows:
// org.apache.spark.sql.blaze.BlazeColumnarOverrides#preColumnarTransitions
override def preColumnarTransitions: Rule[SparkPlan] = {
  new Rule[SparkPlan] {
    override def apply(sparkPlan: SparkPlan): SparkPlan = {
      // Check if Blaze is enabled
      if (!sparkPlan.conf.getConf(blazeEnabledKey)) return sparkPlan

      // Generate conversion strategy
      BlazeConvertStrategy.apply(sparkPlan)
      // Recursively convert the execution plan
      val transformed = BlazeConverters.convertSparkPlanRecursively(sparkPlan)
      // Execute post-processing for specific operators
      Shims.get.postTransform(transformed, sparkSession.sparkContext)
    }
  }
}
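For completeness, enabling this extension on a job only requires the standard Spark session configuration. A minimal sketch; the spark.blaze.enable key corresponds to the blazeEnabledKey checked above, and the exact key name is an assumption on my part:

import org.apache.spark.sql.SparkSession

// Register the Blaze extension; the rule above then rewrites eligible physical plans
val spark = SparkSession
  .builder()
  .appName("blaze-demo")
  .config("spark.sql.extensions", "org.apache.spark.sql.blaze.BlazeSparkSessionExtension")
  // Switch checked by blazeEnabledKey (key name assumed)
  .config("spark.blaze.enable", "true")
  .getOrCreate()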
In BlazeConvertStrategy, the main job is to tag plan nodes to decide whether they should be converted to native execution. The current tag types include:
// Mark whether the node can be converted to Blaze native execution
// Example: Update the tag after conversion testing to decide whether to keep the conversion result
val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertible")
// Mark whether to fall back to the original format
// Example: Set this tag when encountering operations that cannot be fully converted
val convertToNonNativeTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertToNonNative")
// Core conversion strategy tag, includes three states:
// - Default: undecided state
// - AlwaysConvert: forced conversion (e.g., file scan)
// - NeverConvert: prohibited conversion (e.g., inefficient aggregation scenarios)
val convertStrategyTag: TreeNodeTag[ConvertStrategy] = TreeNodeTag("blaze.convert.strategy")
// Mark whether child nodes need to maintain sorting characteristics
// Example: SortExec nodes will clear this tag to avoid redundant sorting
val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.child.ordering.required")
// Used to mark the smaller side of a Join operation
// Example: In BroadcastHashJoinExec, decide which side of data to broadcast
val joinSmallerSideTag: TreeNodeTag[BuildSide] = TreeNodeTag("blaze.join.smallerSide")
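These tags use Spark's standard TreeNodeTag mechanism, so they are attached to and read from plan nodes with the regular setTagValue/getTagValue API. A small illustration (the helper functions are mine, for demonstration only):

import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.SparkPlan

val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertible")

// Attach a tag to a plan node
def markConvertible(plan: SparkPlan): Unit = plan.setTagValue(convertibleTag, true)

// Read it back; an absent tag means "not decided yet"
def isConvertible(plan: SparkPlan): Boolean = plan.getTagValue(convertibleTag).getOrElse(false)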
Through BlazeConverters.convertSparkPlanRecursively, nodes are converted to native plans wherever possible, and the nodes that need to fall back are identified. After that, the native plan is executed using Spark's distributed capabilities.
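The convert-or-fallback idea can be pictured with a simplified sketch; this is not the actual BlazeConverters code, and the convert parameter stands in for the per-operator conversion logic. Convert the children first, then try the node itself, and keep the original JVM operator if conversion fails:

import org.apache.spark.sql.execution.SparkPlan

// Bottom-up conversion with per-node fallback (simplified sketch)
def tryConvert(plan: SparkPlan, convert: SparkPlan => SparkPlan): SparkPlan = {
  val withConvertedChildren =
    plan.withNewChildren(plan.children.map(tryConvert(_, convert)))
  try convert(withConvertedChildren)
  catch { case _: Exception => withConvertedChildren } // fall back to the JVM operator
}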
Next, let's look at the construction of the NativePlan node and the corresponding NativeRDD, which can be compared with their native Spark counterparts:
NativePlan Corresponding to SparkPlan
trait NativeSupports extends SparkPlan {
  // The actual native execution method, which ultimately returns a NativeRDD
  protected def doExecuteNative(): NativeRDD
  protected override def doExecute(): RDD[InternalRow] = doExecuteNative()

  def executeNative(): NativeRDD = executeQuery {
    doExecuteNative()
  }

  // Hack: checks whether this is a full shuffle read; currently always true (unfinished feature)
  def shuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this.doExecuteNative())
}
NativeRDD Corresponding to RDD
class NativeRDD(
    // Keep SparkContext transient to prevent serialization
    @transient private val rddSparkContext: SparkContext,
    val metrics: MetricNode,
    private val rddPartitions: Array[Partition],
    private val rddDependencies: Seq[Dependency[_]],
    private val rddShuffleReadFull: Boolean,
    // The corresponding native plan. PhysicalPlanNode is a protobuf-generated class and is
    // the core data structure exchanged with DataFusion's execution plan operators
    val nativePlan: (Partition, TaskContext) => PhysicalPlanNode,
    val friendlyName: String = null)
    extends RDD[InternalRow](rddSparkContext, rddDependencies)
    with Logging {

  if (friendlyName != null) {
    setName(friendlyName)
  }

  def isShuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this)
  Shims.get.setRDDShuffleReadFull(this, rddShuffleReadFull)

  override protected def getPartitions: Array[Partition] = rddPartitions
  override protected def getDependencies: Seq[Dependency[_]] = rddDependencies

  // Execution entry point: calls into native code via JNI
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val computingNativePlan = nativePlan(split, context)
    NativeHelper.executeNativePlan(computingNativePlan, metrics, split, Some(context))
  }
}
Now that we have reached the executeNativePlan step, how does Blaze actually implement communication between the JVM and the Rust runtime?
JNI Communication Mechanism#
Java Side Implementation#
First, let's look at the NativeHelper.executeNativePlan function:
def executeNativePlan(
    nativePlan: PhysicalPlanNode,
    metrics: MetricNode,
    partition: Partition,
    context: Option[TaskContext]): Iterator[InternalRow] = {

  if (partition.index == 0 && metrics != null && context.nonEmpty) {
    metrics.foreach(_.add("stage_id", context.get.stageId()))
  }
  if (nativePlan == null) {
    return Iterator.empty
  }
  BlazeCallNativeWrapper(nativePlan, partition, context, metrics).getRowIterator
}
BlazeCallNativeWrapper wraps the details of the JNI interaction, with several important steps:
- On first initialization it starts the native runtime: it ensures that libblaze.so, the native engine runtime library that is the core of Blaze, is loaded, then calls the native function JniBridge.callNative() to start the runtime and holds the returned pointer for subsequent data access.
class BlazeCallNativeWrapper(...) {
  // Ensure the native library is loaded
  BlazeCallNativeWrapper.initNative()

  // Initialize the native runtime and keep its pointer
  private var nativeRuntimePtr = JniBridge.callNative(...)
  // ...
}
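initNative() follows the common pattern of loading a bundled native library exactly once. A hedged sketch of that pattern; the object name and resource path are illustrative, not Blaze's exact code:

import java.nio.file.{Files, StandardCopyOption}

object NativeLibLoader {
  @volatile private var loaded = false

  // Extract the bundled .so from the classpath to a temp file and System.load it once
  def ensureLoaded(resource: String = "/libblaze.so"): Unit = synchronized {
    if (!loaded) {
      val in = getClass.getResourceAsStream(resource)
      require(in != null, s"native library $resource not found on classpath")
      try {
        val tmp = Files.createTempFile("libblaze", ".so")
        Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING)
        System.load(tmp.toAbsolutePath.toString)
      } finally in.close()
      loaded = true
    }
  }
}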
- BlazeCallNativeWrapper holds a data iterator that pulls ready data from the native runtime only when hasNext is called; the trigger is JniBridge.nextBatch(). The native runtime converts the Arrow-format data by calling back into the Scala function importBatch(), which appends the rows to a Scala ArrayBuffer named batchRows.
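The pull model can be pictured as a small buffered iterator: hasNext asks the native side for another batch only when the local buffer is empty. This is a simplified, generic sketch; fetchNextBatch stands in for JniBridge.nextBatch() and buffer for batchRows:

import scala.collection.mutable.ArrayBuffer

// fetchNextBatch() appends decoded rows to `buffer` (as importBatch does to batchRows)
// and returns false once the native stream is exhausted
class PullIterator[T](buffer: ArrayBuffer[T], fetchNextBatch: () => Boolean) extends Iterator[T] {
  override def hasNext: Boolean = buffer.nonEmpty || (fetchNextBatch() && buffer.nonEmpty)
  override def next(): T = buffer.remove(0)
}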
The Arrow-based data exchange itself is implemented as follows:
// Import Arrow schema metadata (callback invoked by the native runtime)
protected def importSchema(ffiSchemaPtr: Long): Unit = {
  Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { schemaAllocator =>
    Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema =>
      arrowSchema = Data.importSchema(schemaAllocator, ffiSchema, dictionaryProvider)
      schema = ArrowUtils.fromArrowSchema(arrowSchema)
      toUnsafe = UnsafeProjection.create(schema)
    }
  }
}

// Import an Arrow data batch (callback invoked by the native runtime)
protected def importBatch(ffiArrayPtr: Long): Unit = {
  if (nativeRuntimePtr == 0) {
    throw new RuntimeException("Native runtime is finalized")
  }
  Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { batchAllocator =>
    Using.resources(
      ArrowArray.wrap(ffiArrayPtr),
      VectorSchemaRoot.create(arrowSchema, batchAllocator)) { case (ffiArray, root) =>
      Data.importIntoVectorSchemaRoot(batchAllocator, ffiArray, root, dictionaryProvider)
      val batch = ColumnarHelper.rootAsBatch(root)
      batchRows.append(
        ColumnarHelper
          // Columnar-to-row conversion
          .batchAsRowIter(batch)
          // Reuse toUnsafe to avoid redundant computation
          .map(row => toUnsafe(row).copy().asInstanceOf[InternalRow])
          .toSeq: _*)
    }
  }
}
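One detail worth calling out: the toUnsafe(row).copy() in importBatch is needed because an UnsafeProjection reuses its result buffer between calls, so each row must be copied before it is buffered. A standalone illustration of the pattern (the schema and values here are made up for the example):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

val schema = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val toUnsafe = UnsafeProjection.create(schema)

val row: InternalRow = new GenericInternalRow(Array[Any](1L, UTF8String.fromString("blaze")))
// copy() detaches the result from the projection's reused buffer before it is stored
val unsafeRow = toUnsafe(row).copy()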
Rust Side Implementation#
Next, we will look at the details of the native runtime, focusing on the implementations of callNative and nextBatch.
On the Rust side, callNative corresponds to the function Java_org_apache_spark_sql_blaze_JniBridge_callNative in exec.rs. Its core tasks include:
- Initializing JNI Java classes.
- Initializing the DataFusion session context.
- Initializing the execution runtime NativeExecutionRuntime, which drives the execution plan and monitors the execution process.
You can look at the complete implementation, which is well-commented and easy to understand:
#[allow(non_snake_case)]
#[no_mangle]
pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
    env: JNIEnv,
    _: JClass,
    executor_memory_overhead: i64,
    native_wrapper: JObject,
) -> i64 {
    handle_unwinded_scope(|| -> Result<i64> {
        static SESSION: OnceCell<SessionContext> = OnceCell::new();
        static INIT: OnceCell<()> = OnceCell::new();

        // Singleton initialization, runs only once
        INIT.get_or_try_init(|| {
            // Logging is not initialized at this point
            eprintln!("------ initializing blaze native environment ------");
            init_logging();

            // Init JNI Java classes
            log::info!("initializing JNI bridge");
            JavaClasses::init(&env);

            // Init DataFusion session context
            log::info!("initializing datafusion session");
            SESSION.get_or_try_init(|| {
                let max_memory = executor_memory_overhead as usize;
                let memory_fraction = conf::MEMORY_FRACTION.value()?;
                let batch_size = conf::BATCH_SIZE.value()? as usize;
                MemManager::init((max_memory as f64 * memory_fraction) as usize);

                let session_config = SessionConfig::new().with_batch_size(batch_size);
                let runtime_config =
                    RuntimeConfig::new().with_disk_manager(DiskManagerConfig::Disabled);
                let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
                let session = SessionContext::new_with_config_rt(session_config, runtime);
                Ok::<_, DataFusionError>(session)
            })?;
            Ok::<_, DataFusionError>(())
        })?;

        let native_wrapper = jni_new_global_ref!(native_wrapper)?;

        // Create the execution runtime
        let runtime = Box::new(NativeExecutionRuntime::start(
            native_wrapper,
            SESSION.get().unwrap().task_ctx(),
        )?);

        // Return the raw pointer of the runtime to the JVM side
        Ok::<_, DataFusionError>(Box::into_raw(runtime) as usize as i64)
    })
}
In rt.rs, we can see that when NativeExecutionRuntime is initialized, the protobuf-defined execution plan is deserialized and converted into DataFusion's ExecutionPlan structure. The whole plan is then executed by DataFusion, which uses Tokio to launch the asynchronous execution and trigger each operator's computation in turn. The code is lengthy, so it is shortened here:
pub fn start(native_wrapper: GlobalRef, context: Arc<TaskContext>) -> Result<Self> {
    // 1. Get and decode the task definition via JNI
    let raw_task_definition = jni_call!(...);
    let task_definition = TaskDefinition::decode(...);

    // 2. Create the DataFusion execution plan
    let execution_plan: Arc<dyn ExecutionPlan> = plan.try_into()?;

    // 3. Initialize the Tokio multi-threaded runtime
    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .on_thread_start(|| { /* Set thread-local context */ })
        .build()?;

    // 4. Create the data production channel
    let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);

    // 5. Asynchronously execute the data stream
    let consume_stream = async move {
        let mut stream = exec_ctx.execute(&execution_plan)?;
        jni_call!(...importSchema(...)); // Export the schema to the Java side

        // 6. Produce data batches from the stream
        while let Some(batch) = stream.next().await {
            batch_sender.send(Ok(Some(batch)))?;
        }
    };

    // 7. Spawn the asynchronous task
    let join_handle = tokio_runtime.spawn(async move {
        consume_stream.await.unwrap_or_else(|err| {
            /* Error handling */
        });
    });

    // 8. Return the runtime instance
    Ok(Self { ... })
}
Next, let's look at the implementation of nextBatch, which calls the next_batch function of NativeExecutionRuntime and primarily uses Arrow to achieve zero-copy data transfer:
pub fn next_batch(&self) -> bool {
    // 1. Internal closure that handles the actual data reception
    let next_batch = || -> Result<bool> {
        match self
            .batch_receiver
            .recv()
            .or_else(|err| df_execution_err!("receive batch error: {err}"))??
        {
            Some(batch) => {
                // 2. Convert to Arrow FFI data structures
                let struct_array = StructArray::from(batch);
                let ffi_array = FFI_ArrowArray::new(&struct_array.to_data());

                // 3. Call the Java-side method via JNI to import the data
                jni_call!(BlazeCallNativeWrapper(self.native_wrapper.as_obj())
                    .importBatch(&ffi_array as *const FFI_ArrowArray as i64) -> ()
                )?;
                Ok(true)
            }
            None => Ok(false),
        }
    };

    // 4. Execute and handle the final result
    match next_batch() {
        Ok(ret) => return ret,
        Err(err) => {
            // 5. Report the error back to the Java layer
            let _ = set_error(
                &self.native_wrapper,
                &format!("poll record batch error: {err}"),
                None,
            );
            return false;
        }
    }
}
Conclusion#
The above is a rough overview of the Blaze startup process. Next comes the implementation of the individual operators; if I find the time, I will publish a few more source-reading articles, so treat this as a placeholder. Finally, here is a flowchart summary (generated with the help of AI):
┌────────────────────────────────────────┐
│              Spark Driver              │
│           (Java/Scala Side)            │
└────────────────────┬───────────────────┘
                     │ 1. Generate Protobuf format execution plan
                     ▼
┌────────────────────────────────────────┐
│          JNI Interface Layer           │
│ - Cross-language calls via FFI         │
│ - Pass serialized binary data          │
└────────────────────┬───────────────────┘
                     │ 2. Deserialize to PhysicalPlanNode
                     ▼
┌────────────────────────────────────────┐
│          TryInto::try_into()           │
│ - Core conversion entry point          │
└────────────────────┬───────────────────┘
                     │ 3. Recursively process nodes
                     ▼
┌────────────────────────────────────────┐
│          Build ExecutionPlan           │
│ - Match and process 30+ node types     │
│ - For example:                         │
│     ProjectionExec                     │
│     FilterExec                         │
│     HashJoinExec                       │
└────────────────────┬───────────────────┘
                     │ 4. Build physical operator tree
                     ▼
┌────────────────────────────────────────┐
│            Execution Engine            │
│ - Call execute() method                │
│ - Distributed task scheduling          │
└────────────────────────────────────────┘