Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r205095575
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
    @@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
         }
       }
     
    +  /**
    +   * Test stage executor metrics logging functionality. This checks that 
peak
    +   * values from SparkListenerExecutorMetricsUpdate events during a stage 
are
    +   * logged in a StageExecutorMetrics event for each executor at stage 
completion.
    +   */
    +  private def testStageExecutorMetricsEventLogging() {
    +    val conf = getLoggingConf(testDirPath, None)
    +    val logName = "stageExecutorMetrics-test"
    +    val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
    +    val listenerBus = new LiveListenerBus(conf)
    +
    +    // expected StageExecutorMetrics, for the given stage id and executor 
id
    +    val expectedMetricsEvents: Map[(Int, String), 
SparkListenerStageExecutorMetrics] =
    +      Map(
    +        ((0, "1"),
    +          new SparkListenerStageExecutorMetrics("1", 0, 0,
    +              Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))),
    +        ((0, "2"),
    +          new SparkListenerStageExecutorMetrics("2", 0, 0,
    +              Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))),
    +        ((1, "1"),
    +          new SparkListenerStageExecutorMetrics("1", 1, 0,
    +              Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))),
    +        ((1, "2"),
    +          new SparkListenerStageExecutorMetrics("2", 1, 0,
    +              Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L))))
    +
    +    // Events to post.
    +    val events = Array(
    +      SparkListenerApplicationStart("executionMetrics", None,
    +        1L, "update", None),
    +      createExecutorAddedEvent(1),
    +      createExecutorAddedEvent(2),
    +      createStageSubmittedEvent(0),
    +      // receive 3 metric updates from each executor with just stage 0 
running,
    +      // with different peak updates for each executor
    +      createExecutorMetricsUpdateEvent(1,
    +          Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)),
    +      createExecutorMetricsUpdateEvent(2,
    +          Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)),
    +      // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6
    +      createExecutorMetricsUpdateEvent(1,
    +          Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)),
    +      // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6
    +      createExecutorMetricsUpdateEvent(2,
    +          Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)),
    +      // exec 1: new stage 0 peaks for metrics at indexes: 5, 7
    +      createExecutorMetricsUpdateEvent(1,
    +          Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)),
    +      // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8
    +      createExecutorMetricsUpdateEvent(2,
    +          Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)),
    +      // now start stage 1, one more metric update for each executor, and 
new
    +      // peaks for some stage 1 metrics (as listed), initialize stage 1 
peaks
    +      createStageSubmittedEvent(1),
    +      // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7
    --- End diff --
    
    Stage 0 is still running, and these are new peaks for that stage. This 
metrics update also initializes all of the stage 1 metric values, since these 
are the first executor metrics seen for stage 1 (I'll add this to the comments).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to