Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157279430
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
 ---
    @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    val executionUIData = listener.executionIdToData(0)
    -
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(listener.getExecutionMetrics(0).isEmpty)
    +    assert(store.executionMetrics(0).isEmpty)
     
    -    
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
     
         // Driver accumulator updates don't belong to this execution should be 
filtered and no
         // exception will be thrown.
    -    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 
2L))))
    -    checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
    +    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
    +
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0,
    -        createTaskMetrics(accumulatorUpdates.mapValues(_ * 
2)).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 
2)))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 3))
     
         // Retrying a stage should reset the metrics
    -    
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 1, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
     
         // Ignore the task end for the first attempt
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 100))))
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 5))
     
         // Summit a new stage
    -    
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 1, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 1, 0, 
createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 7))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 11))
     
    -    assert(executionUIData.runningJobs === Seq(0))
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), running = Seq(0))
     
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    -
    -    checkAnswer(listener.getExecutionMetrics(0), 
accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(store.execution(0), completed = Seq(0))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 11))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { 
(store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0))
       }
     
    -  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before multiple 
onJobEnd(JobSucceeded)s") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.sorted === Seq(0, 1))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0, 1))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { 
(store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs === Seq(0))
    +    assertJobs(store.execution(0), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = 
spark.sharedState.listener.stageIdToStageMetrics.size
    +    val previousStageNumber = statusStore.executionsList().size
         spark.sparkContext.parallelize(1 to 10).foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should ignore the non SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == 
previousStageNumber)
    +    assert(statusStore.executionsList().size == previousStageNumber)
     
         spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should save the SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == 
previousStageNumber + 1)
    -  }
    -
    -  test("SPARK-13055: history listener only tracks SQL metrics") {
    -    val listener = new SQLHistoryListener(sparkContext.conf, 
mock(classOf[SparkUI]))
    -    // We need to post other events for the listener to track our 
accumulators.
    -    // These are largely just boilerplate unrelated to what we're trying 
to test.
    -    val df = createTestDataFrame
    -    val executionStart = SparkListenerSQLExecutionStart(
    -      0, "", "", "", 
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
    -    val stageInfo = createStageInfo(0, 0)
    -    val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), 
createProperties(0))
    -    val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
    -    // This task has both accumulators that are SQL metrics and 
accumulators that are not.
    -    // The listener should only track the ones that are actually SQL 
metrics.
    -    val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella")
    -    val nonSqlMetric = sparkContext.longAccumulator("baseball")
    -    val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
    -    val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), 
None)
    -    val taskInfo = createTaskInfo(0, 0)
    -    taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
    -    val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, 
taskInfo, null)
    -    listener.onOtherEvent(executionStart)
    -    listener.onJobStart(jobStart)
    -    listener.onStageSubmitted(stageSubmitted)
    -    // Before SPARK-13055, this throws ClassCastException because the 
history listener would
    -    // assume that the accumulator value is of type Long, but this may not 
be true for
    -    // accumulators that are not SQL metrics.
    -    listener.onTaskEnd(taskEnd)
    -    val trackedAccums = listener.stageIdToStageMetrics.values.flatMap { 
stageMetrics =>
    -      
stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates)
    -    }
    -    // Listener tracks only SQL metrics, not other accumulators
    -    assert(trackedAccums.size === 1)
    -    assert(trackedAccums.head === ((sqlMetricInfo.id, 
sqlMetricInfo.update.get)))
    +    assert(statusStore.executionsList().size == previousStageNumber + 1)
       }
     
       test("driver side SQL metrics") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    -    val expectedAccumValue = 12345
    +    val oldCount = statusStore.executionsList().size
    +    val expectedAccumValue = 12345L
         val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
    -    sqlContext.sparkContext.addSparkListener(listener)
    --- End diff --
    
    Yes, it works. There are two kinds of tests in this suite now:
    
    - "test(blah)" tests, like this one, which use the active Spark session's 
listener.
    - "sqlStoreTest" tests, which manually drive a replay bus and verify the 
expected changes in the store.
    
    This particular test (and the other one you commented on) is of the first 
kind. Tests of the other kind do not run actual jobs; they just inject events 
into the replay bus manually.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to