Github user edwinalu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21221#discussion_r195538848
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    if (shouldLogExecutorMetricsUpdates) {
+      // clear out any previous attempts, that did not have a stage completed event
+      val prevAttemptId = event.stageInfo.attemptNumber() - 1
+      for (attemptId <- 0 to prevAttemptId) {
+        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
+      }
+
+      // log the peak executor metrics for the stage, for each live executor,
+      // whether or not the executor is running tasks for the stage
+      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
+      val executorMap = liveStageExecutorMetrics.remove(
+        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+      executorMap.foreach {
+        executorEntry => {
+          for ((executorId, peakExecutorMetrics) <- executorEntry) {
+            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
--- End diff --
We need to pass in a value for the timestamp, but there isn't really a meaningful one for the peak metrics, since the time of each individual peak could be different.
When processing, -1 indicates that the event comes from the history log and contains the peak values for the stage that has just ended. When updating the stage executor peaks (the peak executor values stored for each active stage), we can then replace all of the peak executor metric values, rather than updating each metric with the max of its current and new values.
As an example, suppose the following scenario:
T1: stage 1 starts
T2: metric m1 reaches a peak value of 1000
T3: stage 2 starts
T4: stage 1 ends, and the peak metric values for stage 1 are logged, including m1=1000
T5: stage 2 ends, and the peak metric values for stage 2 are logged
If the highest value of m1 between T3 (start of stage 2) and T5 (end of stage 2) is 500, then we want the peak value of m1 for stage 2 to show as 500.
There would be an ExecutorMetricsUpdate event logged (and later read) at T4 (end of stage 1) with m1=1000, which falls after T3 (start of stage 2). If, when reading the history log, we set the stage 2 peakExecutorMetrics to the max of the current and new values from each ExecutorMetricsUpdate, then the value for stage 2 would remain at 1000. Instead, we want it to be replaced by 500 when the ExecutorMetricsUpdate logged at T5 (end of stage 2) is processed. So when processing an ExecutorMetricsUpdate for the stage-level metrics, we replace all of the peakExecutorMetrics if the timestamp is -1.
I can add some comments for this.
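To illustrate the replace-vs-max behavior, here is a minimal standalone sketch (the object and method names are hypothetical, not the PR's actual code; metrics are simplified to an array of Longs):

```scala
// Hypothetical sketch of merging a stage's stored peak metrics when an
// ExecutorMetricsUpdate arrives. A timestamp of -1 marks a stage-end
// snapshot replayed from the history log, which replaces the stored
// peaks outright; any other timestamp is a live update, merged with max.
object PeakMergeSketch {
  def mergePeaks(current: Array[Long], incoming: Array[Long], timestamp: Long): Array[Long] = {
    if (timestamp == -1L) {
      // stage-end snapshot from the history log: replace wholesale
      incoming.clone()
    } else {
      // live update: take the per-metric max of current and new values
      current.zip(incoming).map { case (c, n) => math.max(c, n) }
    }
  }

  def main(args: Array[String]): Unit = {
    // Stage 2's stored peaks absorbed m1=1000 from the stage-1 snapshot
    // logged at T4 (which falls after stage 2 started at T3)...
    val stage2Peaks = Array(1000L)
    // ...but the stage-2 snapshot at T5 carries timestamp -1 and the true
    // peak of 500, so it replaces rather than max-merges.
    val merged = PeakMergeSketch.mergePeaks(stage2Peaks, Array(500L), timestamp = -1L)
    println(merged.mkString(","))
  }
}
```

With a max-only merge the stage 2 value would stay at 1000; the -1 sentinel is what lets the replay path distinguish "replace" from "merge".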
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]