Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19981#discussion_r157135580
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
@@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
(id, accumulatorValue)
}.toMap
- bus.postToAll(SparkListenerSQLExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq(
createStageInfo(0, 0),
createStageInfo(1, 0)
),
createProperties(executionId)))
- bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
- assert(store.executionMetrics(0).isEmpty)
+ assert(statusStore.executionMetrics(executionId).isEmpty)
- bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
(1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
// Driver accumulator updates that don't belong to this execution should be filtered and no
// exception will be thrown.
- bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
+ listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
- bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
(1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
)))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
// Retrying a stage should reset the metrics
- bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
- bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
(1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
// Ignore the task end for the first attempt
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 0,
taskType = "",
reason = null,
createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
null))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
// Finish two tasks
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 1,
taskType = "",
reason = null,
createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
null))
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 1,
taskType = "",
reason = null,
createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
null))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
// Submit a new stage
- bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
- bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
(1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
// Finish two tasks
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 1,
stageAttemptId = 0,
taskType = "",
reason = null,
createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
null))
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 1,
stageAttemptId = 0,
taskType = "",
reason = null,
createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
null))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
- assertJobs(store.execution(0), running = Seq(0))
+ assertJobs(statusStore.execution(executionId), running = Seq(0))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- bus.postToAll(SparkListenerSQLExecutionEnd(
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- assertJobs(store.execution(0), completed = Seq(0))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+ assertJobs(statusStore.execution(executionId), completed = Seq(0))
+
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
}
- sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
(store, bus) =>
+ test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
+ val statusStore = createStatusStore()
+ val listener = statusStore.listener.get
+
val executionId = 0
val df = createTestDataFrame
- bus.postToAll(SparkListenerSQLExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- bus.postToAll(SparkListenerSQLExecutionEnd(
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- assertJobs(store.execution(0), completed = Seq(0))
+ assertJobs(statusStore.execution(executionId), completed = Seq(0))
}
- sqlStoreTest("onExecutionEnd happens before multiple
onJobEnd(JobSucceeded)s") { (store, bus) =>
+ test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
+ val statusStore = createStatusStore()
+ val listener = statusStore.listener.get
+
val executionId = 0
val df = createTestDataFrame
- bus.postToAll(SparkListenerSQLExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 1,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- bus.postToAll(SparkListenerSQLExecutionEnd(
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 1,
time = System.currentTimeMillis(),
JobSucceeded
))
- assertJobs(store.execution(0), completed = Seq(0, 1))
+ assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
}
- sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") {
(store, bus) =>
+ test("onExecutionEnd happens before onJobEnd(JobFailed)") {
+ val statusStore = createStatusStore()
+ val listener = statusStore.listener.get
+
val executionId = 0
val df = createTestDataFrame
- bus.postToAll(SparkListenerSQLExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq.empty,
createProperties(executionId)))
- bus.postToAll(SparkListenerSQLExecutionEnd(
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobFailed(new RuntimeException("Oops"))
))
- assertJobs(store.execution(0), failed = Seq(0))
+ assertJobs(statusStore.execution(executionId), failed = Seq(0))
}
test("SPARK-11126: no memory leak when running non SQL jobs") {
- val previousStageNumber = statusStore.executionsList().size
- spark.sparkContext.parallelize(1 to 10).foreach(i => ())
- spark.sparkContext.listenerBus.waitUntilEmpty(10000)
--- End diff ---
Previously we did not attach the testing listener to the Spark event bus; this is fixed now.
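
For context, here is a minimal sketch of what a test-local status store like the `createStatusStore()` used above can look like. It assumes the Spark 2.3-era `SQLAppStatusListener` / `SQLAppStatusStore` constructors and an in-memory `ElementTrackingStore`; the exact helper in this suite may differ.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore}
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.kvstore.InMemoryStore

// Sketch only: an in-memory status store whose listener the test drives by calling
// its callbacks (onJobStart, onStageSubmitted, onOtherEvent, ...) directly, instead
// of posting events to the asynchronous listener bus and waiting for delivery.
// Constructor shapes are assumptions based on the Spark 2.3-era classes.
def createStatusStore(conf: SparkConf = new SparkConf()): SQLAppStatusStore = {
  val kvStore = new ElementTrackingStore(new InMemoryStore, conf)
  val listener = new SQLAppStatusListener(conf, kvStore, live = true)
  new SQLAppStatusStore(kvStore, Some(listener))
}
```

The tests above then drive that listener synchronously via `statusStore.listener.get`, while the SPARK-11126 memory-leak test additionally needs the listener registered on the real `spark.sparkContext.listenerBus`, which is the attachment referred to here.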
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]