简单记录一个 spark 的分区参数如何配置的问题。
最近发现一个案例,线上 EMR 的 spark 在相同 sql 和输入情况比另一个集群的 spark on yarn 要快很多,在排除其他额外因素后,发现 EMR 的任务在开始读取文件的阶段任务数远小于另一个。在读取阶段一般不需要过多 task,划分过多的 task 反倒会浪费大量时间在网络 IO 上。因此,需要合理配置分区参数.
影响分区设置的参数有很多,我们常见会配置的分区参数诸如 spark.sql.files.maxPartitionBytes
和 spark.default.parallelism
,但事实上还不止。通过查看源码我们才可以清晰知道各种参数的存在意义和设置方法:
源码位置在org.apache.spark.sql.execution.datasources.FilePartition#maxSplitBytes
// 注意该方法只适用ocr等文件读取的情况
def maxSplitBytes(
sparkSession: SparkSession,
selectedPartitions: Seq[PartitionDirectory]): Long = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
.getOrElse(sparkSession.leafNodeDefaultParallelism)
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / minPartitionNum
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
这里涉及到四个参数
filesMaxPartitionBytes
#
对应参数 spark.sql.files.maxPartitionBytes
The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
该参数指定单分区装入的最大字节数
filesOpenCostInBytes
#
对应参数spark.sql.files.openCostInBytes
The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It's better to over estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first). This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
该参数指定打开文件时预估的字节数,设置合适的大小可以提高打开文件的速度
filesMinPartitionNum
#
对应参数spark.sql.files.minPartitionNum
The suggested (not guaranteed) minimum number of split file partitions. If not set, the default value is
spark.default.parallelism
. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
该参数指定文件最小的并行度,用来获取每个分区可以读取的最大字节数
leafNodeDefaultParallelism
#
对应参数 spark.sql.leafNodeDefaultParallelism
The default parallelism of Spark SQL leaf nodes that produce data, such as the file scan node, the local data scan node, the range node, etc. The default value of this config is 'SparkContext#defaultParallelism'.
对于大部分情况,这个参数其实就等于 spark.default.parallelism