Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/21221#discussion_r195773554
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
---
@@ -251,6 +261,222 @@ class EventLoggingListenerSuite extends SparkFunSuite
with LocalSparkContext wit
}
}
+ /**
+ * Test executor metrics update logging functionality. This checks that a
+ * SparkListenerExecutorMetricsUpdate event is added to the Spark history
+ * log if one of the executor metrics is larger than any previously
+ * recorded value for the metric, per executor per stage. The task metrics
+ * should not be added.
+ */
+ private def testExecutorMetricsUpdateEventLogging() {
+ val conf = getLoggingConf(testDirPath, None)
+ val logName = "executorMetricsUpdated-test"
+ val eventLogger = new EventLoggingListener(logName, None,
testDirPath.toUri(), conf)
+ val listenerBus = new LiveListenerBus(conf)
+
+ // expected ExecutorMetricsUpdate, for the given stage id and executor id
+ val expectedMetricsEvents: Map[(Int, String),
SparkListenerExecutorMetricsUpdate] =
+ Map(
+ ((0, "1"),
+ createExecutorMetricsUpdateEvent(1,
+ new ExecutorMetrics(-1L,
+ Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L,
20L)))),
+ ((0, "2"),
+ createExecutorMetricsUpdateEvent(2,
+ new ExecutorMetrics(-1L,
+ Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L)))),
+ ((1, "1"),
+ createExecutorMetricsUpdateEvent(1,
+ new ExecutorMetrics(-1L,
+ Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L)))),
+ ((1, "2"),
+ createExecutorMetricsUpdateEvent(2,
+ new ExecutorMetrics(-1L,
+ Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L)))))
+
+ // Events to post.
+ val events = Array(
+ SparkListenerApplicationStart("executionMetrics", None,
+ 1L, "update", None),
+ createExecutorAddedEvent(1),
+ createExecutorAddedEvent(2),
+ createStageSubmittedEvent(0),
+ createExecutorMetricsUpdateEvent(1,
+ new ExecutorMetrics(10L,
+ Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L))),
+ createExecutorMetricsUpdateEvent(2,
+ new ExecutorMetrics(10L,
+ Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L))),
+ createExecutorMetricsUpdateEvent(1,
+ new ExecutorMetrics(15L,
+ Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L))),
+ createExecutorMetricsUpdateEvent(2,
+ new ExecutorMetrics(15L,
+ Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L))),
+ createExecutorMetricsUpdateEvent(1,
+ new ExecutorMetrics(20L,
+ Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L))),
+ createExecutorMetricsUpdateEvent(2,
+ new ExecutorMetrics(20L,
+ Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L))),
+ createStageSubmittedEvent(1),
--- End diff --
Another comment to add here:
"now start stage 1, one more metric update for each executor, and a new peak for exec X"
(if there is a peak update, that is ...)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]