Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19681#discussion_r157129757
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
@@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
(id, accumulatorValue)
}.toMap
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- val executionUIData = listener.executionIdToData(0)
-
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq(
createStageInfo(0, 0),
createStageInfo(1, 0)
),
createProperties(executionId)))
- listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
+ bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
- assert(listener.getExecutionMetrics(0).isEmpty)
+ assert(store.executionMetrics(0).isEmpty)
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
- (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
- (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+ (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+ (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
// Driver accumulator updates don't belong to this execution should be filtered and no
// exception will be thrown.
- listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+
+ bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
- (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
- (1L, 0, 0,
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo))
+ (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
+ (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
)))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
// Retrying a stage should reset the metrics
- listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+ bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
- (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
- (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+ (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
+ (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
// Ignore the task end for the first attempt
- listener.onTaskEnd(SparkListenerTaskEnd(
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 0,
taskType = "",
reason = null,
- createTaskInfo(0, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 100))))
+ createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
+ null))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
// Finish two tasks
- listener.onTaskEnd(SparkListenerTaskEnd(
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 1,
taskType = "",
reason = null,
- createTaskInfo(0, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))))
- listener.onTaskEnd(SparkListenerTaskEnd(
+ createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
+ null))
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 0,
stageAttemptId = 1,
taskType = "",
reason = null,
- createTaskInfo(1, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
+ createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+ null))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
// Submit a new stage
- listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
+ bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+ bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
// (task id, stage id, stage attempt, accum updates)
- (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
- (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
+ (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
+ (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
)))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
// Finish two tasks
- listener.onTaskEnd(SparkListenerTaskEnd(
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 1,
stageAttemptId = 0,
taskType = "",
reason = null,
- createTaskInfo(0, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
- listener.onTaskEnd(SparkListenerTaskEnd(
+ createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+ null))
+ bus.postToAll(SparkListenerTaskEnd(
stageId = 1,
stageAttemptId = 0,
taskType = "",
reason = null,
- createTaskInfo(1, 0),
- createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
+ createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
+ null))
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
- assert(executionUIData.runningJobs === Seq(0))
- assert(executionUIData.succeededJobs.isEmpty)
- assert(executionUIData.failedJobs.isEmpty)
+ assertJobs(store.execution(0), running = Seq(0))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs === Seq(0))
- assert(executionUIData.failedJobs.isEmpty)
-
- checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
+ assertJobs(store.execution(0), completed = Seq(0))
+ checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
}
- test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
- val listener = new SQLListener(spark.sparkContext.conf)
+ sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
(store, bus) =>
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- val executionUIData = listener.executionIdToData(0)
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs === Seq(0))
- assert(executionUIData.failedJobs.isEmpty)
+ assertJobs(store.execution(0), completed = Seq(0))
}
- test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
- val listener = new SQLListener(spark.sparkContext.conf)
+ sqlStoreTest("onExecutionEnd happens before multiple
onJobEnd(JobSucceeded)s") { (store, bus) =>
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobSucceeded
))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 1,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 1,
time = System.currentTimeMillis(),
JobSucceeded
))
- val executionUIData = listener.executionIdToData(0)
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs.sorted === Seq(0, 1))
- assert(executionUIData.failedJobs.isEmpty)
+ assertJobs(store.execution(0), completed = Seq(0, 1))
}
- test("onExecutionEnd happens before onJobEnd(JobFailed)") {
- val listener = new SQLListener(spark.sparkContext.conf)
+ sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") {
(store, bus) =>
val executionId = 0
val df = createTestDataFrame
- listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ bus.postToAll(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
- listener.onJobStart(SparkListenerJobStart(
+ bus.postToAll(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq.empty,
createProperties(executionId)))
- listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+ bus.postToAll(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
- listener.onJobEnd(SparkListenerJobEnd(
+ bus.postToAll(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
JobFailed(new RuntimeException("Oops"))
))
- val executionUIData = listener.executionIdToData(0)
- assert(executionUIData.runningJobs.isEmpty)
- assert(executionUIData.succeededJobs.isEmpty)
- assert(executionUIData.failedJobs === Seq(0))
+ assertJobs(store.execution(0), failed = Seq(0))
}
test("SPARK-11126: no memory leak when running non SQL jobs") {
- val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
+ val previousStageNumber = statusStore.executionsList().size
spark.sparkContext.parallelize(1 to 10).foreach(i => ())
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
// listener should ignore the non SQL stage
- assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber)
+ assert(statusStore.executionsList().size == previousStageNumber)
spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
// listener should save the SQL stage
- assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
- }
-
- test("SPARK-13055: history listener only tracks SQL metrics") {
- val listener = new SQLHistoryListener(sparkContext.conf, mock(classOf[SparkUI]))
- // We need to post other events for the listener to track our accumulators.
- // These are largely just boilerplate unrelated to what we're trying to test.
- val df = createTestDataFrame
- val executionStart = SparkListenerSQLExecutionStart(
- 0, "", "", "",
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
- val stageInfo = createStageInfo(0, 0)
- val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
- val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
- // This task has both accumulators that are SQL metrics and accumulators that are not.
- // The listener should only track the ones that are actually SQL metrics.
- val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella")
- val nonSqlMetric = sparkContext.longAccumulator("baseball")
- val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
- val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
- val taskInfo = createTaskInfo(0, 0)
- taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
- val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null,
taskInfo, null)
- listener.onOtherEvent(executionStart)
- listener.onJobStart(jobStart)
- listener.onStageSubmitted(stageSubmitted)
- // Before SPARK-13055, this throws ClassCastException because the history listener would
- // assume that the accumulator value is of type Long, but this may not be true for
- // accumulators that are not SQL metrics.
- listener.onTaskEnd(taskEnd)
- val trackedAccums = listener.stageIdToStageMetrics.values.flatMap { stageMetrics =>
- stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates)
- }
- // Listener tracks only SQL metrics, not other accumulators
- assert(trackedAccums.size === 1)
- assert(trackedAccums.head === ((sqlMetricInfo.id, sqlMetricInfo.update.get)))
+ assert(statusStore.executionsList().size == previousStageNumber + 1)
}
test("driver side SQL metrics") {
- val listener = new SQLListener(spark.sparkContext.conf)
- val expectedAccumValue = 12345
+ val oldCount = statusStore.executionsList().size
+ val expectedAccumValue = 12345L
val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
- sqlContext.sparkContext.addSparkListener(listener)
val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
override lazy val sparkPlan = physicalPlan
override lazy val executedPlan = physicalPlan
}
+
SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
physicalPlan.execute().collect()
}
- def waitTillExecutionFinished(): Unit = {
- while (listener.getCompletedExecutions.isEmpty) {
- Thread.sleep(100)
+ while (statusStore.executionsList().size < oldCount) {
--- End diff ---
This doesn't mean the execution has ended; it only means the execution has started.
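For context, that loop only waits for the execution to appear in `statusStore`, which happens as soon as the SQLExecutionStart event has been processed. A minimal sketch of a wait keyed on completion instead, assuming the stored execution data exposes an Option-valued completion field (the `completionTime` name below is an assumption for illustration, not necessarily the exact API in this PR):

```scala
// Hypothetical sketch: poll until a newly *completed* execution shows up,
// rather than returning as soon as the start event has been handled.
// `completionTime` is an assumed Option-valued field on the stored execution
// data, used only for illustration; `statusStore` and `oldCount` are the
// names already used in the test above.
while (statusStore.executionsList().count(_.completionTime.isDefined) <= oldCount) {
  Thread.sleep(100)
}
```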