最近因為一些工作場景需要獲取 spark 任務的更多信息,所以要修改 spark 源碼添加新的 metric。順便串一下整個 metric 體系,形成整體認知。
以下討論基於 spark3.1.2,讓我們開始吧
ListenerBus 機制#
在動手實操之前,首先我們需要了解 spark 的 listener 機制
spark 中存在多個模塊的協同操作,通過考慮到實時性和結構的解耦,spark 引入了事件總線的機制,通過註冊和監聽事件的方式,可以 spark 各種事件的觸發點執行不同的操作。
在 spark 中可以指定某個操作的執行作為事件,並事先註冊在事件總線中。當該操作執行後發送事件,事件總線就會接受到該事件並異步地將該事件傳遞給所有監聽該事件的監聽器,並執行對應方法。這實際上也是典型的生產 - 消費者模式。我們需要人為的生產一條事件,然後便會有消費者去消費這個事件。最經典的用法就是在某個任務執行前後監聽該任務,並近實時獲取該任務的開始結束狀態,從而獲取該任務執行過程中的各種統計信息,也就是我們下面會講到的 metric。
在 spark 中如果想要自行添加事件監聽也非常簡單,不過對代碼有一些侵入性。這裡以 task 開始執行的為例。
首先我們需要註冊事件到 listenerBus,先創建一個 case class 繼承 SparkListenerEvent
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent
SparkListenerEvent 的具體實現如下:
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
/* Whether output this event to the event log */
protected[spark] def logEvent: Boolean = true
}
SparkListenerEvent 會被 SparkListener 接受,並回調指定的方法,具體看 SparkListener.scala 中的實現:
@DeveloperApi
abstract class SparkListener extends SparkListenerInterface {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
// Some code here ...
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
從上面的代碼中我們可以得知兩點,一是 spark 內置了一些 event 的回調方法,但是如果我們自定義 event,最終會走到 onOtherEvent 方法中,所以實際上只需要繼承 onOtherEvent 方法,並在其中進行模式匹配,就可以監聽我們自定義的事件,從而減少了對 spark core 代碼的侵入。二是我們可以繼承 SparkListener 來重載方法,從而加入我們自己的監聽邏輯。
至於 SparkListener 是如何實現監聽的,我們就需要來看 ListenerBus 的具體實現,在 ListenerBus.scala 中:
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
// Marked `private[spark]` for access in tests.
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
//Some code here ...
/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*/
def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listenersPlusTimers.iterator
while (iter.hasNext) {
val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
val maybeTimerContext = if (maybeTimer.isDefined) {
maybeTimer.get.time()
} else {
null
}
lazy val listenerName = Utils.getFormattedClassName(listener)
try {
doPostEvent(listener, event)
if (Thread.interrupted()) {
// We want to throw the InterruptedException right away so we can associate the interrupt
// with this listener, as opposed to waiting for a queue.take() etc. to detect it.
throw new InterruptedException()
}
} catch {
case ie: InterruptedException =>
logError(s"Interrupted while posting to ${listenerName}. Removing that listener.", ie)
removeListenerOnError(listener)
case NonFatal(e) if !isIgnorableException(e) =>
logError(s"Listener ${listenerName} threw an exception", e)
} finally {
if (maybeTimerContext != null) {
val elapsed = maybeTimerContext.stop()
if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
s"${elapsed / 1000000000d}s.")
}
}
}
}
}
}
上面的代碼可以簡單解釋一下,在 listenerBus 中實際用一個 list 保存所有的監聽器 Listener,並用一個定時器去定時調度,執行監聽方法。這裡只是基礎實現,實際上我們常用的 SparkListenerBus 具體由 AsyncEventQueue 實現,在實際發送事件時採用異步的方式,不會引起阻塞。
了解了 ListenerBus 的具體實現,我們就可以更自如的使用它。在我們具體的實現類中,只需要傳入 listener Bus 的實例,如下面 dagScheduler.scala 的例子:
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
// some code here ...
}
然後在需要監聽的行為後面發送事件:
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId =
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
這樣,在執行到任務開始的邏輯後,spark 就會自動監聽並發送事件,我們的 listener 接受到事件後,就會執行 onTaskStart 的方法。
Metric 體系#
spark 引用了一个第三方的metric 系統 其基本邏輯也就是從註冊一些指標,用 kv 存儲的方式統計這些指標,並實現對應的輸出
主要概念有三個
-
source metric 的來源,例如 JvmSource
-
sink metric 的輸出, spark 支持多種輸出方式,例如 CSVSink
-
MetricRegistry,具體存儲 metric 的類,由第三方庫實現
我們今天暫時不講 metric 如果具體實現,而是關注 SparkUI 如何展示這些 metric。以 ExecutorMetric 為例。
進入 ExecutorMetric.scala, 我們可以看到實際上 executorMetric 實際是一層封裝,具體的 metric 信息在 ExecutorMetricType.scala 中:
class ExecutorMetrics private[spark] extends Serializable {
// Metrics are indexed by ExecutorMetricType.metricToOffset
private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
/** Returns the value for the specified metric. */
def getMetricValue(metricName: String): Long = {
metrics(ExecutorMetricType.metricToOffset(metricName))
}
// Some code here ..
}
private[spark] object ExecutorMetricType {
// List of all executor metric getters
val metricGetters = IndexedSeq(
JVMHeapMemory,
JVMOffHeapMemory,
OnHeapExecutionMemory,
OffHeapExecutionMemory,
OnHeapStorageMemory,
OffHeapStorageMemory,
OnHeapUnifiedMemory,
OffHeapUnifiedMemory,
DirectPoolMemory,
MappedPoolMemory,
ProcessTreeMetrics,
GarbageCollectionMetrics
)
val (metricToOffset, numMetrics) = {
var numberOfMetrics = 0
val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
metricGetters.foreach { m =>
(0 until m.names.length).foreach { idx =>
definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics))
}
numberOfMetrics += m.names.length
}
(definedMetricsAndOffset, numberOfMetrics)
}
}
這裡我們就不看這些 metric 的實際統計方法了,只關注如何保存這些 metric。實際上在 ExecutorMetricType 的類中很多都是調用 JMX 實現的統計。
這些 metric 是如何展示的呢?我們首先來看看 api.scala,這裡保存了 executorMetric:
class ExecutorStageSummary private[spark](
val taskTime : Long,
val failedTasks : Int,
val succeededTasks : Int,
val killedTasks : Int,
val inputBytes : Long,
val inputRecords : Long,
val outputBytes : Long,
val outputRecords : Long,
val shuffleRead : Long,
val shuffleReadRecords : Long,
val shuffleWrite : Long,
val shuffleWriteRecords : Long,
val memoryBytesSpilled : Long,
val diskBytesSpilled : Long,
@deprecated("use isExcludedForStage instead", "3.1.0")
val isBlacklistedForStage: Boolean,
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics],
val isExcludedForStage: Boolean)
這裡隨便挑了一个類,實際上還有別的類裡保存了 executorMetric, 我們可以發現這裡的 metric 信息排列其實和 sparkUI 上的展示如出一轍,可以聯想到,在前端展示的數據格式就是這種。
再向上追溯,我們發現它被封裝成 LiveEntity:
private class LiveExecutorStageSummary(
stageId: Int,
attemptId: Int,
executorId: String) extends LiveEntity {
import LiveEntityHelpers._
var taskTime = 0L
var succeededTasks = 0
var failedTasks = 0
var killedTasks = 0
var isExcluded = false
var metrics = createMetrics(default = 0L)
val peakExecutorMetrics = new ExecutorMetrics()
override protected def doUpdate(): Any = {
val info = new v1.ExecutorStageSummary(
taskTime,
failedTasks,
succeededTasks,
killedTasks,
metrics.inputMetrics.bytesRead,
metrics.inputMetrics.recordsRead,
metrics.outputMetrics.bytesWritten,
metrics.outputMetrics.recordsWritten,
metrics.shuffleReadMetrics.remoteBytesRead + metrics.shuffleReadMetrics.localBytesRead,
metrics.shuffleReadMetrics.recordsRead,
metrics.shuffleWriteMetrics.bytesWritten,
metrics.shuffleWriteMetrics.recordsWritten,
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
isExcluded,
Some(peakExecutorMetrics).filter(_.isSet),
isExcluded)
new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
}
}
在這裡暴露它的更新方法 doUpdate ()。再向上追溯我們可以看到再 LiveEntity.scala 中提供了對象的保存方法:
// LiveEntity.scala
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
// Always check triggers on the first write, since adding an element to the store may
// cause the maximum count for the element type to be exceeded.
store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
lastWriteTime = now
}
這裡我們可以得到,metric 的實際信息會最終保存到一個 kv 存儲中供前端調用。而這個實體類是如何更新的呢?跳轉到該方法的引用位置,我們發現實際上該方法是被 appStatusListener 的監聽方法調用。基於上一部分的探索,由此我們可以得出結論,metric 的更新就是通過 SparkListener 的監聽機制更新。
private[spark] class AppStatusListener(
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
appStatusSource: Option[AppStatusSource] = None,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
// Some code here...
}
在 sparkUI 上添加自定義的 executor metric#
有了上面的知識儲備,我們總算能在 spark UI 中自由加入自己的 metric。由於前端數據的交互都是 js 實現,我們只需要定位到對應的 js 文件就能找到數據的展示位置。
我們以 executorMetric 為例,首先找到 executor 對應的 page,在 executorspage.js 中:
我們找到如下 ajax 函數:
$.get(createTemplateURI(appId, "executorspage"), function (template) { ...});
實際的數據展示就是以下面的方式展示:
{
data: function (row, type) {
var peakMemoryMetrics = row.peakMemoryMetrics;
if (typeof peakMemoryMetrics !== 'undefined') {
if (type !== 'display')
return peakMemoryMetrics.DirectPoolMemory;
else
return (formatBytes(peakMemoryMetrics.DirectPoolMemory, type) + ' / ' +
formatBytes(peakMemoryMetrics.MappedPoolMemory, type));
} else {
if (type !== 'display') {
return 0;
} else {
return '0.0 B / 0.0 B';
}
}
}
}
在前端展示則是在以下 html 元素:
<!-- in executorspage-template.html-->
<th>
<span data-toggle="tooltip" data-placement="top"
title="Peak direct byte buffer / memory-mapped buffer memory used by JVM. This refers to BufferPoolMXBean with form 'java.nio:type=BufferPool,name=direct' and 'java.nio:type=BufferPool,name=mapped'.">
Peak Pool Memory Direct / Mapped
</span>
</th>
可以看到這裡直接 從後端數據中獲取 peakMemoryMetrics 這個對象,那麼數據返回格式什麼呢?我們查詢前端請求函數:
// executorspage.js
var endPoint = createRESTEndPointForExecutorsPage(appId);
// utils.js
function createRESTEndPointForExecutorsPage(appId) {
var words = getBaseURI().split('/');
var ind = words.indexOf("proxy");
var newBaseURI;
if (ind > 0) {
appId = words[ind + 1];
newBaseURI = words.slice(0, ind + 2).join('/');
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
}
ind = words.indexOf("history");
if (ind > 0) {
appId = words[ind + 1];
var attemptId = words[ind + 2];
newBaseURI = words.slice(0, ind).join('/');
if (isNaN(attemptId)) {
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
} else {
return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/allexecutors";
}
}
return uiRoot + "/api/v1/applications/" + appId + "/allexecutors";
}
發現請求的是 “/allexecutors”,於是我們查找後端對應處理邏輯:
// OneApplicationResource.scala
@GET
@Path("allexecutors")
def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
發現返回的格式就是我們之前查看到的 ExecutorSummary 類。至此,我們已經完成掌握了 metric 的前端展示邏輯。
那麼我們如何添加自定義的metric信息呢?假設我們需要在executor頁面上新增一個metric。
首先第一步:在 api.scala 中的 executorSummary 類添加我們自己的 metric 對象
第二步:在 appStausListener.scala 中 在我們想要更新 metric 的事件的監聽邏輯加入我們自己的 metric 更新邏輯
第三步:在 executorspage.js 中的 data 展示數組中添加一個對象,加入我們自己的數據展示邏輯
第四步:不要忘記在 executorspage-template.html 中 heml 元素來展示我們的數據。注意數據展示的順序是數據的順序,一定要和 js 中數組的數據保存順序一致。
由此,我們就實現了在 SparkUI 中添加我們的自定義 metric 信息了。你學廢了嗎?