GitHub user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r190363619

--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     }
   }
 
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+    val conf = getLoggingConf(testDirPath, None)
+    val logName = "executorMetricsUpdated-test"
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    // expected ExecutorMetricsUpdate, for the given stage id and executor id
+    val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] =
+      Map(
+        ((0, "1"),
+          createExecutorMetricsUpdateEvent(1,
+            new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))),
+        ((0, "2"),
+          createExecutorMetricsUpdateEvent(2,
+            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))),
+        ((1, "1"),
+          createExecutorMetricsUpdateEvent(1,
+            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))),
+        ((1, "2"),
+          createExecutorMetricsUpdateEvent(2,
+            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L))))
+
+    // Events to post.
+    val events = Array(
+      SparkListenerApplicationStart("executionMetrics", None,
+        1L, "update", None),
+      createExecutorAddedEvent(1),
+      createExecutorAddedEvent(2),
+      createStageSubmittedEvent(0),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)),
+      createStageSubmittedEvent(1),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)),
+      createStageCompletedEvent(0),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)),
+      createExecutorRemovedEvent(1),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)),
+      createStageCompletedEvent(1),
+      SparkListenerApplicationEnd(1000L))
+
+    // play the events for the event logger
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    events.foreach(event => listenerBus.post(event))
+    listenerBus.stop()
+    eventLogger.stop()
+
+    // Verify the log file contains the expected events.
+    // Posted events should be logged, except for ExecutorMetricsUpdate events -- these
+    // are consolidated, and the peak values for each stage are logged at stage end.
+    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+    try {
+      val lines = readLines(logData)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 14)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerApplicationStart"))
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      var i = 1
+      events.foreach { event =>
+        event match {
+          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+          case stageCompleted: SparkListenerStageCompleted =>
+            for (j <- 1 to 2) {
+              checkExecutorMetricsUpdate(lines(i), stageCompleted.stageInfo.stageId,
+                expectedMetricsEvents)
+              i += 1
+            }
+            checkEvent(lines(i), event)
+            i += 1
--- End diff --

I found this pretty confusing at first. I suggest renaming `i` to `logIdx` and including a comment about the `j` loop. Also we tend to use `(1 to 2).foreach`, e.g.:

```scala
// just before the SparkListenerStageCompleted gets logged, we expect to get a
// SparkListenerExecutorMetricsUpdate for each executor
(1 to 2).foreach { _ =>
  checkExecutorMetricsUpdate(lines(logIdx), stageCompleted.stageInfo.stageId,
    expectedMetricsEvents)
  logIdx += 1
}
// also check that we get the expected SparkListenerStageCompleted
checkEvent(lines(logIdx), event)
logIdx += 1
```
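For readers skimming the hunk: per the test's doc comment and the verification comment, raw `SparkListenerExecutorMetricsUpdate` events are not written to the log; instead the listener tracks the peak value of each metric per executor per stage and writes one consolidated update per executor when the stage completes. A minimal, self-contained sketch of that bookkeeping; the names `PeakTracker`, `update`, and `flushStage` are illustrative, not the PR's actual identifiers:

```scala
import scala.collection.mutable

// Illustrative sketch of the consolidation behavior under test, not the PR's code.
class PeakTracker {
  // (stageId, executorId) -> running peak for each metric position
  private val peaks = mutable.Map.empty[(Int, String), Array[Long]]

  // Record a metrics update: keep the element-wise maximum seen so far.
  def update(stageId: Int, executorId: String, metrics: Array[Long]): Unit = {
    val current = peaks.getOrElseUpdate((stageId, executorId),
      Array.fill(metrics.length)(Long.MinValue))
    for (i <- metrics.indices) {
      current(i) = math.max(current(i), metrics(i))
    }
  }

  // At stage end, emit one consolidated record per executor and forget the stage.
  def flushStage(stageId: Int): Seq[(String, Array[Long])] = {
    val done = peaks.collect {
      case ((sid, execId), values) if sid == stageId => (execId, values)
    }.toSeq
    done.foreach { case (execId, _) => peaks.remove((stageId, execId)) }
    done
  }
}
```

This is also why the test asserts exactly 14 log lines: the ten non-metrics events (log start, application start/end, two executor-added, one executor-removed, two stage-submitted, two stage-completed) plus two consolidated updates per stage for the two executors.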
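The hunk also relies on suite helpers outside this diff (`createStageSubmittedEvent`, `createStageCompletedEvent`, `createExecutorAddedEvent`, `createExecutorRemovedEvent`, `createExecutorMetricsUpdateEvent`, `checkEvent`, `checkExecutorMetricsUpdate`). For orientation only, a hypothetical reconstruction of `createExecutorMetricsUpdateEvent`, assuming this PR adds an optional executor-level metrics field to `SparkListenerExecutorMetricsUpdate`; the real helper is defined elsewhere in the PR and its signature may differ:

```scala
// Hypothetical sketch: stock Spark's SparkListenerExecutorMetricsUpdate only
// carries task accumulator updates; the Option[ExecutorMetrics] argument here
// is an assumption about what this PR adds.
private def createExecutorMetricsUpdateEvent(
    executorId: Int,
    executorMetrics: ExecutorMetrics): SparkListenerExecutorMetricsUpdate = {
  // The doc comment says task metrics "should not be added" to the log, so the
  // real helper presumably also attaches dummy accumulator updates to prove
  // they are dropped; an empty Seq keeps this sketch minimal.
  SparkListenerExecutorMetricsUpdate(executorId.toString, Seq.empty, Some(executorMetrics))
}
```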