Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20940#discussion_r179801207
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
---
@@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite
with LocalSparkContext wit
}
}
+ /**
+ * Test executor metrics update logging functionality. This checks that a
+ * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+ * log if one of the executor metrics is larger than any previously
+ * recorded value for the metric, per executor per stage. The task
metrics
+ * should not be added.
+ */
+ private def testExecutorMetricsUpdateEventLogging() {
+ val conf = getLoggingConf(testDirPath, None)
+ val logName = "executorMetricsUpdated-test"
+ val eventLogger = new EventLoggingListener(logName, None,
testDirPath.toUri(), conf)
+ val listenerBus = new LiveListenerBus(conf)
+
+ // list of events and if they should be logged
+ val events = Array(
+ (SparkListenerApplicationStart("executionMetrics", None,
+ 1L, "update", None), true),
+ (createExecutorAddedEvent(1), true),
+ (createExecutorAddedEvent(2), true),
+ (createStageSubmittedEvent(0), true),
+ (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L),
true), // new stage
+ (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L),
true), // new stage
+ (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L),
false),
+ (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L),
true), // onheap storage
+ (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L),
true), // JVM used
+ (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L),
true), // onheap unified
+ (createStageSubmittedEvent(1), true),
+ (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L),
true), // new stage
+ (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L),
true), // new stage
+ (createStageCompletedEvent(0), true),
+ (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L),
true), // onheap execution
+ (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L),
false),
+ (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L),
true), // offheap execution
+ (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L),
true), // offheap storage
+ (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L),
false),
+ (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L),
false),
+ (createStageCompletedEvent(1), true),
+ (SparkListenerApplicationEnd(1000L), true))
+
+ // play the events for the event logger
+ eventLogger.start()
+ listenerBus.start(Mockito.mock(classOf[SparkContext]),
Mockito.mock(classOf[MetricsSystem]))
+ listenerBus.addToEventLogQueue(eventLogger)
+ for ((event, included) <- events) {
+ listenerBus.post(event)
+ }
+ listenerBus.stop()
+ eventLogger.stop()
+
+ // Verify the log file contains the expected events
+ val logData = EventLoggingListener.openEventLog(new
Path(eventLogger.logPath), fileSystem)
+ try {
+ val lines = readLines(logData)
+ val logStart = SparkListenerLogStart(SPARK_VERSION)
+ assert(lines.size === 19)
+ assert(lines(0).contains("SparkListenerLogStart"))
+ assert(lines(1).contains("SparkListenerApplicationStart"))
+ assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+ var i = 1
+ for ((event, included) <- events) {
+ if (included) {
+ checkEvent(lines(i), event)
+ i += 1
+ }
+ }
+ } finally {
+ logData.close()
+ }
+ }
+
+ /** Create a stage submitted event for the specified stage Id. */
+ private def createStageSubmittedEvent(stageId: Int) =
+ SparkListenerStageSubmitted(new StageInfo(stageId, 0,
stageId.toString, 0,
+ Seq.empty, Seq.empty, "details"))
+
+ /** Create a stage completed event for the specified stage Id. */
+ private def createStageCompletedEvent(stageId: Int) =
+ SparkListenerStageCompleted(new StageInfo(stageId, 0,
stageId.toString, 0,
+ Seq.empty, Seq.empty, "details"))
+
+ /** Create an executor added event for the specified executor Id. */
+ private def createExecutorAddedEvent(executorId: Int) =
+ SparkListenerExecutorAdded(0L, executorId.toString, new
ExecutorInfo("host1", 1, Map.empty))
+
+ /** Create an executor metrics update event, with the specified executor
metrics values. */
+ private def createExecutorMetricsUpdateEvent(
+ executorId: Int, time: Long,
+ jvmUsedMemory: Long,
+ onHeapExecutionMemory: Long,
+ offHeapExecutionMemory: Long,
+ onHeapStorageMemory: Long,
+ offHeapStorageMemory: Long): SparkListenerExecutorMetricsUpdate = {
+ val taskMetrics = TaskMetrics.empty
+ taskMetrics.incDiskBytesSpilled(111)
+ taskMetrics.incMemoryBytesSpilled(222)
+ val accum = Array((333L, 1, 1,
taskMetrics.accumulators().map(AccumulatorSuite.makeInfo)))
+ val executorUpdates = new ExecutorMetrics(time, jvmUsedMemory,
onHeapExecutionMemory,
+ offHeapExecutionMemory, onHeapStorageMemory, offHeapStorageMemory)
+ SparkListenerExecutorMetricsUpdate( executorId.toString, accum,
Some(executorUpdates))
+ }
+
+ /** Check that the two ExecutorMetrics match */
+ private def checkExecutorMetrics(
+ executorMetrics1: Option[ExecutorMetrics],
+ executorMetrics2: Option[ExecutorMetrics]) = {
+ executorMetrics1 match {
+ case Some(e1) =>
+ executorMetrics2 match {
+ case Some(e2) =>
+ assert(e1.timestamp === e2.timestamp)
+ assert(e1.jvmUsedMemory === e2.jvmUsedMemory)
+ assert(e1.onHeapExecutionMemory === e2.onHeapExecutionMemory)
+ assert(e1.offHeapExecutionMemory === e2.offHeapExecutionMemory)
+ assert(e1.onHeapStorageMemory === e2.onHeapStorageMemory)
+ assert(e1.offHeapStorageMemory === e2.offHeapStorageMemory)
+ }
+ case None =>
+ assert(false)
+ case None =>
--- End diff --
I think the nesting is wrong here. It's probably cleaner to just match both at
the same time:
```scala
(executorMetrics1, executorMetrics2) match {
case (Some(e1), Some(e2)) =>
...
case (None, None) =>
// ok
case _ =>
fail(...)
}
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]