Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19981#discussion_r157317442
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
@@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
(id, accumulatorValue)
}.toMap
- bus.postToAll(SparkListenerSQLExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq(
createStageInfo(0, 0),
createStageInfo(1, 0)
),
createProperties(executionId)))
- bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
- assert(store.executionMetrics(0).isEmpty)
+ assert(statusStore.executionMetrics(executionId).isEmpty)
- bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
(1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
// Driver accumulator updates that don't belong to this execution should be filtered and no
// exception will be thrown.
- bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
+ listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
- bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
(1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
)))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
// Retrying a stage should reset the metrics
- bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
- bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
(1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
// Ignore the task end for the first attempt
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 0,
taskType = "",
reason = null,
createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
null))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
// Finish two tasks
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 1,
taskType = "",
reason = null,
createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
null))
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 1,
taskType = "",
reason = null,
createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
null))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
// Submit a new stage
- bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
- bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
(0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
(1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
// Finish two tasks
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 1,
stageAttemptId = 0,
taskType = "",
reason = null,
createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
null))
- bus.postToAll(SparkListenerTaskEnd(
+ listener.onTaskEnd(SparkListenerTaskEnd(
stageId = 1,
stageAttemptId = 0,
taskType = "",
reason = null,
createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
null))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
- assertJobs(store.execution(0), running = Seq(0))
+ assertJobs(statusStore.execution(executionId), running = Seq(0))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- bus.postToAll(SparkListenerSQLExecutionEnd(
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- assertJobs(store.execution(0), completed = Seq(0))
- checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+ assertJobs(statusStore.execution(executionId), completed = Seq(0))
+
+ checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
}
- sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
(store, bus) =>
+ test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
+ val statusStore = createStatusStore()
+ val listener = statusStore.listener.get
+
val executionId = 0
val df = createTestDataFrame
- bus.postToAll(SparkListenerSQLExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- bus.postToAll(SparkListenerSQLExecutionEnd(
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- assertJobs(store.execution(0), completed = Seq(0))
+ assertJobs(statusStore.execution(executionId), completed = Seq(0))
}
- sqlStoreTest("onExecutionEnd happens before multiple
onJobEnd(JobSucceeded)s") { (store, bus) =>
+ test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
+ val statusStore = createStatusStore()
+ val listener = statusStore.listener.get
+
val executionId = 0
val df = createTestDataFrame
- bus.postToAll(SparkListenerSQLExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 1,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- bus.postToAll(SparkListenerSQLExecutionEnd(
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 1,
time = System.currentTimeMillis(),
JobSucceeded
))
- assertJobs(store.execution(0), completed = Seq(0, 1))
+ assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
}
- sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") {
(store, bus) =>
+ test("onExecutionEnd happens before onJobEnd(JobFailed)") {
+ val statusStore = createStatusStore()
+ val listener = statusStore.listener.get
+
val executionId = 0
val df = createTestDataFrame
- bus.postToAll(SparkListenerSQLExecutionStart(
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobStart(
+ listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq.empty,
createProperties(executionId)))
- bus.postToAll(SparkListenerSQLExecutionEnd(
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- bus.postToAll(SparkListenerJobEnd(
+ listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobFailed(new RuntimeException("Oops"))
))
- assertJobs(store.execution(0), failed = Seq(0))
+ assertJobs(statusStore.execution(executionId), failed = Seq(0))
}
test("SPARK-11126: no memory leak when running non SQL jobs") {
- val previousStageNumber = statusStore.executionsList().size
- spark.sparkContext.parallelize(1 to 10).foreach(i => ())
- spark.sparkContext.listenerBus.waitUntilEmpty(10000)
- // listener should ignore the non SQL stage
- assert(statusStore.executionsList().size == previousStageNumber)
-
- spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
- spark.sparkContext.listenerBus.waitUntilEmpty(10000)
- // listener should save the SQL stage
- assert(statusStore.executionsList().size == previousStageNumber + 1)
- }
-
- test("driver side SQL metrics") {
- val oldCount = statusStore.executionsList().size
- val expectedAccumValue = 12345L
- val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
- val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
- override lazy val sparkPlan = physicalPlan
- override lazy val executedPlan = physicalPlan
- }
-
- SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
- physicalPlan.execute().collect()
- }
-
- while (statusStore.executionsList().size < oldCount) {
- Thread.sleep(100)
- }
-
- // Wait for listener to finish computing the metrics for the execution.
- while (statusStore.executionsList().last.metricValues == null) {
- Thread.sleep(100)
+ val statusStore = createStatusStore()
+ val listener = statusStore.listener.get
+ try {
+ sparkContext.addSparkListener(listener)
+ spark.sparkContext.parallelize(1 to 10).foreach(i => ())
+ spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+ // Listener should ignore the non-SQL stages, as the stage data are only removed when SQL
+ // execution ends, which will not be triggered for non-SQL jobs.
+ assert(listener.stageMetrics.isEmpty)
+ } finally {
+ sparkContext.removeSparkListener(listener)
}
-
- val execId = statusStore.executionsList().last.executionId
- val metrics = statusStore.executionMetrics(execId)
- val driverMetric = physicalPlan.metrics("dummy")
- val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Seq(expectedAccumValue))
-
- assert(metrics.contains(driverMetric.id))
- assert(metrics(driverMetric.id) === expectedValue)
--- End diff ---
This passed before because the listener was automatically being added to
the bus using the plugin interface you've removed in this PR.
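
For reference, here is a minimal sketch of what a test has to do now that the plugin no longer wires the listener up automatically. It assumes it runs inside this suite, where `spark`, `createStatusStore()`, `statusStore.listener`, and the `toDF()` implicits are available, and it is illustrative rather than the exact PR code:

```scala
// Without the plugin-based auto-registration, the test owns the listener's
// lifecycle: build the status store, attach its listener to the bus, and
// detach it again when done.
val statusStore = createStatusStore()           // suite helper shown in the diff above
val listener = statusStore.listener.get
spark.sparkContext.addSparkListener(listener)   // manual registration replaces the plugin hook
try {
  // Run a SQL job so the listener receives the SQLExecutionStart/End events.
  spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
  spark.sparkContext.listenerBus.waitUntilEmpty(10000)
  // Only with the listener registered does the store observe the execution.
  assert(statusStore.executionsList().nonEmpty)
} finally {
  spark.sparkContext.removeSparkListener(listener)
}
```

The removed driver-metrics assertions would need the same manual registration (or direct `listener.on*` calls, as the other rewritten tests do) to keep passing.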