banner
bladedragon

bladedragon

sparkファイルの読み取りパーティションパラメータの設定方法

Spark のパーティションパラメータの設定方法について、簡単に記録します。

最近、あるケースを発見しました。オンラインの EMR の Spark は、同じ SQL と入力条件の場合、別のクラスターの Spark on YARN よりもはるかに高速です。他の追加要因を除外した後、EMR のタスクはファイルの読み取りを開始する段階で、別のタスクよりもはるかに少ないことがわかりました。読み取り段階では通常、多くのタスクは必要ありません。逆に、過剰なタスクの分割は、大量の時間をネットワーク IO に浪費することになります。したがって、パーティションパラメータを適切に設定する必要があります。
パーティション設定に影響を与えるパラメータは多くありますが、私たちがよく設定するパーティションパラメータには、 spark.sql.files.maxPartitionBytesspark.default.parallelism などがありますが、実際にはそれだけではありません。ソースコードを確認することで、各種パラメータの存在意義と設定方法が明確になります。
ソースコードの場所は org.apache.spark.sql.execution.datasources.FilePartition#maxSplitBytes です。

// 注意该方法只适用ocr等文件读取的情况
def maxSplitBytes(
      sparkSession: SparkSession,
      selectedPartitions: Seq[PartitionDirectory]): Long = {
    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
    val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
      .getOrElse(sparkSession.leafNodeDefaultParallelism)
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
    val bytesPerCore = totalBytes / minPartitionNum

    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  }

ここでは、4 つのパラメータが関係しています。

filesMaxPartitionBytes#

対応するパラメータ spark.sql.files.maxPartitionBytes

ファイルを読み込む際に、1 つのパーティションに詰め込む最大バイト数です。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。

このパラメータは、1 つのパーティションに格納する最大バイト数を指定します。

filesOpenCostInBytes#

対応するパラメータ spark.sql.files.openCostInBytes

ファイルを開くための推定コストで、同じ時間にスキャンできるバイト数で測定されます。これは、複数のファイルを 1 つのパーティションに配置する場合に使用されます。過大評価する方が良いです。そのため、小さいファイルを持つパーティションの方が、大きいファイルを持つパーティションよりも速くなります(最初にスケジュールされる)。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。

このパラメータは、ファイルを開く際の推定バイト数を指定します。適切なサイズを設定することで、ファイルのオープン速度を向上させることができます。

filesMinPartitionNum#

対応するパラメータ spark.sql.files.minPartitionNum

推奨される(保証されていない)最小の分割ファイルパーティションの数です。設定されていない場合、デフォルト値は spark.default.parallelism です。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。

このパラメータは、ファイルの最小並列度を指定し、各パーティションが読み込むことができる最大バイト数を取得するために使用されます。

leafNodeDefaultParallelism#

対応するパラメータ spark.sql.leafNodeDefaultParallelism

データを生成する Spark SQL のリーフノード(ファイルスキャンノード、ローカルデータスキャンノード、範囲ノードなど)のデフォルトの並列度です。この設定のデフォルト値は 'SparkContext#defaultParallelism' です。

ほとんどの場合、このパラメータは実際には spark.default.parallelism と同じです。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。