Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19681#discussion_r157129912
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
@@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
(id, accumulatorValue)
}.toMap
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- val executionUIData = listener.executionIdToData(0)
-
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq(
createStageInfo(0, 0),
createStageInfo(1, 0)
),
createProperties(executionId)))
- listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+ bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
- assert(listener.getExecutionMetrics(0).isEmpty)
+ assert(store.executionMetrics(0).isEmpty)
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
- (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
- (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+ (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+ (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
// Driver accumulator updates don't belong to this execution should be filtered and no
// exception will be thrown.
- listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+
+ bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
- (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
- (1L, 0, 0,
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo))
+ (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+ (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
)))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
// Retrying a stage should reset the metrics
- listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+ bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
- (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
- (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+ (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
+ (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
// Ignore the task end for the first attempt
- listener.onTaskEnd(SparkListenerTaskEnd(
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 0,
taskType = "",
reason = null,
- createTaskInfo(0, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 100))))
+ createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
+ null))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
// Finish two tasks
- listener.onTaskEnd(SparkListenerTaskEnd(
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 1,
taskType = "",
reason = null,
- createTaskInfo(0, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))))
- listener.onTaskEnd(SparkListenerTaskEnd(
+ createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
+ null))
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 1,
taskType = "",
reason = null,
- createTaskInfo(1, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
+ createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+ null))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
// Summit a new stage
- listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
+ bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
- (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
- (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+ (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
+ (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
// Finish two tasks
- listener.onTaskEnd(SparkListenerTaskEnd(
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 1,
stageAttemptId = 0,
taskType = "",
reason = null,
- createTaskInfo(0, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
- listener.onTaskEnd(SparkListenerTaskEnd(
+ createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+ null))
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 1,
stageAttemptId = 0,
taskType = "",
reason = null,
- createTaskInfo(1, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
+ createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+ null))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
- assert(executionUIData.runningJobs === Seq(0))
- assert(executionUIData.succeededJobs.isEmpty)
- assert(executionUIData.failedJobs.isEmpty)
+ assertJobs(store.execution(0), running = Seq(0))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs === Seq(0))
- assert(executionUIData.failedJobs.isEmpty)
-
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+ assertJobs(store.execution(0), completed = Seq(0))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
}
- test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
- val listener = new SQLListener(spark.sparkContext.conf)
+ sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
(store, bus) =>
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- val executionUIData = listener.executionIdToData(0)
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs === Seq(0))
- assert(executionUIData.failedJobs.isEmpty)
+ assertJobs(store.execution(0), completed = Seq(0))
}
- test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
- val listener = new SQLListener(spark.sparkContext.conf)
+ sqlStoreTest("onExecutionEnd happens before multiple
onJobEnd(JobSucceeded)s") { (store, bus) =>
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 1,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 1,
time = System.currentTimeMillis(),
JobSucceeded
))
- val executionUIData = listener.executionIdToData(0)
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs.sorted === Seq(0, 1))
- assert(executionUIData.failedJobs.isEmpty)
+ assertJobs(store.execution(0), completed = Seq(0, 1))
}
- test("onExecutionEnd happens before onJobEnd(JobFailed)") {
- val listener = new SQLListener(spark.sparkContext.conf)
+ sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") {
(store, bus) =>
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq.empty,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobFailed(new RuntimeException("Oops"))
))
- val executionUIData = listener.executionIdToData(0)
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs.isEmpty)
- assert(executionUIData.failedJobs === Seq(0))
+ assertJobs(store.execution(0), failed = Seq(0))
}
test("SPARK-11126: no memory leak when running non SQL jobs") {
- val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
+ val previousStageNumber = statusStore.executionsList().size
spark.sparkContext.parallelize(1 to 10).foreach(i => ())
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
--- End diff --
ditto, the listener is not attached to the Spark event bus.
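
For reference, a minimal sketch of the distinction (Scala; CountingListener is a made-up helper, `spark` stands for the suite's SparkSession, and it relies on the same test-internal APIs this suite already uses): a listener registered only on a standalone ReplayListenerBus sees just the events the test posts to it by hand, while a listener added via SparkContext.addSparkListener also receives events from real jobs once the live listener bus drains.

  import org.apache.spark.scheduler._

  // Hypothetical helper, only to show where events arrive.
  class CountingListener extends SparkListener {
    @volatile var jobEnds = 0
    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = jobEnds += 1
  }

  // Standalone bus, as in the sqlStoreTest harness: the listener only sees
  // events posted by hand; jobs run on the SparkContext never reach it.
  val detached = new CountingListener
  val bus = new ReplayListenerBus()
  bus.addListener(detached)
  bus.postToAll(SparkListenerJobEnd(jobId = 0, time = System.currentTimeMillis(), JobSucceeded))
  assert(detached.jobEnds == 1)

  // Listener attached to the live Spark event bus: events from actual jobs
  // arrive once the bus has drained.
  val attached = new CountingListener
  spark.sparkContext.addSparkListener(attached)
  spark.sparkContext.parallelize(1 to 10).foreach(_ => ())
  spark.sparkContext.listenerBus.waitUntilEmpty(10000)
  assert(attached.jobEnds >= 1)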