wypoon commented on a change in pull request #25673: [SPARK-28770][CORE][TEST] Fix ReplayListenerSuite tests that sometime…
URL: https://github.com/apache/spark/pull/25673#discussion_r320849006
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
 ##########
 @@ -219,11 +219,25 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     // Verify the same events are replayed in the same order
     assert(sc.eventLogger.isDefined)
     val originalEvents = sc.eventLogger.get.loggedEvents
+      .map(JsonProtocol.sparkEventFromJson(_))
     val replayedEvents = eventMonster.loggedEvents
-    originalEvents.zip(replayedEvents).foreach { case (e1, e2) =>
-      // Don't compare the JSON here because accumulators in StageInfo may be out of order
-      JsonProtocolSuite.assertEquals(
-        JsonProtocol.sparkEventFromJson(e1), JsonProtocol.sparkEventFromJson(e2))
+      .map(JsonProtocol.sparkEventFromJson(_))
+    // Executor metrics updates are not logged, so do not get replayed.
+    // Stage executor metrics are logged at stage completion, for any of the executors and
+    // the driver for which we have metrics. We always have metrics for executors, because
+    // they are sent at task end as well as in executor metrics updates. We do not always
+    // have metrics for the driver, because they are only sent in executor metrics updates
+    // (at heartbeat intervals), and when we do, it is only in the original events, never
+    // in the replay, since executor metrics updates are not replayed.
+    // For this reason, exclude stage executor metrics for the driver.
+    val filteredEvents = originalEvents.filter { e =>
+      if (e.isInstanceOf[SparkListenerStageExecutorMetrics] &&
+        e.asInstanceOf[SparkListenerStageExecutorMetrics].execId == "driver") {
 
 Review comment:
   I tried to explain briefly in the comment. Let me try to explain in more detail.
   
   When the application is run, executor metrics update events may occur (if the application runs long enough). These events are not written to the event log, so they do not get replayed. That is the root cause of the differences.
   On an executor metrics update, the EventLoggingListener updates a map it uses to track per-stage metrics, but does not log anything. The listener also receives metrics on task end; because of this, it always has metrics for the executors. On stage completion, the listener logs stage executor metrics for any of the executors/driver it has metrics for. As explained, it will sometimes have metrics for the driver (if an executor metrics update arrived from the driver), but most of the time it will not.
   To recap, SparkListenerStageExecutorMetrics events are not events received while the application is running; they are secondary events, derived from the metrics received and then written to the event log. (One other point, not relevant to this bug but perhaps helpful for understanding: on replay, the SparkListenerStageExecutorMetrics events do get replayed but are ignored by the listener. The replay listener logs SparkListenerStageExecutorMetrics events on stage completion using the same internal logic as the original listener.)
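   Roughly, then, the exclusion in the test amounts to the following (a sketch, not necessarily the exact code in the PR):

       // Drop stage executor metrics for the driver from the original events,
       // since the replayed events can never contain them.
       val filteredEvents = originalEvents.filter {
         case m: SparkListenerStageExecutorMetrics => m.execId != "driver"
         case _ => true
       }
       // The filtered originals can then be compared pairwise with the
       // replayed events, as the test did before:
       filteredEvents.zip(replayedEvents).foreach { case (e1, e2) =>
         JsonProtocolSuite.assertEquals(e1, e2)
       }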
   
