Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/20940#discussion_r179978192 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { + val conf = getLoggingConf(testDirPath, None) + val logName = "executorMetricsUpdated-test" + val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) + val listenerBus = new LiveListenerBus(conf) + + // list of events and if they should be logged + val events = Array( + (SparkListenerApplicationStart("executionMetrics", None, + 1L, "update", None), true), + (createExecutorAddedEvent(1), true), + (createExecutorAddedEvent(2), true), + (createStageSubmittedEvent(0), true), + (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), true), // onheap storage + (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), true), // JVM used + (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), true), // onheap unified + (createStageSubmittedEvent(1), true), + (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), true), // new stage + (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), true), // new stage + (createStageCompletedEvent(0), true), + (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), true), // onheap execution + (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), true), // offheap execution + (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), true), // offheap storage + (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), false), + (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), false), + (createStageCompletedEvent(1), true), + (SparkListenerApplicationEnd(1000L), true)) + + // play the events for the event logger + eventLogger.start() + listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) + listenerBus.addToEventLogQueue(eventLogger) + for ((event, included) <- events) { + listenerBus.post(event) + } + listenerBus.stop() + eventLogger.stop() + + // Verify the log file contains the expected events + val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + try { + val lines = readLines(logData) + val logStart = SparkListenerLogStart(SPARK_VERSION) + assert(lines.size === 19) + assert(lines(0).contains("SparkListenerLogStart")) + assert(lines(1).contains("SparkListenerApplicationStart")) + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) + var i = 1 + for ((event, included) <- events) { + if (included) { + checkEvent(lines(i), event) + i += 1 + } + } + } finally { + logData.close() + } + } + + /** Create a stage submitted event for the specified stage Id. */ + private def createStageSubmittedEvent(stageId: Int) = --- End diff -- Done.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org