Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21221#discussion_r195770446
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    if (shouldLogExecutorMetricsUpdates) {
+      // clear out any previous attempts, that did not have a stage completed event
+      val prevAttemptId = event.stageInfo.attemptNumber() - 1
+      for (attemptId <- 0 to prevAttemptId) {
+        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
+      }
+
+      // log the peak executor metrics for the stage, for each live executor,
+      // whether or not the executor is running tasks for the stage
+      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
+      val executorMap = liveStageExecutorMetrics.remove(
+        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+      executorMap.foreach {
+        executorEntry => {
+          for ((executorId, peakExecutorMetrics) <- executorEntry) {
+            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
--- End diff ---
hmm, now I'm confused; sorry, I'm probably rehashing something we have
discussed before. What do you plan on replacing that -1 with later on?
My understanding was that you'd always keep one timestamp for the last time
you detected a new max, for any metric. You'd update the metrics per-stage,
iff you were still within that stage. So you'd lose the ability to tell, in
more detail, what the max was for one stage in isolation vs. stages running
together (e.g. if there was a max at T3.5, you'd just log that one, and so you'd
lose info about the T2 max, which was for stage 1 by itself).
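
Just to make sure we're describing the same scheme, here's a rough sketch of
the tracking I had in mind. The names (PeakTrackerSketch, PeakTracker,
lastMaxTimestamp) are made up for illustration and aren't from this PR:

    // Hypothetical sketch, not the PR's implementation: per-stage peak values
    // plus one timestamp for the last heartbeat that set a new max.
    object PeakTrackerSketch {

      class PeakTracker(numMetrics: Int) {
        // peak value observed so far for each metric
        val peaks: Array[Long] = Array.fill(numMetrics)(Long.MinValue)
        // timestamp of the most recent update that raised any peak
        var lastMaxTimestamp: Long = -1L

        def update(timestamp: Long, metrics: Array[Long]): Unit = {
          var sawNewMax = false
          for (i <- metrics.indices) {
            if (metrics(i) > peaks(i)) {
              peaks(i) = metrics(i)
              sawNewMax = true
            }
          }
          if (sawNewMax) {
            lastMaxTimestamp = timestamp
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val stage1 = new PeakTracker(1)
        stage1.update(2000L, Array(100L))   // T2: stage 1 running alone
        stage1.update(3500L, Array(150L))   // T3.5: stage 2 also running
        // peaks(0) == 150, lastMaxTimestamp == 3500; the T2 peak that stage 1
        // hit by itself is no longer recoverable from this tracker.
        println(s"peak=${stage1.peaks(0)} at t=${stage1.lastMaxTimestamp}")
      }
    }

If that matches your plan, then the single timestamp only tells you when the
latest max was seen, which is the loss of per-stage detail I was describing.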
---