banner
bladedragon

bladedragon

blaze 実行フローソースコードの読み取り

まず blaze について紹介します。blaze は快手が独自に開発した、Rust 言語と DataFusion フレームワークに基づく Spark のベクトル化実行エンジンで、ネイティブベクトル化実行技術を通じて Spark SQL のクエリ処理を加速することを目的としています。blaze を深く理解することを考える理由は、現在比較的成熟したオープンソースの Spark ベクトル化エンジンプロジェクトが少ないため(すぐに使えるものや生産運用のケースが存在するものを指します)、人気があるのは gluten と blaze だけです。また、blaze は datafusion に基づいており、Rust でコアのベクトル化コンポーネントを実装しているため、gluten が velox(C++ で実装)に基づいているのに比べて、習得コストが比較的低く、最近ちょうど Rust を学んでいるので、自然と blaze への興味が高まっています。

この記事では、blaze の全体アーキテクチャと起動プロセスを主に紹介し、ソースコードを交えて分析します。私自身もまだ学んでいる途中なので、内容に誤りがあれば指摘していただき、一緒に議論できればと思います。

全体アーキテクチャ#

blaze は本質的に Spark の拡張であり、加速の核心原理は演算子をネイティブ演算子に置き換えることで datafusion を利用して加速することです。また、列指向計算と効率的なメモリ通信を通じて、ビジネス作業ロジックに侵入することなくパフォーマンスを大幅に向上させます。

以下は Spark のネイティブと blaze のアーキテクチャ実装の比較です:

ネイティブ Spark

image

spark + blaze

image

blaze のプロジェクトコードを参照すると、blaze の主な実装には以下が含まれます。

  • spark extension:実行計画を置き換えるための一連のルールと多バージョンの spark shim パッケージに適応するもの
  • jni bridge:Java と Rust の通信メカニズムのセットで、pb と arrow に基づいて効率的なデータ転送を実現
  • native plan:datafusion に適応したネイティブ演算子の一連

次に、主に spark extension と jni bridge の部分コードを理解し、演算子については後で話しましょう。この記事で言及されるコードはすべて branch v4.0.1 からのものです。

起動プロセス#

blaze は一般的な spark プラグイン形式で注入され、起動の入り口は SparkSessionExtensions を継承した BlazeSparkSessionExtension にあります。列指向処理ルールを拡張登録し、演算子置き換え戦略を実行します。核心の実行コードは以下の通りです:

//org.apache.spark.sql.blaze.BlazeColumnarOverrides#preColumnarTransitions

override def preColumnarTransitions: Rule[SparkPlan] = {
  new Rule[SparkPlan] {
    override def apply(sparkPlan: SparkPlan): SparkPlan = {
      // Blazeが有効かどうかをチェック
      if (!sparkPlan.conf.getConf(blazeEnabledKey)) return sparkPlan
      
      // 変換戦略を生成
      BlazeConvertStrategy.apply(sparkPlan)
      
      // 再帰的に実行計画を変換
      val transformed = BlazeConverters.convertSparkPlanRecursively(sparkPlan)
      
      // 特定の演算子の後処理を実行
      Shims.get.postTransform(transformed, sparkSession.sparkContext)
    }
  }
}

BlazeConvertStrategy では、ノードにタグを付けてネイティブに変換する必要があるかどうかを判断することが主な役割です。

現在のタグタイプには以下が含まれます:

// ノードがBlazeのネイティブ実行に変換可能かどうかを示すタグ
// 例:変換テスト後にタグを更新し、変換結果を保持するかどうかを決定
val convertibleTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertible")

// 元の形式に戻す必要があるかどうかを示すタグ
// 例:完全に変換できない操作に遭遇したときにこのタグを設定
val convertToNonNativeTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.convertToNonNative")

// 核心の変換戦略タグ、三つの状態を含む:
// - Default: 未決状態
// - AlwaysConvert: 強制変換(ファイルスキャンなど)
// - NeverConvert: 変換禁止(非効率な集約シナリオなど)
val convertStrategyTag: TreeNodeTag[ConvertStrategy] = TreeNodeTag("blaze.convert.strategy")

// 子ノードが順序特性を保持する必要があるかどうかを示すタグ
// 例:SortExecノードは冗長なソートを避けるためにこのタグをクリアします
val childOrderingRequiredTag: TreeNodeTag[Boolean] = TreeNodeTag("blaze.child.ordering.required")

// Join操作で小さいデータ側を示すためのタグ
// 例:BroadcastHashJoinExecでどちら側のデータをブロードキャストするかを決定
val joinSmallerSideTag: TreeNodeTag[BuildSide] = TreeNodeTag("blaze.join.smallerSide")

BlazeConverters.convertSparkPlanRecursively を通じて、ノードを可能な限りネイティブプランに変換し、戻す必要があるノードを特定します。その後、Spark の分散能力を借りてネイティブプランを実行します。

以下に、nativePlan ノードの構築と対応する nativeRDD の構築を示します。Spark のネイティブの構築と比較できます:
NativePlan 対応 SparkPlan

trait NativeSupports extends SparkPlan {
  // 実際にネイティブを実行するメソッド、最終的にnativeRDDを返します
  protected def doExecuteNative(): NativeRDD
  
  protected override def doExecute(): RDD[InternalRow] = doExecuteNative()

  def executeNative(): NativeRDD = executeQuery {
    doExecuteNative()
  }

  // 全量shuffleReadの関数をハックしました。現在はデフォルトでtrueで、未完成の機能です
  def shuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this.doExecuteNative())
}

NativeRDD 対応 RDD

class NativeRDD(
    // SparkContextのtransient特性を保持してシリアル化を防ぐ
    @transient private val rddSparkContext: SparkContext,
    val metrics: MetricNode,
    private val rddPartitions: Array[Partition],
    private val rddDependencies: Seq[Dependency[_]],
    private val rddShuffleReadFull: Boolean,
    // 対応するネイティブプラン。ここでのpysicalPlanNodeは実際にはpbで定義されたクラスで、datafusionの実行計画演算子であり、jniデータインタラクションの核心です
    val nativePlan: (Partition, TaskContext) => PhysicalPlanNode,
    val friendlyName: String = null)
    extends RDD[InternalRow](rddSparkContext, rddDependencies)
    with Logging {

  if (friendlyName != null) {
    setName(friendlyName)
  }

  def isShuffleReadFull: Boolean = Shims.get.getRDDShuffleReadFull(this)
  Shims.get.setRDDShuffleReadFull(this, rddShuffleReadFull)

  override protected def getPartitions: Array[Partition] = rddPartitions
  override protected def getDependencies: Seq[Dependency[_]] = rddDependencies

  // 実行エントリーポイント、JNIを呼び出してネイティブ関数を実行します
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val computingNativePlan = nativePlan(split, context)
    NativeHelper.executeNativePlan(computingNativePlan, metrics, split, Some(context))
  }
}

executeNativePlan の段階に来たので、blaze は具体的に JVM と Rust ランタイムの通信をどのように実現しているのでしょうか?

jni 通信メカニズム#

Java 側の実装#

まず、NativeHelper.executeNativePlan 関数を見てみましょう:

  def executeNativePlan(
      nativePlan: PhysicalPlanNode,
      metrics: MetricNode,
      partition: Partition,
      context: Option[TaskContext]): Iterator[InternalRow] = {

    if (partition.index == 0 && metrics != null && context.nonEmpty) {
      metrics.foreach(_.add("stage_id", context.get.stageId()))
    }
    if (nativePlan == null) {
      return Iterator.empty
    }
    BlazeCallNativeWrapper(nativePlan, partition, context, metrics).getRowIterator
  }

BlazeCallNativeWrapper は jni インタラクションの詳細をラップしており、重要なステップは以下の通りです:

  1. 初回初期化時にネイティブランタイムを起動し、libblaze.so をロードしてネイティブエンジンの実行ライブラリを取得します。この部分は blaze のコアのネイティブエンジン部分です。その後、native 関数 JniBridge.callNative () を呼び出してランタイムを起動し、後続のデータアクセスのためのポインタを保持します。
class BlazeCallNativeWrapper(...) {
  // ローカルライブラリをロードすることを確認
  BlazeCallNativeWrapper.initNative() 
  // ネイティブランタイムを初期化
  private var nativeRuntimePtr = JniBridge.callNative(...) 
  // ...
}
  1. BlazeCallNativeWrapper はデータイテレータを保持し、hasNext を呼び出すとネイティブランタイムから準備が整ったデータを取得します。トリガー関数は JniBridge.nextBatch () です。このとき、ネイティブランタイムは scala 関数 importBatch () を呼び出して arrow 形式のデータを scala の ArrayBuffer クラスに変換し、batchRows に保存します。

arrow を使用したデータ交換の実装を学ぶことができます:

// Arrowデータメタ情報のインポート(ネイティブランタイムのコールバック)
protected def importSchema(ffiSchemaPtr: Long): Unit = {
  Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { schemaAllocator =>
    Using.resource(ArrowSchema.wrap(ffiSchemaPtr)) { ffiSchema =>
      arrowSchema = Data.importSchema(schemaAllocator, ffiSchema, dictionaryProvider)
      schema = ArrowUtils.fromArrowSchema(arrowSchema)
      toUnsafe = UnsafeProjection.create(schema)
    }
  }
}
// Arrowデータのインポート(ネイティブランタイムのコールバック)
 protected def importBatch(ffiArrayPtr: Long): Unit = {
    if (nativeRuntimePtr == 0) {
      throw new RuntimeException("ネイティブランタイムが終了しました")
    }

    Using.resource(ArrowUtils.newChildAllocator(getClass.getName)) { batchAllocator =>
      Using.resources(
        ArrowArray.wrap(ffiArrayPtr),
        VectorSchemaRoot.create(arrowSchema, batchAllocator)) { case (ffiArray, root) =>
        Data.importIntoVectorSchemaRoot(batchAllocator, ffiArray, root, dictionaryProvider)
        val batch = ColumnarHelper.rootAsBatch(root)

        batchRows.append(
          ColumnarHelper
            // 列行変換
            .batchAsRowIter(batch)
            // toUnsafe再利用で重複計算を回避
            .map(row => toUnsafe(row).copy().asInstanceOf[InternalRow])
            .toSeq: _*)
      }
    }
  }

Rust 側の実装#

次に、ネイティブランタイムの詳細を確認します。主に callNative と nextBatch の実装に注目します。

callNative メソッドの Rust での呼び出しは execu.rs の Java_org_apache_spark_sql_blaze_JniBridge_callNative にあります:
核心的に行っていることは以下の通りです:

  1. jni java クラスを初期化
  2. datafusion のセッションコンテキストを初期化
  3. 実行ランタイム NativeExecutionRuntime を初期化し、実行プロセスを監視し、実行計画を処理します

完全な実装を見てみましょう。公式には非常に理解しやすいコメントがあります:

#[allow(non_snake_case)]
#[no_mangle]
pub extern "system" fn Java_org_apache_spark_sql_blaze_JniBridge_callNative(
    env: JNIEnv,
    _: JClass,
    executor_memory_overhead: i64,
    native_wrapper: JObject,
) -> i64 {
    handle_unwinded_scope(|| -> Result<i64> {
        static SESSION: OnceCell<SessionContext> = OnceCell::new();
        static INIT: OnceCell<()> = OnceCell::new();
        // シングルトンを一度だけ初期化
        INIT.get_or_try_init(|| {
            // この時点ではロギングは初期化されていません
            eprintln!("------ blazeネイティブ環境を初期化中 ------");
            init_logging();

            // jni javaクラスを初期化
            log::info!("jniブリッジを初期化中");
            JavaClasses::init(&env);

            // datafusionセッションコンテキストを初期化
            log::info!("datafusionセッションを初期化中");
            SESSION.get_or_try_init(|| {
                let max_memory = executor_memory_overhead as usize;
                let memory_fraction = conf::MEMORY_FRACTION.value()?;
                let batch_size = conf::BATCH_SIZE.value()? as usize;
                MemManager::init((max_memory as f64 * memory_fraction) as usize);

                let session_config = SessionConfig::new().with_batch_size(batch_size);
                let runtime_config =
                    RuntimeConfig::new().with_disk_manager(DiskManagerConfig::Disabled);
                let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
                let session = SessionContext::new_with_config_rt(session_config, runtime);
                Ok::<_, DataFusionError>(session)
            })?;
            Ok::<_, DataFusionError>(())
        })?;
        let native_wrapper = jni_new_global_ref!(native_wrapper)?;

        // 実行ランタイムを作成
        let runtime = Box::new(NativeExecutionRuntime::start(
            native_wrapper,
            SESSION.get().unwrap().task_ctx(),
        )?);

        // ランタイムの生ポインタを返します
        Ok::<_, DataFusionError>(Box::into_raw(runtime) as usize as i64)
    })
}

rt.rs では、NativeExecutionRuntime を初期化する際に protobuf で定義された実行計画をすべて逆シリアル化して datafusion の ExecutionPlan 構造に変換し、その後 datafusion が全体の実行計画を実行します。datafusion は tokio を使用して非同期実行操作を開始し、各演算子の compute を順次トリガーします。
コードは長いですが、いくつか省略しました:

pub fn start(native_wrapper: GlobalRef, context: Arc<TaskContext>) -> Result<Self> {
    // 1. JNIからタスク定義を取得し、デコードします
    let raw_task_definition = jni_call!(...);
    let task_definition = TaskDefinition::decode(...);

    // 2. DataFusion実行計画を作成
    let execution_plan: Arc<dyn ExecutionPlan> = plan.try_into()?;

    // 3. Tokioのマルチスレッドランタイムを初期化
    let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
        .on_thread_start(|| { /* スレッドローカルコンテキストを設定 */ })
        .build()?;

    // 4. データ生成チャネルを作成
    let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);

    // 5. 非同期データフローを実行
    let consume_stream = async move {
        let mut stream = exec_ctx.execute(&execution_plan)?;
        jni_call!(...importSchema(...)); // スキーマをJavaにエクスポート
        
        // 6. データバッチをストリーミング生成
        while let Some(batch) = stream.next().await {
            batch_sender.send(Ok(Some(batch)))?;
        }
    };

    // 7. 非同期タスクを起動
    let join_handle = tokio_runtime.spawn(async move {
        consume_stream.await.unwrap_or_else(|err| { 
            /* エラーハンドリング */
        });
    });

    // 8. ランタイムインスタンスを返します
    Ok(Self { ... })
}

次に、nextBatch の実装を見てみましょう。nextBatch は実際に呼び出される NativeExecutionRuntime の next_batch 関数で、主に arrow を利用してゼロコピーのデータ転送を実現しています:

pub fn next_batch(&self) -> bool {
    // 1. 内部クロージャを定義して実際のデータ受信を処理
        let next_batch = || -> Result<bool> {
            match self
                .batch_receiver
                .recv()
                .or_else(|err| df_execution_err!("バッチ受信エラー: {err}"))??
            {
                Some(batch) => {
                    // 2. Arrowデータ構造に変換
                    let struct_array = StructArray::from(batch);
                    let ffi_array = FFI_ArrowArray::new(&struct_array.to_data());
                    // 3. JNIを通じてJava層メソッドを呼び出してデータをインポート
                    jni_call!(BlazeCallNativeWrapper(self.native_wrapper.as_obj())
                        .importBatch(&ffi_array as *const FFI_ArrowArray as i64) -> ()
                    )?;
                    Ok(true)
                }
                None => Ok(false),
            }
        };
         // 4. 実行し、最終結果を処理
        match next_batch() {
            Ok(ret) => return ret,
            Err(err) => {
                // 5. エラーをJava層にコールバック
                let _ = set_error(
                    &self.native_wrapper,
                    &format!("レコードバッチポーリングエラー: {err}"),
                    None,
                );
                return false;
            }
        }
    }

結論#

以上が blaze の起動プロセスの大まかな流れです。次は各演算子の実装方法を深く理解することです。時間があれば、後でソースコードの読み取りに関する記事をいくつか更新する予定です。ここで一旦区切ります(
ここに AI によるフローチャートの要約も提供します:

┌───────────────────────┐
Spark Driver
│  (Java/Scala側)       │
└──────────┬────────────┘
1. Protobuf形式の実行計画を生成

┌───────────────────────┐
│ JNIインターフェース層   │
- FFIを通じて言語間呼び出し │
- シリアル化されたバイナリデータを転送 │
└──────────┬────────────┘
2. PhysicalPlanNodeに逆シリアル化

┌───────────────────────┐
TryInto::try_into()   │
- コアの変換エントリ     │  
└──────────┬────────────┘ 
3. ノードを再帰的に処理

┌───────────────────────┐
│ ExecutionPlan構築      │
- 30以上のノードタイプをmatchで処理 │
- 例:               │
ProjectionExec
FilterExec
HashJoinExec
└──────────┬────────────┘
4. 物理演算子ツリーを構築

┌───────────────────────┐
│ 実行エンジン           │
- execute()メソッドを呼び出す │
- 分散タスクスケジューリング  │
└───────────────────────┘
読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。