最近、いくつかの作業シーンのために Spark タスクの詳細情報を取得する必要があったため、Spark のソースコードを修正して新しいメトリックを追加することにしました。ついでに、全体のメトリックシステムをつなげて、全体的な認識を形成します。
以下の議論は Spark 3.1.2 に基づいています。それでは始めましょう。
ListenerBus メカニズム#
実際に作業を始める前に、まず Spark のリスナーメカニズムを理解する必要があります。Spark には複数のモジュールが協調して動作しており、リアルタイム性と構造のデカップリングを考慮することで、Spark はイベントバスのメカニズムを導入しました。イベントを登録し、リスニングすることで、Spark のさまざまなイベントのトリガーポイントで異なる操作を実行できます。
Spark では、特定の操作の実行をイベントとして指定し、事前にイベントバスに登録できます。その操作が実行された後にイベントを送信すると、イベントバスはそのイベントを受信し、非同期的にそのイベントをすべてのリスナーに渡し、対応するメソッドを実行します。これは実際には典型的なプロデューサー - コンシューマーモデルでもあります。私たちは人工的にイベントを生成し、その後に消費者がそのイベントを消費します。最も一般的な使用法は、特定のタスクの実行前後にそのタスクをリスニングし、ほぼリアルタイムでそのタスクの開始と終了の状態を取得することで、タスク実行中のさまざまな統計情報、つまり以下で説明するメトリックを取得することです。
Spark でイベントリスナーを自分で追加するのも非常に簡単ですが、コードに少し侵入性があります。ここでは、タスクの開始実行を例にします。まず、リスナーバスにイベントを登録する必要があります。まず、SparkListenerEvent を継承する case class を作成します。
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent
SparkListenerEvent の具体的な実装は以下の通りです:
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
/* このイベントをイベントログに出力するかどうか */
protected[spark] def logEvent: Boolean = true
}
SparkListenerEvent は SparkListener によって受け入れられ、指定されたメソッドがコールバックされます。具体的には SparkListener.scala の実装を見てください:
@DeveloperApi
abstract class SparkListener extends SparkListenerInterface {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
// ここにいくつかのコード ...
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
上記のコードから、2 つの点がわかります。1 つは、Spark にはいくつかのイベントのコールバックメソッドが組み込まれていますが、カスタムイベントを定義すると最終的には onOtherEvent メソッドに到達するため、実際には onOtherEvent メソッドを継承し、その中でパターンマッチングを行うことで、カスタムイベントをリスニングでき、Spark コアコードへの侵入を減らすことができます。2 つ目は、SparkListener を継承してメソッドをオーバーロードし、自分のリスニングロジックを追加できることです。
SparkListener がどのようにリスニングを実現しているかを理解するためには、ListenerBus の具体的な実装を見てみる必要があります。ListenerBus.scala の中で:
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
// テストでのアクセスのために `private[spark]` とマークされています。
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
// ここにいくつかのコード ...
/**
* 登録されたすべてのリスナーにイベントを投稿します。`postToAll`の呼び出し元は、すべてのイベントに対して
* `postToAll`を同じスレッドで呼び出すことを保証する必要があります。
*/
def postToAll(event: E): Unit = {
// JavaConvertersを使用すると、asScalaを使用してJIterableWrapperを作成できます。
// ただし、このメソッドは頻繁に呼び出されます。ラッパーコストを避けるために、ここでは
// Java Iteratorを直接使用します。
val iter = listenersPlusTimers.iterator
while (iter.hasNext) {
val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
val maybeTimerContext = if (maybeTimer.isDefined) {
maybeTimer.get.time()
} else {
null
}
lazy val listenerName = Utils.getFormattedClassName(listener)
try {
doPostEvent(listener, event)
if (Thread.interrupted()) {
// 中断された場合、すぐにInterruptedExceptionをスローしたいので、このリスナーに関連付けることができます。
// queue.take()などを待つのではなく、これを検出します。
throw new InterruptedException()
}
} catch {
case ie: InterruptedException =>
logError(s"${listenerName}への投稿中に中断されました。そのリスナーを削除します。", ie)
removeListenerOnError(listener)
case NonFatal(e) if !isIgnorableException(e) =>
logError(s"リスナー${listenerName}が例外をスローしました", e)
} finally {
if (maybeTimerContext != null) {
val elapsed = maybeTimerContext.stop()
if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
logInfo(s"リスナー${listenerName}によるイベント${redactEvent(event)}の処理に${elapsed / 1000000000d}sかかりました。")
}
}
}
}
}
}
上記のコードを簡単に説明すると、listenerBus では実際にすべてのリスナー Listener をリストで保持し、タイマーを使って定期的にスケジュールし、リスニングメソッドを実行します。これは基本的な実装に過ぎず、実際に使用される SparkListenerBus は AsyncEventQueue によって具体的に実装されており、実際のイベント送信時には非同期の方法を採用し、ブロッキングを引き起こしません。
ListenerBus の具体的な実装を理解したので、より自由に使用できるようになります。具体的な実装クラスでは、listener Bus のインスタンスを渡すだけで、以下の dagScheduler.scala の例のようになります:
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
// ここにいくつかのコード ...
}
次に、リスニングが必要な動作の後にイベントを送信します:
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
// このタスクがステージがキャンセルされた後に起動される可能性があることに注意してください。
// その場合、stageIdToStageにはもはやステージがありません。
val stageAttemptId =
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
このように、タスク開始のロジックに到達すると、Spark は自動的にリスニングしてイベントを送信します。私たちのリスナーはイベントを受け取ると、onTaskStart メソッドを実行します。
メトリックシステム#
Spark は、第三者のメトリックシステムを利用しており、その基本的なロジックは、いくつかの指標を登録し、KV ストレージの方法でこれらの指標を集計し、対応する出力を実現することです。主な概念は 3 つあります。
- source メトリックのソース、例えば JvmSource
- sink メトリックの出力、Spark はさまざまな出力方法をサポートしています、例えば CSVSink
- MetricRegistry、具体的にメトリックを保存するクラスで、第三者ライブラリによって実装されています。
今日はメトリックの具体的な実装については触れず、SparkUI がこれらのメトリックをどのように表示するかに焦点を当てます。ExecutorMetric を例にとります。ExecutorMetric.scala に入ると、実際には executorMetric は一層のラッピングであり、具体的なメトリック情報は ExecutorMetricType.scala にあります:
class ExecutorMetrics private[spark] extends Serializable {
// メトリックはExecutorMetricType.metricToOffsetによってインデックス付けされています
private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
/** 指定されたメトリックの値を返します。 */
def getMetricValue(metricName: String): Long = {
metrics(ExecutorMetricType.metricToOffset(metricName))
}
// ここにいくつかのコード ..
}
private[spark] object ExecutorMetricType {
// すべてのexecutorメトリックゲッターのリスト
val metricGetters = IndexedSeq(
JVMHeapMemory,
JVMOffHeapMemory,
OnHeapExecutionMemory,
OffHeapExecutionMemory,
OnHeapStorageMemory,
OffHeapStorageMemory,
OnHeapUnifiedMemory,
OffHeapUnifiedMemory,
DirectPoolMemory,
MappedPoolMemory,
ProcessTreeMetrics,
GarbageCollectionMetrics
)
val (metricToOffset, numMetrics) = {
var numberOfMetrics = 0
val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
metricGetters.foreach { m =>
(0 until m.names.length).foreach { idx =>
definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics))
}
numberOfMetrics += m.names.length
}
(definedMetricsAndOffset, numberOfMetrics)
}
}
ここでは、これらのメトリックの実際の集計方法については見ませんが、メトリックをどのように保存するかに注目します。実際には ExecutorMetricType のクラス内で多くのものが JMX を使用して集計されています。
これらのメトリックはどのように表示されるのでしょうか?まず api.scala を見てみると、ここに executorMetric が保存されています:
class ExecutorStageSummary private[spark](
val taskTime : Long,
val failedTasks : Int,
val succeededTasks : Int,
val killedTasks : Int,
val inputBytes : Long,
val inputRecords : Long,
val outputBytes : Long,
val outputRecords : Long,
val shuffleRead : Long,
val shuffleReadRecords : Long,
val shuffleWrite : Long,
val shuffleWriteRecords : Long,
val memoryBytesSpilled : Long,
val diskBytesSpilled : Long,
@deprecated("use isExcludedForStage instead", "3.1.0")
val isBlacklistedForStage: Boolean,
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics],
val isExcludedForStage: Boolean)
ここではランダムに 1 つのクラスを選びましたが、実際には他のクラスにも executorMetric が保存されています。ここでのメトリック情報の配置は、実際に SparkUI 上での表示と一致していることがわかります。前面で表示されるデータ形式はこのようなものだと想像できます。
さらに上に遡ると、これが LiveEntity としてラッピングされていることがわかります:
private class LiveExecutorStageSummary(
stageId: Int,
attemptId: Int,
executorId: String) extends LiveEntity {
import LiveEntityHelpers._
var taskTime = 0L
var succeededTasks = 0
var failedTasks = 0
var killedTasks = 0
var isExcluded = false
var metrics = createMetrics(default = 0L)
val peakExecutorMetrics = new ExecutorMetrics()
override protected def doUpdate(): Any = {
val info = new v1.ExecutorStageSummary(
taskTime,
failedTasks,
succeededTasks,
killedTasks,
metrics.inputMetrics.bytesRead,
metrics.inputMetrics.recordsRead,
metrics.outputMetrics.bytesWritten,
metrics.outputMetrics.recordsWritten,
metrics.shuffleReadMetrics.remoteBytesRead + metrics.shuffleReadMetrics.localBytesRead,
metrics.shuffleReadMetrics.recordsRead,
metrics.shuffleWriteMetrics.bytesWritten,
metrics.shuffleWriteMetrics.recordsWritten,
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
isExcluded,
Some(peakExecutorMetrics).filter(_.isSet),
isExcluded)
new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
}
}
ここで、更新メソッド doUpdate () が公開されています。さらに上に遡ると、LiveEntity.scala でオブジェクトの保存方法が提供されています:
// LiveEntity.scala
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
// 最初の書き込み時には常にトリガーを確認します。ストアに要素を追加すると、要素タイプの最大数を超える可能性があります。
store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
lastWriteTime = now
}
ここで、メトリックの実際の情報は最終的に KV ストレージに保存され、フロントエンドから呼び出されることがわかります。このエンティティクラスはどのように更新されるのでしょうか?そのメソッドの参照位置にジャンプすると、実際にはそのメソッドが appStatusListener のリスニングメソッドによって呼び出されていることがわかります。前の部分の探索に基づいて、メトリックの更新は SparkListener のリスニングメカニズムによって行われると結論できます。
private[spark] class AppStatusListener(
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
appStatusSource: Option[AppStatusSource] = None,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
// ここにいくつかのコード...
}
SparkUI にカスタムの executor メトリックを追加する#
上記の知識をもとに、Spark UI に自由に自分のメトリックを追加できるようになりました。フロントエンドのデータのやり取りはすべて JS で実現されているため、対応する JS ファイルを特定すれば、データの表示位置を見つけることができます。executorMetric を例にとると、まず executor に対応するページを見つけ、executorspage.js の中で:
以下のような ajax 関数を見つけます:
$.get(createTemplateURI(appId, "executorspage"), function (template) { ...});
実際のデータ表示は以下の方法で行われます:
{
data: function (row, type) {
var peakMemoryMetrics = row.peakMemoryMetrics;
if (typeof peakMemoryMetrics !== 'undefined') {
if (type !== 'display')
return peakMemoryMetrics.DirectPoolMemory;
else
return (formatBytes(peakMemoryMetrics.DirectPoolMemory, type) + ' / ' +
formatBytes(peakMemoryMetrics.MappedPoolMemory, type));
} else {
if (type !== 'display') {
return 0;
} else {
return '0.0 B / 0.0 B';
}
}
}
}
フロントエンドの表示は以下の HTML 要素で行われます:
<!-- in executorspage-template.html-->
<th>
<span data-toggle="tooltip" data-placement="top"
title="JVMによって使用されるピーク直接バイトバッファ/メモリマップドバッファプールメモリ。これは、'java.nio:type=BufferPool,name=direct'および'java.nio:type=BufferPool,name=mapped'の形式のBufferPoolMXBeanを指します。">
ピークプールメモリ直接/マップ
</span>
</th>
ここでは、バックエンドデータから peakMemoryMetrics オブジェクトを直接取得しています。では、データの返却形式はどうなっているのでしょうか?フロントエンドのリクエスト関数を調べると:
// executorspage.js
var endPoint = createRESTEndPointForExecutorsPage(appId);
関数 createRESTEndPointForExecutorsPage (appId) を見てみると、以下のようになります:
function createRESTEndPointForExecutorsPage(appId) {
var words = getBaseURI().split('/');
var ind = words.indexOf("proxy");
var newBaseURI;
if (ind > 0) {
appId = words[ind + 1];
newBaseURI = words.slice(0, ind + 2).join('/');
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
}
ind = words.indexOf("history");
if (ind > 0) {
appId = words[ind + 1];
var attemptId = words[ind + 2];
newBaseURI = words.slice(0, ind).join('/');
if (isNaN(attemptId)) {
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
} else {
return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/allexecutors";
}
}
return uiRoot + "/api/v1/applications/" + appId + "/allexecutors";
}
リクエストは「/allexecutors」を取得していることがわかります。したがって、バックエンドの対応する処理ロジックを探します:
// OneApplicationResource.scala
@GET
@Path("allexecutors")
def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
返される形式は、以前に確認した ExecutorSummary クラスであることがわかります。これで、メトリックのフロントエンド表示ロジックを完全に理解しました。
では、カスタムのメトリック情報をどのように追加するのでしょうか?executorページに新しいメトリックを追加する必要があると仮定します。
まず第一歩:api.scala の executorSummary クラスに自分のメトリックオブジェクトを追加します。
第二歩:appStatusListener.scala で、メトリックを更新したいイベントのリスニングロジックに自分のメトリック更新ロジックを追加します。
第三歩:executorspage.js のデータ表示配列にオブジェクトを追加し、自分のデータ表示ロジックを追加します。
第四歩:executorspage-template.html で、データを表示する HTML 要素を忘れずに追加します。データ表示の順序はデータの順序であり、必ず JS の配列のデータ保存順序と一致させる必要があります。
これにより、SparkUI に自分のカスタムメトリック情報を追加することができました。あなたはこれを学びましたか?