Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21221#discussion_r190363971
--- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
@@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}
}
+  /**
+   * Test executor metrics update logging functionality. This checks that a
+   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+   * log if one of the executor metrics is larger than any previously
+   * recorded value for the metric, per executor per stage. The task metrics
+   * should not be added.
+   */
+  private def testExecutorMetricsUpdateEventLogging() {
+    val conf = getLoggingConf(testDirPath, None)
+    val logName = "executorMetricsUpdated-test"
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    // expected ExecutorMetricsUpdate, for the given stage id and executor id
+    val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] =
+      Map(
+        ((0, "1"),
+          createExecutorMetricsUpdateEvent(1,
+            new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))),
+        ((0, "2"),
+          createExecutorMetricsUpdateEvent(2,
+            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))),
+        ((1, "1"),
+          createExecutorMetricsUpdateEvent(1,
+            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))),
+        ((1, "2"),
+          createExecutorMetricsUpdateEvent(2,
+            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L))))
+
+    // Events to post.
+    val events = Array(
+      SparkListenerApplicationStart("executionMetrics", None, 1L, "update", None),
+      createExecutorAddedEvent(1),
+      createExecutorAddedEvent(2),
+      createStageSubmittedEvent(0),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)),
+      createStageSubmittedEvent(1),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)),
+      createStageCompletedEvent(0),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)),
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)),
+      createExecutorRemovedEvent(1),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)),
+      createStageCompletedEvent(1),
+      SparkListenerApplicationEnd(1000L))
+
+    // play the events for the event logger
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    events.foreach(event => listenerBus.post(event))
+    listenerBus.stop()
+    eventLogger.stop()
+
+    // Verify the log file contains the expected events.
+    // Posted events should be logged, except for ExecutorMetricsUpdate events -- these
+    // are consolidated, and the peak values for each stage are logged at stage end.
+    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+    try {
+      val lines = readLines(logData)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 14)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerApplicationStart"))
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      var i = 1
+      events.foreach { event =>
+        event match {
+          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+          case stageCompleted: SparkListenerStageCompleted =>
+            for (j <- 1 to 2) {
+              checkExecutorMetricsUpdate(lines(i), stageCompleted.stageInfo.stageId,
+                expectedMetricsEvents)
+              i += 1
+            }
+            checkEvent(lines(i), event)
+            i += 1
+          case _ =>
+            checkEvent(lines(i), event)
+            i += 1
+        }
+      }
+    } finally {
+      logData.close()
+    }
+  }
+
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) = {
+    SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details"))
+  }
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) = {
+    SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details"))
+  }
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) = {
+    SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty))
+  }
+
+  /** Create an executor removed event for the specified executor Id. */
+  private def createExecutorRemovedEvent(executorId: Int) = {
+    SparkListenerExecutorRemoved(0L, executorId.toString, "test")
+  }
+
+  /** Create an executor metrics update event, with the specified executor metrics values. */
+  private def createExecutorMetricsUpdateEvent(
+      executorId: Int,
+      executorMetrics: ExecutorMetrics): SparkListenerExecutorMetricsUpdate = {
+    val taskMetrics = TaskMetrics.empty
+    taskMetrics.incDiskBytesSpilled(111)
+    taskMetrics.incMemoryBytesSpilled(222)
+    val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo)))
+    SparkListenerExecutorMetricsUpdate(executorId.toString, accum, Some(executorMetrics))
+  }
+
+  /** Check that the two ExecutorMetrics match */
+  private def checkExecutorMetrics(
+      executorMetrics1: Option[ExecutorMetrics],
+      executorMetrics2: Option[ExecutorMetrics]) = {
+    (executorMetrics1, executorMetrics2) match {
+      case (Some(e1), Some(e2)) =>
+        assert(e1.timestamp === e2.timestamp)
+        assert(e1.jvmUsedHeapMemory === e2.jvmUsedHeapMemory)
+        assert(e1.jvmUsedNonHeapMemory === e2.jvmUsedNonHeapMemory)
+        assert(e1.onHeapExecutionMemory === e2.onHeapExecutionMemory)
+        assert(e1.offHeapExecutionMemory === e2.offHeapExecutionMemory)
+        assert(e1.onHeapStorageMemory === e2.onHeapStorageMemory)
+        assert(e1.offHeapStorageMemory === e2.offHeapStorageMemory)
+        assert(e1.onHeapUnifiedMemory === e2.onHeapUnifiedMemory)
+        assert(e1.offHeapUnifiedMemory === e2.offHeapUnifiedMemory)
+        assert(e1.directMemory === e2.directMemory)
+        assert(e1.mappedMemory === e2.mappedMemory)
+      case (None, None) =>
+      case _ =>
+        assert(false)
+    }
+  }
+
+  /** Check that the Spark history log line matches the expected event. */
+  private def checkEvent(line: String, event: SparkListenerEvent): Unit = {
+    assert(line.contains(event.getClass.toString.split("\\.").last))
+    event match {
+      case executorMetrics: SparkListenerExecutorMetricsUpdate =>
+        JsonProtocol.sparkEventFromJson(parse(line)) match {
--- End diff ---
you can pull `JsonProtocol.sparkEventFromJson(parse(line))` out into a val to avoid repeating it, along with the type comparison:
```scala
val parsed = JsonProtocol.sparkEventFromJson(parse(line))
assert(parsed.getClass === event.getClass)
event match {
...
```
(also `assertTypeError` does something else entirely:
http://doc.scalatest.org/2.2.6/#org.scalatest.Assertions)
---