Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20940#discussion_r179797803
--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}
}
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+    val conf = getLoggingConf(testDirPath, None)
+    val logName = "executorMetricsUpdated-test"
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    // list of events and whether they should be logged
+    val events = Array(
+      (SparkListenerApplicationStart("executionMetrics", None, 1L, "update", None), true),
+      (createExecutorAddedEvent(1), true),
+      (createExecutorAddedEvent(2), true),
+      (createStageSubmittedEvent(0), true),
+      (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), true), // new stage
+      (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), true), // new stage
+      (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), false),
+      (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), true), // onheap storage
+      (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), true), // JVM used
+      (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), true), // onheap unified
+      (createStageSubmittedEvent(1), true),
+      (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), true), // new stage
+      (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), true), // new stage
+      (createStageCompletedEvent(0), true),
+      (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), true), // onheap execution
+      (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), false),
+      (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), true), // offheap execution
+      (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), true), // offheap storage
+      (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), false),
+      (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), false),
+      (createStageCompletedEvent(1), true),
+      (SparkListenerApplicationEnd(1000L), true))
+
+    // play the events for the event logger
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    for ((event, included) <- events) {
+      listenerBus.post(event)
+    }
+    listenerBus.stop()
+    eventLogger.stop()
+
+    // Verify the log file contains the expected events
+    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+    try {
+      val lines = readLines(logData)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 19)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerApplicationStart"))
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      var i = 1
+      for ((event, included) <- events) {
+        if (included) {
+          checkEvent(lines(i), event)
+          i += 1
+        }
+      }
+    } finally {
+      logData.close()
+    }
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) =
--- End diff ---
nit: enclose multiline methods in `{}`, here and elsewhere, even if the body
is only one line
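
For example (the helper's body is truncated in the diff above, so the body
shown here is only a hypothetical placeholder, and `makeStageInfo` is not a
real helper in this suite; the brace placement is the point):

```scala
// before: multiline definition without braces
private def createStageSubmittedEvent(stageId: Int) =
  SparkListenerStageSubmitted(makeStageInfo(stageId))  // hypothetical body

// after: body enclosed in `{}`, even though it is a single line
private def createStageSubmittedEvent(stageId: Int) = {
  SparkListenerStageSubmitted(makeStageInfo(stageId))  // hypothetical body
}
```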
---
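
For readers following the test above: the rule it exercises amounts to peak
tracking per (executor, stage). An ExecutorMetricsUpdate is logged only when
at least one metric exceeds the largest value previously recorded for that
executor in that stage. A minimal, self-contained sketch of that idea (names
hypothetical, not the PR's actual implementation):

```scala
import scala.collection.mutable

// Sketch of per-(executor, stage) peak tracking: an update "should be logged"
// only if at least one metric exceeds its previously recorded peak.
class PeakMetricsTracker {
  private val peaks = mutable.Map.empty[(String, Int), Array[Long]]

  /** Returns true if this update sets a new peak and should be logged. */
  def shouldLog(execId: String, stageId: Int, metrics: Array[Long]): Boolean = {
    peaks.get((execId, stageId)) match {
      case None =>
        // first update seen for this executor in this stage: always log
        peaks((execId, stageId)) = metrics.clone()
        true
      case Some(prev) =>
        val isNewPeak = metrics.zip(prev).exists { case (m, p) => m > p }
        if (isNewPeak) {
          // record the new per-metric peaks
          for (i <- metrics.indices) {
            prev(i) = math.max(prev(i), metrics(i))
          }
        }
        isNewPeak
    }
  }
}
```

Under this rule the first update in a stage is always logged, which matches
the `// new stage` entries in the events table above.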