最近因为一些工作场景需要获取 spark 任务的更多信息,所以要修改 spark 源码添加新的 metric。顺便串一下整个 metric 体系,形成整体认知。
以下讨论基于 spark3.1.2,让我们开始吧
ListenerBus 机制#
在动手实操之前,首先我们需要了解 spark 的 listener 机制
spark 中存在多个模块的协同操作,通过考虑到实时性和结构的解耦,spark 引入了事件总线的机制,通过注册和监听事件的方式,可以 spark 各种事件的触发点执行不同的操作。
在 spark 中可以指定某个操作的执行作为事件,并事先注册在事件总线中。当该操作执行后发送事件,事件总线就会接受到该事件并异步地将该事件传递给所有监听该事件的监听器,并执行对应方法。这实际上也是典型的生产 - 消费者模式。我们需要人为的生产一条事件,然后便会有消费者去消费这个事件。最经典的用法就是在某个任务执行前后监听该任务,并近实时获取该任务的开始结束状态,从而获取该任务执行过程中的各种统计信息,也就是我们下面会讲到的 metric。
在 spark 中如果想要自行添加事件监听也非常简单,不过对代码有一些侵入性。这里以 task 开始执行的为例。
首先我们需要注册事件到 listenerBus,先创建一个 case class 继承 SparkListenerEvent
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent
SparkListenerEvent 的具体实现如下:
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
/* Whether output this event to the event log */
protected[spark] def logEvent: Boolean = true
}
SparkListenerEvent 会被 SparkListener 接受,并回调指定的方法,具体看 SparkListener.scala 中的实现:
@DeveloperApi
abstract class SparkListener extends SparkListenerInterface {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
// Some code here ...
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
从上面的代码中我们可以得知两点,一是 spark 内置了一些 event 的回调方法,但是如果我们自定义 event,最终会走到 onOtherEvent 方法中,所以实际上只需要继承 onOtherEvent 方法,并在其中进行模式匹配,就可以监听我们自定义的事件,从而减少了对 spark core 代码的侵入。 二是我们可以继承 SparkListener 来重载方法,从而加入我们自己的监听逻辑.
至于 SparkListener 是如何实现监听的,我们就需要来看 ListenerBus 的具体实现,在 ListenerBus.scala 中:
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
// Marked `private[spark]` for access in tests.
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
//Some code here ...
/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*/
def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listenersPlusTimers.iterator
while (iter.hasNext) {
val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
val maybeTimerContext = if (maybeTimer.isDefined) {
maybeTimer.get.time()
} else {
null
}
lazy val listenerName = Utils.getFormattedClassName(listener)
try {
doPostEvent(listener, event)
if (Thread.interrupted()) {
// We want to throw the InterruptedException right away so we can associate the interrupt
// with this listener, as opposed to waiting for a queue.take() etc. to detect it.
throw new InterruptedException()
}
} catch {
case ie: InterruptedException =>
logError(s"Interrupted while posting to ${listenerName}. Removing that listener.", ie)
removeListenerOnError(listener)
case NonFatal(e) if !isIgnorableException(e) =>
logError(s"Listener ${listenerName} threw an exception", e)
} finally {
if (maybeTimerContext != null) {
val elapsed = maybeTimerContext.stop()
if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
s"${elapsed / 1000000000d}s.")
}
}
}
}
}
}
上面的代码可以简单解释一下,在 listenerBus 中实际用一个 list 保存所有的监听器 Listener,并用一个定时器去定时调度,执行监听方法。这里只是基础实现,实际上我们常用的 SparkListenerBus 具体由 AsyncEventQueue 实现,在实际发送事件时采用异步的方式,不会引起阻塞。
了解了 ListenerBus 的具体实现,我们就可以更自如的使用它。在我们具体的实现类中,只需要传入 listener Bus 的实例,如下面 dagScheduler.scala 的例子:
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
// some code here ...
}
然后在需要监听的行为后面发送事件:
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId =
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
这样,在执行到任务开始的逻辑后,spark 就会自动监听并发送事件,我们的 listener 接受到事件后,就会执行 onTaskStart 的方法。
Metric 体系#
spark 引用了一个第三方的metric 系统 其基本逻辑也就是从注册一些指标,用 kv 存储的方式统计这些指标,并实现对应的输出
主要概念有三个
-
source metric 的来源,例如 JvmSource
-
sink metric 的输出, spark 支持多种输出方式,例如 CSVSink
-
MetricRegistry,具体存储 metric 的类,由第三方库实现
我们今天暂时不讲 metric 如果具体实现,而是关注 SparkUI 如何展示这些 metric。以 ExecutorMetric 为例。
进入 ExecutorMetric.scala, 我们可以看到实际上 executorMetric 实际是一层封装,具体的 metric 信息在 ExecutorMetricType.scala 中:
class ExecutorMetrics private[spark] extends Serializable {
// Metrics are indexed by ExecutorMetricType.metricToOffset
private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
/** Returns the value for the specified metric. */
def getMetricValue(metricName: String): Long = {
metrics(ExecutorMetricType.metricToOffset(metricName))
}
// Some code here ..
}
private[spark] object ExecutorMetricType {
// List of all executor metric getters
val metricGetters = IndexedSeq(
JVMHeapMemory,
JVMOffHeapMemory,
OnHeapExecutionMemory,
OffHeapExecutionMemory,
OnHeapStorageMemory,
OffHeapStorageMemory,
OnHeapUnifiedMemory,
OffHeapUnifiedMemory,
DirectPoolMemory,
MappedPoolMemory,
ProcessTreeMetrics,
GarbageCollectionMetrics
)
val (metricToOffset, numMetrics) = {
var numberOfMetrics = 0
val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
metricGetters.foreach { m =>
(0 until m.names.length).foreach { idx =>
definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics))
}
numberOfMetrics += m.names.length
}
(definedMetricsAndOffset, numberOfMetrics)
}
}
这里我们就不看这些 metric 的实际统计方法了,只关注如何保存这些 metric。实际上在 ExecutorMetricType 的类中很多都是调用 JMX 实现的统计。
这些 metric 是如何展示的呢?我们首先来看看 api.scala,这里保存了 executorMetric:
class ExecutorStageSummary private[spark](
val taskTime : Long,
val failedTasks : Int,
val succeededTasks : Int,
val killedTasks : Int,
val inputBytes : Long,
val inputRecords : Long,
val outputBytes : Long,
val outputRecords : Long,
val shuffleRead : Long,
val shuffleReadRecords : Long,
val shuffleWrite : Long,
val shuffleWriteRecords : Long,
val memoryBytesSpilled : Long,
val diskBytesSpilled : Long,
@deprecated("use isExcludedForStage instead", "3.1.0")
val isBlacklistedForStage: Boolean,
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics],
val isExcludedForStage: Boolean)
这里随便挑了一个类,实际上还有别的类里保存了 executorMetric, 我们可以发现这里的 metric 信息排列其实和 sparkUI 上的展示如出一辙,可以联想到,在前端展示的数据格式就是这种。
再向上追溯,我们发现它被封装成 LiveEntity:
private class LiveExecutorStageSummary(
stageId: Int,
attemptId: Int,
executorId: String) extends LiveEntity {
import LiveEntityHelpers._
var taskTime = 0L
var succeededTasks = 0
var failedTasks = 0
var killedTasks = 0
var isExcluded = false
var metrics = createMetrics(default = 0L)
val peakExecutorMetrics = new ExecutorMetrics()
override protected def doUpdate(): Any = {
val info = new v1.ExecutorStageSummary(
taskTime,
failedTasks,
succeededTasks,
killedTasks,
metrics.inputMetrics.bytesRead,
metrics.inputMetrics.recordsRead,
metrics.outputMetrics.bytesWritten,
metrics.outputMetrics.recordsWritten,
metrics.shuffleReadMetrics.remoteBytesRead + metrics.shuffleReadMetrics.localBytesRead,
metrics.shuffleReadMetrics.recordsRead,
metrics.shuffleWriteMetrics.bytesWritten,
metrics.shuffleWriteMetrics.recordsWritten,
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
isExcluded,
Some(peakExecutorMetrics).filter(_.isSet),
isExcluded)
new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
}
}
在这里暴露它的更新方法 doUpdate ()。 再向上追溯我们可以看到再 LiveEntity.scala 中提供了对象的保存方法:
// LiveEntity.scala
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
// Always check triggers on the first write, since adding an element to the store may
// cause the maximum count for the element type to be exceeded.
store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
lastWriteTime = now
}
这里我们可以得到,metric 的实际信息会最终保存到一个 kv 存储中供前端调用。而这个实体类是如何更新的呢?跳转到该方法的引用位置,我们发现实际上该方法是被 appStatusListener 的监听方法调用。基于上一部分的探索,由此我们可以得出结论,metric 的更新就是通过 SparkListener 的监听机制更新。
private[spark] class AppStatusListener(
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
appStatusSource: Option[AppStatusSource] = None,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
// Some code here...
}
在 sparkUI 上添加自定义的 executor metric#
有了上面的知识储备,我们总算能在 spark UI 中自由加入自己的 metric。由于前端数据的交互都是 js 实现,我们只需要定位到对应的 js 文件就能找到数据的展示位置。
我们以 executorMetric 为例,首先找到 executor 对应的 page,在 executorspage.js 中:
我们找到如下 ajax 函数:
$.get(createTemplateURI(appId, "executorspage"), function (template) { ...});
实际的数据展示就是以下面的方式展示:
{
data: function (row, type) {
var peakMemoryMetrics = row.peakMemoryMetrics;
if (typeof peakMemoryMetrics !== 'undefined') {
if (type !== 'display')
return peakMemoryMetrics.DirectPoolMemory;
else
return (formatBytes(peakMemoryMetrics.DirectPoolMemory, type) + ' / ' +
formatBytes(peakMemoryMetrics.MappedPoolMemory, type));
} else {
if (type !== 'display') {
return 0;
} else {
return '0.0 B / 0.0 B';
}
}
}
}
在前端展示则是在以下 html 元素:
<!-- in executorspage-template.html-->
<th>
<span data-toggle="tooltip" data-placement="top"
title="Peak direct byte buffer / memory-mapped buffer pool memory used by JVM. This refers to BufferPoolMXBean with form 'java.nio:type=BufferPool,name=direct' and 'java.nio:type=BufferPool,name=mapped'.">
Peak Pool Memory Direct / Mapped
</span>
</th>
可以看到这里直接 从后端数据中获取 peakMemoryMetrics 这个对象,那么数据返回格式什么呢?我们查询前端请求函数:
// executorspage.js
var endPoint = createRESTEndPointForExecutorsPage(appId);
// utils.js
function createRESTEndPointForExecutorsPage(appId) {
var words = getBaseURI().split('/');
var ind = words.indexOf("proxy");
var newBaseURI;
if (ind > 0) {
appId = words[ind + 1];
newBaseURI = words.slice(0, ind + 2).join('/');
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
}
ind = words.indexOf("history");
if (ind > 0) {
appId = words[ind + 1];
var attemptId = words[ind + 2];
newBaseURI = words.slice(0, ind).join('/');
if (isNaN(attemptId)) {
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
} else {
return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/allexecutors";
}
}
return uiRoot + "/api/v1/applications/" + appId + "/allexecutors";
}
发现请求的是 “/allexecutors”,于是我们查找后端对应处理逻辑:
// OneApplicationResource.scala
@GET
@Path("allexecutors")
def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
发现返回的格式就是我们之前查看到的 ExecutorSummary 类。至此,我们已经完成掌握了 metric 的前端展示逻辑。
那么我们如何添加自定义的metric信息呢?假设我们需要在executor页面上新增一个metric。
首先第一步:在 api.scala 中的 executorSummary 类添加我们自己的 metric 对象
第二步:在 appStausListener.scala 中 在我们想要更新 metric 的事件的监听逻辑加入我们自己的 metric 更新逻辑
第三步:在 executorspage.js 中的 data 展示数组中添加一个对象,加入我们自己的数据展示逻辑
第四步:不要忘记在 executorspage-template.html 中 heml 元素来展示我们的数据。注意数据展示的顺序是数据的顺序,一定要和 js 中数组的数据保存顺序一致。
由此,我们就实现了在 SparkUI 中添加我们的自定义 metric 信息了。你学废了吗?