Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21221#discussion_r195955081
--- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
@@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    if (shouldLogExecutorMetricsUpdates) {
+      // clear out any previous attempts, that did not have a stage completed event
+      val prevAttemptId = event.stageInfo.attemptNumber() - 1
+      for (attemptId <- 0 to prevAttemptId) {
+        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
+      }
+
+      // log the peak executor metrics for the stage, for each live executor,
+      // whether or not the executor is running tasks for the stage
+      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
+      val executorMap = liveStageExecutorMetrics.remove(
+        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+      executorMap.foreach {
+        executorEntry => {
+          for ((executorId, peakExecutorMetrics) <- executorEntry) {
+            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
--- End diff --
I can see how this would work, but it also seems far more confusing than
necessary. My understanding was that you'd always log the last timestamp which
replaced the peak value for *any* metric. Are you ever logging something other
than -1 for the timestamp? If not, we just shouldn't put any timestamp in the
log.
It might be helpful to step back a bit and, rather than focusing on the
mechanics of what you're doing now, discuss the desired end behavior in the
history server and the live UI based on the timestamp.
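To make the suggestion concrete, here is a minimal sketch of what I have in mind: track only the peak value per metric and never a timestamp, so there is nothing timestamp-shaped to write into the log at stage completion. `PeakMetrics` and `compareAndUpdate` are hypothetical stand-ins for illustration, not the actual classes in this PR.

```scala
// Hypothetical, simplified model of per-executor peak tracking.
// Peaks are updated in place as metric updates arrive; no timestamp
// is ever recorded, so none needs to be logged.
case class PeakMetrics(metrics: Array[Long]) {
  // Raise each stored peak to the incoming value where it is larger.
  // Returns true if any peak changed (i.e. the stage saw a new high).
  def compareAndUpdate(latest: Array[Long]): Boolean = {
    var updated = false
    for (i <- metrics.indices) {
      if (latest(i) > metrics(i)) {
        metrics(i) = latest(i)
        updated = true
      }
    }
    updated
  }
}

object PeakMetricsDemo {
  def main(args: Array[String]): Unit = {
    val peaks = PeakMetrics(Array(100L, 200L))
    // First metric rises to 150; second stays at its old peak of 200.
    assert(peaks.compareAndUpdate(Array(150L, 180L)))
    assert(peaks.metrics.sameElements(Array(150L, 200L)))
    // Neither value exceeds the stored peaks, so nothing changes.
    assert(!peaks.compareAndUpdate(Array(140L, 190L)))
    println("ok")
  }
}
```

With this shape, the stage-completed handler would just serialize `peaks.metrics` for each live executor, and the question of which timestamp to attach never comes up.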
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]