Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19681#discussion_r157279430
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
@@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
       (id, accumulatorValue)
     }.toMap
-    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+    bus.postToAll(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
       SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
       System.currentTimeMillis()))
-    val executionUIData = listener.executionIdToData(0)
-
-    listener.onJobStart(SparkListenerJobStart(
+    bus.postToAll(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
       stageInfos = Seq(
         createStageInfo(0, 0),
         createStageInfo(1, 0)
       ),
       createProperties(executionId)))
-    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
-    assert(listener.getExecutionMetrics(0).isEmpty)
+    assert(store.executionMetrics(0).isEmpty)
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
       // (task id, stage id, stage attempt, accum updates)
-      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-      (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
     )))
-    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     // Driver accumulator updates don't belong to this execution should be filtered and no
     // exception will be thrown.
-    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
-    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+
+    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
       // (task id, stage id, stage attempt, accum updates)
-      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-      (1L, 0, 0,
-        createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo))
+      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
     )))
-    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
+    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
     // Retrying a stage should reset the metrics
-    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
       // (task id, stage id, stage attempt, accum updates)
-      (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-      (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
+      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
     )))
-    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     // Ignore the task end for the first attempt
-    listener.onTaskEnd(SparkListenerTaskEnd(
+    bus.postToAll(SparkListenerTaskEnd(
       stageId = 0,
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0),
-      createTaskMetrics(accumulatorUpdates.mapValues(_ * 100))))
+      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
+      null))
-    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     // Finish two tasks
-    listener.onTaskEnd(SparkListenerTaskEnd(
+    bus.postToAll(SparkListenerTaskEnd(
       stageId = 0,
       stageAttemptId = 1,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0),
-      createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))))
-    listener.onTaskEnd(SparkListenerTaskEnd(
+      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
+      null))
+    bus.postToAll(SparkListenerTaskEnd(
       stageId = 0,
       stageAttemptId = 1,
       taskType = "",
       reason = null,
-      createTaskInfo(1, 0),
-      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
+      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+      null))
-    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
+    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
     // Summit a new stage
-    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
+    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
       // (task id, stage id, stage attempt, accum updates)
-      (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
-      (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
+      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
     )))
-    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
+    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
     // Finish two tasks
-    listener.onTaskEnd(SparkListenerTaskEnd(
+    bus.postToAll(SparkListenerTaskEnd(
       stageId = 1,
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0),
-      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
-    listener.onTaskEnd(SparkListenerTaskEnd(
+      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+      null))
+    bus.postToAll(SparkListenerTaskEnd(
       stageId = 1,
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(1, 0),
-      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
+      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+      null))
-    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
-    assert(executionUIData.runningJobs === Seq(0))
-    assert(executionUIData.succeededJobs.isEmpty)
-    assert(executionUIData.failedJobs.isEmpty)
+    assertJobs(store.execution(0), running = Seq(0))
-    listener.onJobEnd(SparkListenerJobEnd(
+    bus.postToAll(SparkListenerJobEnd(
       jobId = 0,
       time = System.currentTimeMillis(),
       JobSucceeded
     ))
-    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+    bus.postToAll(SparkListenerSQLExecutionEnd(
       executionId, System.currentTimeMillis()))
-    assert(executionUIData.runningJobs.isEmpty)
-    assert(executionUIData.succeededJobs === Seq(0))
-    assert(executionUIData.failedJobs.isEmpty)
-
-    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+    assertJobs(store.execution(0), completed = Seq(0))
+    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
   }
- test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
- val listener = new SQLListener(spark.sparkContext.conf)
+ sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
(store, bus) =>
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- val executionUIData = listener.executionIdToData(0)
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs === Seq(0))
- assert(executionUIData.failedJobs.isEmpty)
+ assertJobs(store.execution(0), completed = Seq(0))
}
- test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
- val listener = new SQLListener(spark.sparkContext.conf)
+ sqlStoreTest("onExecutionEnd happens before multiple
onJobEnd(JobSucceeded)s") { (store, bus) =>
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 1,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 1,
time = System.currentTimeMillis(),
JobSucceeded
))
- val executionUIData = listener.executionIdToData(0)
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs.sorted === Seq(0, 1))
- assert(executionUIData.failedJobs.isEmpty)
+ assertJobs(store.execution(0), completed = Seq(0, 1))
}
- test("onExecutionEnd happens before onJobEnd(JobFailed)") {
- val listener = new SQLListener(spark.sparkContext.conf)
+ sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") {
(store, bus) =>
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq.empty,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobFailed(new RuntimeException("Oops"))
))
- val executionUIData = listener.executionIdToData(0)
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs.isEmpty)
- assert(executionUIData.failedJobs === Seq(0))
+ assertJobs(store.execution(0), failed = Seq(0))
}
test("SPARK-11126: no memory leak when running non SQL jobs") {
- val previousStageNumber =
spark.sharedState.listener.stageIdToStageMetrics.size
+ val previousStageNumber = statusStore.executionsList().size
spark.sparkContext.parallelize(1 to 10).foreach(i => ())
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
// listener should ignore the non SQL stage
- assert(spark.sharedState.listener.stageIdToStageMetrics.size ==
previousStageNumber)
+ assert(statusStore.executionsList().size == previousStageNumber)
spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
// listener should save the SQL stage
- assert(spark.sharedState.listener.stageIdToStageMetrics.size ==
previousStageNumber + 1)
- }
-
- test("SPARK-13055: history listener only tracks SQL metrics") {
- val listener = new SQLHistoryListener(sparkContext.conf,
mock(classOf[SparkUI]))
- // We need to post other events for the listener to track our
accumulators.
- // These are largely just boilerplate unrelated to what we're trying
to test.
- val df = createTestDataFrame
- val executionStart = SparkListenerSQLExecutionStart(
- 0, "", "", "",
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
- val stageInfo = createStageInfo(0, 0)
- val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo),
createProperties(0))
- val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
- // This task has both accumulators that are SQL metrics and
accumulators that are not.
- // The listener should only track the ones that are actually SQL
metrics.
- val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella")
- val nonSqlMetric = sparkContext.longAccumulator("baseball")
- val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
- val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value),
None)
- val taskInfo = createTaskInfo(0, 0)
- taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
- val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null,
taskInfo, null)
- listener.onOtherEvent(executionStart)
- listener.onJobStart(jobStart)
- listener.onStageSubmitted(stageSubmitted)
- // Before SPARK-13055, this throws ClassCastException because the
history listener would
- // assume that the accumulator value is of type Long, but this may not
be true for
- // accumulators that are not SQL metrics.
- listener.onTaskEnd(taskEnd)
- val trackedAccums = listener.stageIdToStageMetrics.values.flatMap {
stageMetrics =>
-
stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates)
- }
- // Listener tracks only SQL metrics, not other accumulators
- assert(trackedAccums.size === 1)
- assert(trackedAccums.head === ((sqlMetricInfo.id,
sqlMetricInfo.update.get)))
+ assert(statusStore.executionsList().size == previousStageNumber + 1)
}
test("driver side SQL metrics") {
- val listener = new SQLListener(spark.sparkContext.conf)
- val expectedAccumValue = 12345
+ val oldCount = statusStore.executionsList().size
+ val expectedAccumValue = 12345L
val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
- sqlContext.sparkContext.addSparkListener(listener)
--- End diff ---
Yes, it works. There are two kinds of tests in this suite now:
- "test(blah)", like this one, which uses the active Spark session's listener
- "sqlStoreTest", which manually drives a replay bus and verifies the expected changes in the store

This particular test (and the other one you commented on) is of the first kind. Tests of the second kind do not run actual jobs; they just inject events into the replay bus manually.
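
To make the distinction concrete, here is a rough sketch of the shape such a `sqlStoreTest` helper can take. This is only an approximation, not the exact code in the PR: the constructor signatures of `SQLAppStatusListener` and `SQLAppStatusStore`, and the use of `ElementTrackingStore`/`InMemoryStore`, are assumptions about how the pieces fit together.

```scala
// Assumed shape of a sqlStoreTest helper: build an in-memory status store,
// attach the SQL listener to a ReplayListenerBus, and hand the store plus the
// bus to the test body so it can inject events and assert on the store.
import org.apache.spark.scheduler.{ReplayListenerBus, SparkListenerBus}
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.kvstore.InMemoryStore

private def sqlStoreTest(name: String)
    (fn: (SQLAppStatusStore, SparkListenerBus) => Unit): Unit = {
  test(name) {
    val conf = sparkContext.conf
    val kvstore = new ElementTrackingStore(new InMemoryStore(), conf)
    // Constructor arguments are assumptions; the listener is never registered
    // with the real listener bus, so no actual Spark jobs run in these tests.
    val listener = new SQLAppStatusListener(conf, kvstore, live = true)
    val store = new SQLAppStatusStore(kvstore, Some(listener))
    val bus = new ReplayListenerBus()
    bus.addListener(listener)
    // The test body injects events with bus.postToAll(...) and asserts on `store`.
    fn(store, bus)
  }
}
```

Tests of the first kind, by contrast, run a real job and then read from the `statusStore` attached to the active session.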