Github user edwinalu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21221#discussion_r195886625
--- Diff:
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    if (shouldLogExecutorMetricsUpdates) {
+      // clear out any previous attempts, that did not have a stage completed event
+      val prevAttemptId = event.stageInfo.attemptNumber() - 1
+      for (attemptId <- 0 to prevAttemptId) {
+        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
+      }
+
+      // log the peak executor metrics for the stage, for each live executor,
+      // whether or not the executor is running tasks for the stage
+      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
+      val executorMap = liveStageExecutorMetrics.remove(
+        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+      executorMap.foreach {
+        executorEntry => {
+          for ((executorId, peakExecutorMetrics) <- executorEntry) {
+            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
--- End diff --
This is confusing, especially since the current code does not have
stage-level metrics, just executor-level ones.
The -1 wouldn't be replaced. PeakExecutorMetrics only tracks the peak
metric values for each executor (and later each executor per stage) and doesn't
have a timestamp.
If there is a local max (500), which is the max between T3 and T5 in the
timeline below, it would be logged at time T5, even though it actually occurs
at T3.5.
In actual event order, what the driver sees when the application is running:
T1: start of stage 1
T2: value of 1000 for metric m1
T3: start of stage 2
T3.5: peak value of 500 for metric m1
T4: stage 1 ends
T5: stage 2 ends
Suppose that 1000 (seen at T2) is the peak value of m1 between T1 and T4,
so it is the peak value seen while stage 1 is running. The m1=1000 value will
be dumped as the max value in an executorMetricsUpdate event right before the
stage 1 end event is logged. Also suppose that 500 (seen at T3.5) is the peak
value of m1 between T3 and T5, so it is the peak value seen while stage 2 is
running. The m1=500 value will be dumped as the max value in an
executorMetricsUpdate event right before the stage 2 end event is logged.
The generated Spark history log would have the following order of events:
Start of stage 1
Start of stage 2
executorMetricsUpdate with m1=1000
end of stage 1
executorMetricsUpdate with m1=500
end of stage 2
When the Spark history server is reading the log, it will create the
peakExecutorMetrics for stage 2 when stage 2 starts, which is before it sees
the executorMetricsUpdate with m1=1000, and so will store m1=1000 as the
current peak value. When it later sees the executorMetricsUpdate with m1=500,
it needs to overwrite the m1 value (and set to 500), not compare and update to
the max value (which would be 1000). The -1 would indicate that the event is
coming from the Spark history log, is a peak value for the stage just about to
complete, and should overwrite any previous values.
If the timestamp is set to a positive value, then it will do a compare and
update to the max value instead.
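To make the intended semantics concrete, here is a rough sketch of how a
consumer of the -1 convention would apply updates. The names (PeakMetrics,
update) are made up for illustration and are not the actual classes in this PR:

```scala
// Sketch only: a holder of per-executor peak metric values, assuming the
// convention that timestamp == -1 marks a stage-level peak replayed from
// the history log (overwrite unconditionally), while a positive timestamp
// is a live update from a running application (keep the running max).
class PeakMetrics(numMetrics: Int) {
  val metrics: Array[Long] = Array.fill(numMetrics)(Long.MinValue)

  def update(timestamp: Long, values: Array[Long]): Unit = {
    for (i <- values.indices) {
      if (timestamp == -1) {
        // replayed stage peak: overwrite, even if smaller than stored value
        metrics(i) = values(i)
      } else {
        // live update: compare and keep the max
        metrics(i) = math.max(metrics(i), values(i))
      }
    }
  }
}
```

In the replay scenario above, the history server first sees m1=1000 and stores
it; when the replayed stage-2 peak m1=500 arrives with timestamp -1, it
overwrites to 500, whereas a plain compare-and-max would have incorrectly kept
1000 as stage 2's peak.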
---