Github user rezasafi commented on a diff in the pull request:
https://github.com/apache/spark/pull/21221#discussion_r204976606
--- Diff:
core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
---
@@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite
with LocalSparkContext wit
}
}
+ /**
+ * Test stage executor metrics logging functionality. This checks that
peak
+ * values from SparkListenerExecutorMetricsUpdate events during a stage
are
+ * logged in a StageExecutorMetrics event for each executor at stage
completion.
+ */
+ private def testStageExecutorMetricsEventLogging() {
+ val conf = getLoggingConf(testDirPath, None)
+ val logName = "stageExecutorMetrics-test"
+ val eventLogger = new EventLoggingListener(logName, None,
testDirPath.toUri(), conf)
+ val listenerBus = new LiveListenerBus(conf)
+
+ // expected StageExecutorMetrics, for the given stage id and executor
id
+ val expectedMetricsEvents: Map[(Int, String),
SparkListenerStageExecutorMetrics] =
+ Map(
+ ((0, "1"),
+ new SparkListenerStageExecutorMetrics("1", 0, 0,
+ Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))),
+ ((0, "2"),
+ new SparkListenerStageExecutorMetrics("2", 0, 0,
+ Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))),
+ ((1, "1"),
+ new SparkListenerStageExecutorMetrics("1", 1, 0,
+ Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))),
+ ((1, "2"),
+ new SparkListenerStageExecutorMetrics("2", 1, 0,
+ Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L))))
+
+ // Events to post.
+ val events = Array(
+ SparkListenerApplicationStart("executionMetrics", None,
+ 1L, "update", None),
+ createExecutorAddedEvent(1),
+ createExecutorAddedEvent(2),
+ createStageSubmittedEvent(0),
+ // receive 3 metric updates from each executor with just stage 0
running,
+ // with different peak updates for each executor
+ createExecutorMetricsUpdateEvent(1,
+ Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)),
+ createExecutorMetricsUpdateEvent(2,
+ Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)),
+ // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6
+ createExecutorMetricsUpdateEvent(1,
+ Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)),
+ // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6
+ createExecutorMetricsUpdateEvent(2,
+ Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)),
+ // exec 1: new stage 0 peaks for metrics at indexes: 5, 7
+ createExecutorMetricsUpdateEvent(1,
+ Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)),
+ // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8
+ createExecutorMetricsUpdateEvent(2,
+ Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)),
+ // now start stage 1, one more metric update for each executor, and
new
+ // peaks for some stage 1 metrics (as listed), initialize stage 1
peaks
+ createStageSubmittedEvent(1),
+ // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7
--- End diff --
Are this comment and the one on line 322 correct? Shouldn't they say stage 1?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]