Blaze Execution Flow: Source Code Reading

First, a brief introduction. Blaze is a Spark vectorized execution engine developed by Kuaishou, built in Rust on top of the DataFusion framework, and aimed at accelerating Spark SQL query processing through native vectorized execution. Why dig into Blaze? For one, mature open-source Spark vectorized engines (meaning ready to use, with production deployments) are still rare; only Gluten and Blaze have real traction. Moreover, Blaze implements its core vectorized components in Rust on top of DataFusion, which makes it cheaper to get into than Gluten, which builds on the C++-based Velox. I also happen to be learning Rust recently, so my interest in Blaze came naturally.

This article mainly introduces the overall architecture and startup process of Blaze, with a focus on analyzing the source code. Since I am still learning, if there are any inaccuracies in the content, I welcome feedback and discussion.

Overall Architecture#

Blaze is essentially a Spark extension, and the core principle of acceleration is to replace operators with native operators to leverage DataFusion for acceleration. At the same time, through columnar computation and efficient memory communication, it achieves significant performance improvements without intruding on business job logic.
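
As a quick orientation, enabling Blaze is mostly a matter of Spark configuration. Below is a minimal sketch: the extension class name comes from the Blaze source tree, while the spark.blaze.enabled key is my assumption inferred from the blazeEnabledKey check we will see later.

import org.apache.spark.sql.SparkSession

// Minimal sketch: wire Blaze into a Spark session via the standard
// spark.sql.extensions mechanism. The spark.blaze.enabled key is an
// assumption based on the blazeEnabledKey check shown later in this article.
val spark = SparkSession
  .builder()
  .appName("blaze-demo")
  .config("spark.sql.extensions", "org.apache.spark.sql.blaze.BlazeSparkSessionExtension")
  .config("spark.blaze.enabled", "true")
  .getOrCreate()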

Below is a comparison of the architecture implementations of native Spark and Blaze:

Native Spark

[Figure: native Spark execution architecture]

Spark + Blaze

[Figure: Spark + Blaze execution architecture]

Referring to the Blaze project code, we can see that the main implementations of Blaze include:

  • Spark extension: a series of rules for rewriting execution plans, plus shim packages that adapt Blaze to multiple Spark versions.
  • JNI bridge: A set of communication mechanisms for Java and Rust, based on Protocol Buffers and Arrow for efficient data transfer.
  • Native plan: A series of native operators adapted based on DataFusion.

Next, we will mainly walk through the Spark extension and JNI bridge parts of the code, leaving the operators for later discussion. Please note that the code quoted in this article comes from the v4.0.1 branch.

Startup Process#

Blaze is injected as a standard Spark plugin, so the entry point is naturally BlazeSparkSessionExtension, which hooks into SparkSessionExtensions to register the columnar-processing rules that carry out operator replacement. The core execution code is as follows:

//org.apache.spark.sql.blaze.BlazeColumnarOverrides#preColumnarTransitions

override def preColumnarTransitions: Rule[SparkPlan] = {
  new Rule[SparkPlan] {
    override def apply(sparkPlan: SparkPlan): SparkPlan = {
      // Check if Blaze is enabled
      if (!sparkPlan.conf.getConf(blazeEnabledKey)) return sparkPlan
      
      // Generate conversion strategy
      BlazeConvertStrategy.apply(sparkPlan)
      
      // Recursively convert execution plan
      val transformed = BlazeConverters.convertSparkPlanRecursively(sparkPlan)
      
      // Execute post-processing for specific operators
      Shims.get.postTransform(transformed, sparkSession.sparkContext)
    }
  }
}

In BlazeConvertStrategy, we can see that its main function is to tag nodes to determine whether they need to be converted to native.

The current tag types include:

// Mark whether the node can be converted to Blaze native execution
// Example: Update the tag after conversion testing to decide whether to keep the conversion result
val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertible")

// Mark whether to fall back to the original format
// Example: Set this tag when encountering operations that cannot be fully converted
val convertToNonNativeTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertToNonNative")

// Core conversion strategy tag, includes three states:
// - Default: undecided state
// - AlwaysConvert: forced conversion (e.g., file scan)
// - NeverConvert: prohibited conversion (e.g., inefficient aggregation scenarios)
val convertStrategyTag: TreeNodeTag[ConvertStrategy] = TreeNodeTag("blaze.convert.strategy")

// Mark whether child nodes need to maintain sorting characteristics
// Example: SortExec nodes will clear this tag to avoid redundant sorting
val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.child.ordering.required")

// Used to mark the smaller side of a Join operation
// Example: In BroadcastHashJoinExec, decide which side of data to broadcast
val joinSmallerSideTag: TreeNodeTag[BuildSide] = TreeNodeTag("blaze.join.smallerSide")
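
These tags ride on Spark's standard TreeNode tag mechanism (setTagValue/getTagValue). Here is a minimal sketch of how a strategy pass might read and write them; AlwaysConvert and NeverConvert are the ConvertStrategy states listed above, and the helper names are mine:

import org.apache.spark.sql.execution.SparkPlan

// Hypothetical helpers illustrating how tags carry decisions between
// the strategy phase and the converters:
def markAlwaysConvert(plan: SparkPlan): Unit =
  plan.setTagValue(convertStrategyTag, AlwaysConvert)

def isNeverConvert(plan: SparkPlan): Boolean =
  plan.getTagValue(convertStrategyTag).contains(NeverConvert)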

Through BlazeConverters.convertSparkPlanRecursively, nodes are converted to native plans wherever possible, and the nodes that must fall back are identified. After that, the native plan is executed using Spark's distributed execution machinery. A simplified sketch of the recursion is shown below.
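
This is only the idea, not Blaze's actual code; convertToNative is a hypothetical helper standing in for the real per-operator converters:

// Simplified sketch of the idea behind convertSparkPlanRecursively:
// convert bottom-up, and keep a converted subtree only when the node
// is tagged as convertible.
def convertRecursively(plan: SparkPlan): SparkPlan = {
  val converted = plan.withNewChildren(plan.children.map(convertRecursively))
  if (converted.getTagValue(convertibleTag).contains(true)) {
    convertToNative(converted) // hypothetical: wrap as a native operator
  } else {
    converted // keep the JVM operator; execution falls back here
  }
}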

Next, let's look at how the NativePlan node and the corresponding NativeRDD are constructed, compared against their vanilla Spark counterparts:

NativePlan Corresponding to SparkPlan

trait NativeSupports extends SparkPlan {
  // The actual method to execute native, which will ultimately return nativeRDD
  protected def doExecuteNative(): NativeRDD
  
  protected override def doExecute(): RDD[InternalRow] = doExecuteNative()

  def executeNative(): NativeRDD = executeQuery {
    doExecuteNative()
  }

  // Hack: checks whether this is a full shuffle read; currently always true (an unfinished feature)
  def shuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this.doExecuteNative())
}

NativeRDD Corresponding to RDD

class NativeRDD(
    // Keep SparkContext @transient so it is not serialized with the RDD
    @transient private val rddSparkContext: SparkContext,
    val metrics: MetricNode,
    private val rddPartitions: Array[Partition],
    private val rddDependencies: Seq[Dependency[_]],
    private val rddShuffleReadFull: Boolean,
    // The corresponding native plan. PhysicalPlanNode is a protobuf-generated class, the core data structure for exchanging DataFusion execution-plan operators
    val nativePlan: (Partition, TaskContext) => PhysicalPlanNode,
    val friendlyName: String = null)
    extends RDD[InternalRow](rddSparkContext, rddDependencies)
    with Logging {

  if (friendlyName != null) {
    setName(friendlyName)
  }

  def isShuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this)
  Shims.get.setRDDShuffleReadFull(this, rddShuffleReadFull)

  override protected def getPartitions: Array[Partition] = rddPartitions
  override protected def getDependencies: Seq[Dependency[_]] = rddDependencies

  // Execution entry point, calls JNI to execute native functions
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val computingNativePlan = nativePlan(split, context)
    NativeHelper.executeNativePlan(computingNativePlan, metrics, split, Some(context))
  }
}

Now that we have reached the executeNativePlan step, how does Blaze specifically implement communication between the JVM and the Rust runtime?

JNI Communication Mechanism#

Java Side Implementation#

First, let's look at the NativeHelper.executeNativePlan function:

  def executeNativePlan(
      nativePlan: PhysicalPlanNode,
      metrics: MetricNode,
      partition: Partition,
      context: Option[TaskContext]): Iterator[InternalRow] = {

    if (partition.index == 0 && metrics != null && context.nonEmpty) {
      metrics.foreach(_.add("stage_id", context.get.stageId()))
    }
    if (nativePlan == null) {
      return Iterator.empty
    }
    BlazeCallNativeWrapper(nativePlan, partition, context, metrics).getRowIterator
  }

BlazeCallNativeWrapper wraps the details of JNI interaction, with several important steps:

  1. On first initialization, it ensures libblaze.so (the library containing Blaze's core native engine) is loaded, then calls the native function JniBridge.callNative() to start the runtime, holding the returned pointer for subsequent data access:
class BlazeCallNativeWrapper(...) {
  // Ensure local library is loaded  
  BlazeCallNativeWrapper.initNative() 
  // Initialize Native runtime  
  private var nativeRuntimePtr = JniBridge.callNative(...) 
  // ...
}
  2. BlazeCallNativeWrapper holds a data iterator that pulls ready data from the native runtime only when hasNext is called, triggering JniBridge.nextBatch(). The native runtime converts the Arrow-format data by calling back into the Scala function importBatch(), which stores the resulting rows in a Scala ArrayBuffer, batchRows. A sketch of the combined iterator follows below.
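
A rough sketch of that pull-based iterator (field and method names are my guesses at BlazeCallNativeWrapper's internals, inferred from the callbacks shown in this article):

// Sketch: drain buffered rows first, and only cross the JNI boundary
// when the buffer runs dry. nextBatch() drives the native stream; the
// native side refills batchRows via the importBatch() callback, and a
// false return signals the end of the stream.
def getRowIterator: Iterator[InternalRow] = new Iterator[InternalRow] {
  override def hasNext: Boolean =
    batchRows.nonEmpty || {
      JniBridge.nextBatch(nativeRuntimePtr) && batchRows.nonEmpty
    }
  override def next(): InternalRow = batchRows.remove(0)
}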

The Arrow-based data exchange between the two runtimes is implemented as follows:

// Arrow data metadata import (native runtime callback)
protected def importSchema(ffiSchemaPtr: Long): Unit = {
  Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { schemaAllocator =>
    Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema =>
      arrowSchema = Data.importSchema(schemaAllocator, ffiSchema, dictionaryProvider)
      schema = ArrowUtils.fromArrowSchema(arrowSchema)
      toUnsafe = UnsafeProjection.create(schema)
    }
  }
}
// Arrow data import (native runtime callback)
protected def importBatch(ffiArrayPtr: Long): Unit = {
  if (nativeRuntimePtr == 0) {
    throw new RuntimeException("Native runtime is finalized")
  }

  Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { batchAllocator =>
    Using.resources(
      ArrowArray.wrap(ffiArrayPtr),
      VectorSchemaRoot.create(arrowSchema, batchAllocator)) { case (ffiArray, root) =>
      Data.importIntoVectorSchemaRoot(batchAllocator, ffiArray, root, dictionaryProvider)
      val batch = ColumnarHelper.rootAsBatch(root)

      batchRows.append(
        ColumnarHelper
          // Column-to-row conversion
          .batchAsRowIter(batch)
          // Reuse toUnsafe to avoid redundant computation
          .map(row => toUnsafe(row).copy().asInstanceOf[InternalRow])
          .toSeq: _*)
    }
  }
}

Rust Side Implementation#

Next, we will look at the details of the native runtime, focusing on the implementations of callNative and nextBatch.

The Rust implementation of callNative lives in exec.rs, exported as Java_org_apache_spark_sql_blaze_JniBridge_callNative. Its core tasks include:

  1. Initializing JNI Java classes.
  2. Initializing the DataFusion session context.
  3. Initializing the execution runtime NativeExecutionRuntime to monitor the execution process and handle the execution plan.
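
For reference, the JVM-side counterparts of these exports are plain native method declarations on JniBridge. The signatures below are my reconstruction from the Rust exports shown in this article, not the actual Blaze source:

// Reconstructed sketch of the JNI bridge declarations (signatures inferred
// from the Rust-side exports; the real JniBridge declares more methods):
object JniBridge {
  @native def callNative(executorMemoryOverhead: Long, nativeWrapper: Object): Long
  @native def nextBatch(nativeRuntimePtr: Long): Boolean
}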

You can look at the complete implementation, which is well-commented and easy to understand:

#[allow(non_snake_case)]
#[no_mangle]
pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
    env: JNIEnv,
    _: JClass,
    executor_memory_overhead: i64,
    native_wrapper: JObject,
) -> i64 {
    handle_unwinded_scope(|| -> Result<i64> {
        static SESSION: OnceCell<SessionContext> = OnceCell::new();
        static INIT: OnceCell<()> = OnceCell::new();
        // One-time singleton initialization
        INIT.get_or_try_init(|| {
            // Logging is not initialized at this moment
            eprintln!("------ initializing blaze native environment ------");
            init_logging();

            // Init JNI Java classes
            log::info!("initializing JNI bridge");
            JavaClasses::init(&env);

            // Init DataFusion session context
            log::info!("initializing datafusion session");
            SESSION.get_or_try_init(|| {
                let max_memory = executor_memory_overhead as usize;
                let memory_fraction = conf::MEMORY_FRACTION.value()?;
                let batch_size = conf::BATCH_SIZE.value()? as usize;
                MemManager::init((max_memory as f64 * memory_fraction) as usize);

                let session_config = SessionConfig::new().with_batch_size(batch_size);
                let runtime_config =
                    RuntimeConfig::new().with_disk_manager(DiskManagerConfig::Disabled);
                let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
                let session = SessionContext::new_with_config_rt(session_config, runtime);
                Ok::<_, DataFusionError>(session)
            })?;
            Ok::<_, DataFusionError>(())
        })?;
        let native_wrapper = jni_new_global_ref!(native_wrapper)?;

        // Create execution runtime
        let runtime = Box::new(NativeExecutionRuntime::start(
            native_wrapper,
            SESSION.get().unwrap().task_ctx(),
        )?);

        // Returns runtime raw pointer
        Ok::<_, DataFusionError>(Box::into_raw(runtime) as usize as i64)
    })
}

In rt.rs, we can see that when NativeExecutionRuntime is initialized, the protobuf-encoded execution plan is deserialized and converted into DataFusion's ExecutionPlan structure. The whole plan is then executed by DataFusion, which uses Tokio to drive asynchronous execution, triggering each operator's computation in turn. The code is lengthy, so I have abridged it:

pub fn start(native_wrapper: GlobalRef, context: Arc<TaskContext>) -> Result<Self> {
    // 1. Get and decode task definition from JNI
    let raw_task_definition = jni_call!(...);
    let task_definition = TaskDefinition::decode(...);

    // 2. Create DataFusion execution plan
    let execution_plan: Arc<dyn ExecutionPlan> = plan.try_into()?;

    // 3. Initialize Tokio multi-threaded runtime
    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .on_thread_start(|| { /* Set thread-local context */ })
        .build()?;

    // 4. Create data production channel
    let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);

    // 5. Asynchronously execute data stream
    let consume_stream = async move {
        let mut stream = exec_ctx.execute(&execution_plan)?;
        jni_call!(...importSchema(...)); // Export schema to Java
        
        // 6. Stream produce data batches
        while let Some(batch) = stream.next().await {
            batch_sender.send(Ok(Some(batch)))?;
        }
    };

    // 7. Start asynchronous task
    let join_handle = tokio_runtime.spawn(async move {
        consume_stream.await.unwrap_or_else(|err| { 
            /* Error handling */
        });
    });

    // 8. Return runtime instance
    Ok(Self { ... })
}
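
One detail worth noting: the channel created in step 4 is a bounded sync_channel(1), so the Tokio producer blocks after a single in-flight batch until the JVM side drains it via nextBatch. This gives natural backpressure between the native stream and the Java row iterator.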

Next, let's look at the implementation of nextBatch, which actually calls the next_batch function of NativeExecutionRuntime, primarily utilizing Arrow to achieve zero-copy data transfer:

pub fn next_batch(&self) -> bool {
    // 1. Define an internal closure to handle actual data reception
    let next_batch = || -> Result<bool> {
        match self
            .batch_receiver
            .recv()
            .or_else(|err| df_execution_err!("receive batch error: {err}"))??
        {
            Some(batch) => {
                // 2. Convert Arrow data structure
                let struct_array = StructArray::from(batch);
                let ffi_array = FFI_ArrowArray::new(&struct_array.to_data());
                // 3. Call Java layer method to import data via JNI
                jni_call!(BlazeCallNativeWrapper(self.native_wrapper.as_obj())
                    .importBatch(&ffi_array as *const FFI_ArrowArray as i64) -> ()
                )?;
                Ok(true)
            }
            None => Ok(false),
        }
    };
    // 4. Execute and process final result
    match next_batch() {
        Ok(ret) => return ret,
        Err(err) => {
            // 5. Error callback to Java layer
            let _ = set_error(
                &self.native_wrapper,
                &format!("poll record batch error: {err}"),
                None,
            );
            return false;
        }
    }
}

Conclusion#

The above is a rough overview of the Blaze startup process. Next comes the implementation of the individual operators; if I find time I will follow up with more source-reading articles, so consider this a placeholder. Finally, here is an AI-generated flowchart summary:

┌───────────────────────┐
│     Spark Driver      │
│   (Java/Scala side)   │
└──────────┬────────────┘
           │ 1. Generate Protobuf-format execution plan
           ▼
┌───────────────────────┐
│  JNI interface layer  │
│ - Cross-language      │
│   calls via FFI       │
│ - Passes serialized   │
│   binary data         │
└──────────┬────────────┘
           │ 2. Deserialize into PhysicalPlanNode
           ▼
┌───────────────────────┐
│ TryInto::try_into()   │
│ - Core conversion     │
│   entry point         │
└──────────┬────────────┘
           │ 3. Recursively process nodes
           ▼
┌───────────────────────┐
│ Build ExecutionPlan   │
│ - Matches 30+ node    │
│   types, e.g.         │
│   ProjectionExec,     │
│   FilterExec,         │
│   HashJoinExec        │
└──────────┬────────────┘
           │ 4. Build physical operator tree
           ▼
┌───────────────────────┐
│   Execution engine    │
│ - Calls execute()     │
│ - Distributed task    │
│   scheduling          │
└───────────────────────┘