banner
bladedragon

bladedragon

Spark文件讀取分區參數設置方法

簡單記錄一個 spark 的分區參數如何配置的問題。

最近發現一個案例,線上 EMR 的 spark 在相同 sql 和輸入情況比另一個集群的 spark on yarn 要快很多,在排除其他額外因素後,發現 EMR 的任務在開始讀取文件的階段任務數遠小於另一個。在讀取階段一般不需要過多 task,劃分過多的 task 反倒會浪費大量時間在網絡 IO 上。因此,需要合理配置分區參數.
影響分區設置的參數有很多,我們常見會配置的分區參數諸如 spark.sql.files.maxPartitionBytesspark.default.parallelism,但事實上還不止。通過查看源碼我們才可以清晰知道各種參數的存在意義和設置方法:
源碼位置在org.apache.spark.sql.execution.datasources.FilePartition#maxSplitBytes

// 注意該方法只適用ocr等文件讀取的情況
def maxSplitBytes(
      sparkSession: SparkSession,
      selectedPartitions: Seq[PartitionDirectory]): Long = {
    val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
    val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
    val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
      .getOrElse(sparkSession.leafNodeDefaultParallelism)
    val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
    val bytesPerCore = totalBytes / minPartitionNum

    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  }

這裡涉及到四個參數

filesMaxPartitionBytes#

對應參數 spark.sql.files.maxPartitionBytes

The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

該參數指定單分區裝入的最大位元組數

filesOpenCostInBytes#

對應參數spark.sql.files.openCostInBytes

The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It's better to over estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first). This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

該參數指定打開文件時預估的位元組數,設置合適的大小可以提高打開文件的速度

filesMinPartitionNum#

對應參數spark.sql.files.minPartitionNum

The suggested (not guaranteed) minimum number of split file partitions. If not set, the default value is spark.default.parallelism. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.

該參數指定文件最小的並行度,用來獲取每個分區可以讀取的最大位元組數

leafNodeDefaultParallelism#

對應參數 spark.sql.leafNodeDefaultParallelism

The default parallelism of Spark SQL leaf nodes that produce data, such as the file scan node, the local data scan node, the range node, etc. The default value of this config is 'SparkContext#defaultParallelism'.

對於大部分情況,這個參數其實就等於 spark.default.parallelism

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。