Recently, due to some work scenarios requiring more information about Spark tasks, we need to modify the Spark source code to add new metrics. Let's also connect the entire metric system to form a comprehensive understanding.
The following discussion is based on Spark 3.1.2, so let's get started.
ListenerBus Mechanism#
Before diving into practical operations, we first need to understand Spark's listener mechanism.
Spark involves the collaborative operation of multiple modules. By considering real-time performance and structural decoupling, Spark introduces an event bus mechanism. By registering and listening to events, various trigger points of Spark events can execute different operations.
In Spark, you can specify the execution of a certain operation as an event and register it in the event bus in advance. When that operation is executed, an event is sent, and the event bus receives this event and asynchronously passes it to all listeners that are listening for that event, executing the corresponding methods. This is essentially a typical producer-consumer model. We need to artificially produce an event, and then consumers will consume this event. The most classic use case is to listen to a task before and after its execution, thereby obtaining the start and end status of that task in near real-time, which allows us to gather various statistical information during the execution of that task, referred to as metrics below.
If you want to add event listeners in Spark, it's also very simple, although it is somewhat intrusive to the code. Here, we take the start of a task as an example.
First, we need to register the event to the listenerBus by creating a case class that inherits from SparkListenerEvent:
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
extends SparkListenerEvent
The specific implementation of SparkListenerEvent is as follows:
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
/* Whether output this event to the event log */
protected[spark] def logEvent: Boolean = true
}
SparkListenerEvent will be accepted by SparkListener and call the specified method, as seen in the implementation in SparkListener.scala:
@DeveloperApi
abstract class SparkListener extends SparkListenerInterface {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { }
// Some code here ...
override def onOtherEvent(event: SparkListenerEvent): Unit = { }
}
From the above code, we can infer two points: first, Spark has built-in callback methods for some events, but if we define a custom event, it will ultimately go to the onOtherEvent method. Therefore, we only need to inherit the onOtherEvent method and perform pattern matching within it to listen to our custom events, thus reducing the intrusion into Spark core code. Second, we can inherit SparkListener to override methods and add our own listening logic.
As for how SparkListener implements listening, we need to look at the specific implementation of ListenerBus in ListenerBus.scala:
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
// Marked `private[spark]` for access in tests.
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
//Some code here ...
/**
* Post the event to all registered listeners. The `postToAll` caller should guarantee calling
* `postToAll` in the same thread for all events.
*/
def postToAll(event: E): Unit = {
// JavaConverters can create a JIterableWrapper if we use asScala.
// However, this method will be called frequently. To avoid the wrapper cost, here we use
// Java Iterator directly.
val iter = listenersPlusTimers.iterator
while (iter.hasNext) {
val listenerAndMaybeTimer = iter.next()
val listener = listenerAndMaybeTimer._1
val maybeTimer = listenerAndMaybeTimer._2
val maybeTimerContext = if (maybeTimer.isDefined) {
maybeTimer.get.time()
} else {
null
}
lazy val listenerName = Utils.getFormattedClassName(listener)
try {
doPostEvent(listener, event)
if (Thread.interrupted()) {
// We want to throw the InterruptedException right away so we can associate the interrupt
// with this listener, as opposed to waiting for a queue.take() etc. to detect it.
throw new InterruptedException()
}
} catch {
case ie: InterruptedException =>
logError(s"Interrupted while posting to ${listenerName}. Removing that listener.", ie)
removeListenerOnError(listener)
case NonFatal(e) if !isIgnorableException(e) =>
logError(s"Listener ${listenerName} threw an exception", e)
} finally {
if (maybeTimerContext != null) {
val elapsed = maybeTimerContext.stop()
if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
s"${elapsed / 1000000000d}s.")
}
}
}
}
}
}
The above code can be simply explained: in the listenerBus, a list is used to store all listeners, and a timer is used to schedule and execute the listening methods. This is just a basic implementation; in practice, the commonly used SparkListenerBus is specifically implemented by AsyncEventQueue, which sends events asynchronously to avoid blocking.
Having understood the specific implementation of ListenerBus, we can use it more freely. In our specific implementation class, we only need to pass in an instance of the listener Bus, as shown in the example of dagScheduler.scala:
private[spark] class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging {
// some code here ...
}
Then, send an event after the behavior that needs to be listened to:
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo): Unit = {
// Note that there is a chance that this task is launched after the stage is cancelled.
// In that case, we wouldn't have the stage anymore in stageIdToStage.
val stageAttemptId =
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
}
Thus, when executing the logic for the start of the task, Spark will automatically listen and send events. Our listener will receive the event and execute the onTaskStart method.
Metric System#
Spark has referenced a third-party metric system, whose basic logic is to register some metrics, store these metrics in a key-value format, and implement corresponding outputs.
There are three main concepts:
- Source: the source of the metric, e.g., JvmSource
- Sink: the output of the metric; Spark supports various output methods, such as CSVSink
- MetricRegistry: the class that specifically stores metrics, implemented by a third-party library
Today, we will not discuss the specific implementation of metrics but focus on how SparkUI displays these metrics. Taking ExecutorMetric as an example, we can see that ExecutorMetric is actually a layer of encapsulation, and the specific metric information is in ExecutorMetricType.scala:
class ExecutorMetrics private[spark] extends Serializable {
// Metrics are indexed by ExecutorMetricType.metricToOffset
private val metrics = new Array[Long](ExecutorMetricType.numMetrics)
/** Returns the value for the specified metric. */
def getMetricValue(metricName: String): Long = {
metrics(ExecutorMetricType.metricToOffset(metricName))
}
// Some code here ..
}
private[spark] object ExecutorMetricType {
// List of all executor metric getters
val metricGetters = IndexedSeq(
JVMHeapMemory,
JVMOffHeapMemory,
OnHeapExecutionMemory,
OffHeapExecutionMemory,
OnHeapStorageMemory,
OffHeapStorageMemory,
OnHeapUnifiedMemory,
OffHeapUnifiedMemory,
DirectPoolMemory,
MappedPoolMemory,
ProcessTreeMetrics,
GarbageCollectionMetrics
)
val (metricToOffset, numMetrics) = {
var numberOfMetrics = 0
val definedMetricsAndOffset = mutable.LinkedHashMap.empty[String, Int]
metricGetters.foreach { m =>
(0 until m.names.length).foreach { idx =>
definedMetricsAndOffset += (m.names(idx) -> (idx + numberOfMetrics))
}
numberOfMetrics += m.names.length
}
(definedMetricsAndOffset, numberOfMetrics)
}
}
Here, we will not look at the actual statistical methods of these metrics but focus on how to store these metrics. In fact, many of the metrics in the ExecutorMetricType class call JMX for statistics.
How are these metrics displayed? First, let's look at api.scala, which stores executor metrics:
class ExecutorStageSummary private[spark](
val taskTime : Long,
val failedTasks : Int,
val succeededTasks : Int,
val killedTasks : Int,
val inputBytes : Long,
val inputRecords : Long,
val outputBytes : Long,
val outputRecords : Long,
val shuffleRead : Long,
val shuffleReadRecords : Long,
val shuffleWrite : Long,
val shuffleWriteRecords : Long,
val memoryBytesSpilled : Long,
val diskBytesSpilled : Long,
@deprecated("use isExcludedForStage instead", "3.1.0")
val isBlacklistedForStage: Boolean,
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics],
val isExcludedForStage: Boolean)
Here, we randomly picked a class; in fact, there are other classes that store executor metrics. We can find that the arrangement of metric information here is actually consistent with the display on SparkUI, leading us to think that the data format displayed on the front end is this.
Tracing back further, we find that it is encapsulated into LiveEntity:
private class LiveExecutorStageSummary(
stageId: Int,
attemptId: Int,
executorId: String) extends LiveEntity {
import LiveEntityHelpers._
var taskTime = 0L
var succeededTasks = 0
var failedTasks = 0
var killedTasks = 0
var isExcluded = false
var metrics = createMetrics(default = 0L)
val peakExecutorMetrics = new ExecutorMetrics()
override protected def doUpdate(): Any = {
val info = new v1.ExecutorStageSummary(
taskTime,
failedTasks,
succeededTasks,
killedTasks,
metrics.inputMetrics.bytesRead,
metrics.inputMetrics.recordsRead,
metrics.outputMetrics.bytesWritten,
metrics.outputMetrics.recordsWritten,
metrics.shuffleReadMetrics.remoteBytesRead + metrics.shuffleReadMetrics.localBytesRead,
metrics.shuffleReadMetrics.recordsRead,
metrics.shuffleWriteMetrics.bytesWritten,
metrics.shuffleWriteMetrics.recordsWritten,
metrics.memoryBytesSpilled,
metrics.diskBytesSpilled,
isExcluded,
Some(peakExecutorMetrics).filter(_.isSet),
isExcluded)
new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
}
}
Here, it exposes its update method doUpdate(). Tracing back further, we can see that the saving method for the object is provided in LiveEntity.scala:
// LiveEntity.scala
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
// Always check triggers on the first write, since adding an element to the store may
// cause the maximum count for the element type to be exceeded.
store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
lastWriteTime = now
}
Here, we can see that the actual metric information will ultimately be saved to a key-value store for front-end calls. How is this entity class updated? Jumping to the reference location of this method, we find that this method is actually called by the listening method of appStatusListener. Based on the exploration of the previous section, we can conclude that the update of metrics is done through the SparkListener listening mechanism.
private[spark] class AppStatusListener(
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
appStatusSource: Option[AppStatusSource] = None,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
// Some code here...
}
Adding Custom Executor Metrics in SparkUI#
With the knowledge above, we can finally freely add our own metrics in Spark UI. Since the front-end data interaction is all implemented in JavaScript, we just need to locate the corresponding JavaScript file to find the data display location.
Taking executor metrics as an example, we first find the page corresponding to the executor in executorspage.js:
We find the following AJAX function:
$.get(createTemplateURI(appId, "executorspage"), function (template) { ...});
The actual data display is shown in the following way:
{
data: function (row, type) {
var peakMemoryMetrics = row.peakMemoryMetrics;
if (typeof peakMemoryMetrics !== 'undefined') {
if (type !== 'display')
return peakMemoryMetrics.DirectPoolMemory;
else
return (formatBytes(peakMemoryMetrics.DirectPoolMemory, type) + ' / ' +
formatBytes(peakMemoryMetrics.MappedPoolMemory, type));
} else {
if (type !== 'display') {
return 0;
} else {
return '0.0 B / 0.0 B';
}
}
}
}
The front-end display is in the following HTML element:
<!-- in executorspage-template.html-->
<th>
<span data-toggle="tooltip" data-placement="top"
title="Peak direct byte buffer / memory-mapped buffer pool memory used by JVM. This refers to BufferPoolMXBean with form 'java.nio:type=BufferPool,name=direct' and 'java.nio:type=BufferPool,name=mapped'.">
Peak Pool Memory Direct / Mapped
</span>
</th>
You can see that it directly retrieves the peakMemoryMetrics object from the back-end data. So what is the format of the returned data? We check the front-end request function:
// executorspage.js
var endPoint = createRESTEndPointForExecutorsPage(appId);
// utils.js
function createRESTEndPointForExecutorsPage(appId) {
var words = getBaseURI().split('/');
var ind = words.indexOf("proxy");
var newBaseURI;
if (ind > 0) {
appId = words[ind + 1];
newBaseURI = words.slice(0, ind + 2).join('/');
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
}
ind = words.indexOf("history");
if (ind > 0) {
appId = words[ind + 1];
var attemptId = words[ind + 2];
newBaseURI = words.slice(0, ind).join('/');
if (isNaN(attemptId)) {
return newBaseURI + "/api/v1/applications/" + appId + "/allexecutors";
} else {
return newBaseURI + "/api/v1/applications/" + appId + "/" + attemptId + "/allexecutors";
}
}
return uiRoot + "/api/v1/applications/" + appId + "/allexecutors";
}
We find that it requests “/allexecutors,” so we look for the corresponding backend handling logic:
// OneApplicationResource.scala
@GET
@Path("allexecutors")
def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
We find that the return format is the ExecutorSummary class we previously viewed. Thus, we have mastered the front-end display logic of metrics.
So how do we add custom metric information? Suppose we need to add a new metric on the executor page.
The first step: Add our own metric object in the executorSummary class in api.scala.
The second step: In appStatusListener.scala, add our own metric update logic in the event listening logic where we want to update the metric.
The third step: In executorspage.js, add an object in the data display array to include our own data display logic.
The fourth step: Don't forget to add HTML elements in executorspage-template.html to display our data. Note that the order of data display is the order of the data; it must be consistent with the order of the data stored in the JavaScript array.
Thus, we have achieved adding our custom metric information in SparkUI. Have you learned it?