Spark のパーティションパラメータの設定方法について、簡単に記録します。
最近、あるケースを発見しました。オンラインの EMR の Spark は、同じ SQL と入力条件の場合、別のクラスターの Spark on YARN よりもはるかに高速です。他の追加要因を除外した後、EMR のタスクはファイルの読み取りを開始する段階で、別のタスクよりもはるかに少ないことがわかりました。読み取り段階では通常、多くのタスクは必要ありません。逆に、過剰なタスクの分割は、大量の時間をネットワーク IO に浪費することになります。したがって、パーティションパラメータを適切に設定する必要があります。
パーティション設定に影響を与えるパラメータは多くありますが、私たちがよく設定するパーティションパラメータには、 spark.sql.files.maxPartitionBytes
と spark.default.parallelism
などがありますが、実際にはそれだけではありません。ソースコードを確認することで、各種パラメータの存在意義と設定方法が明確になります。
ソースコードの場所は org.apache.spark.sql.execution.datasources.FilePartition#maxSplitBytes
です。
// 注意该方法只适用ocr等文件读取的情况
def maxSplitBytes(
sparkSession: SparkSession,
selectedPartitions: Seq[PartitionDirectory]): Long = {
val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
.getOrElse(sparkSession.leafNodeDefaultParallelism)
val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
val bytesPerCore = totalBytes / minPartitionNum
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
}
ここでは、4 つのパラメータが関係しています。
filesMaxPartitionBytes
#
対応するパラメータ spark.sql.files.maxPartitionBytes
ファイルを読み込む際に、1 つのパーティションに詰め込む最大バイト数です。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。
このパラメータは、1 つのパーティションに格納する最大バイト数を指定します。
filesOpenCostInBytes
#
対応するパラメータ spark.sql.files.openCostInBytes
ファイルを開くための推定コストで、同じ時間にスキャンできるバイト数で測定されます。これは、複数のファイルを 1 つのパーティションに配置する場合に使用されます。過大評価する方が良いです。そのため、小さいファイルを持つパーティションの方が、大きいファイルを持つパーティションよりも速くなります(最初にスケジュールされる)。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。
このパラメータは、ファイルを開く際の推定バイト数を指定します。適切なサイズを設定することで、ファイルのオープン速度を向上させることができます。
filesMinPartitionNum
#
対応するパラメータ spark.sql.files.minPartitionNum
推奨される(保証されていない)最小の分割ファイルパーティションの数です。設定されていない場合、デフォルト値は
spark.default.parallelism
です。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。
このパラメータは、ファイルの最小並列度を指定し、各パーティションが読み込むことができる最大バイト数を取得するために使用されます。
leafNodeDefaultParallelism
#
対応するパラメータ spark.sql.leafNodeDefaultParallelism
データを生成する Spark SQL のリーフノード(ファイルスキャンノード、ローカルデータスキャンノード、範囲ノードなど)のデフォルトの並列度です。この設定のデフォルト値は 'SparkContext#defaultParallelism' です。
ほとんどの場合、このパラメータは実際には spark.default.parallelism
と同じです。