GitHub user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21214#discussion_r185842584
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
   test("Cancelling stage in a query with Range.") {
     val listener = new SparkListener {
-      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-        eventually(timeout(10.seconds), interval(1.millis)) {
-          assert(DataFrameRangeSuite.stageToKill > 0)
-        }
-        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        sparkContext.cancelStage(taskStart.stageId)
       }
     }
     sparkContext.addSparkListener(listener)
     for (codegen <- Seq(true, false)) {
       withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
-        DataFrameRangeSuite.stageToKill = -1
         val ex = intercept[SparkException] {
-          spark.range(0, 100000000000L, 1, 1).map { x =>
-            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
-            x
-          }.toDF("id").agg(sum("id")).collect()
+          spark.range(0, 100000000000L, 1, 1)
--- End diff --
You'd probably need to use the object field for that, which would be fine
since you'd only be reading it, not trying to overwrite it at any point.
But the current code is probably okay too. It's unlikely the range query will
actually finish before the other threads have had a chance to run. I just wish
that were a little more explicit in the code.
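One way to make that ordering explicit is sketched below in plain Scala, with no Spark involved. Everything in it is hypothetical: `CancelSignal`, `runTask`, and the thread standing in for the `SparkListener` are illustrative names, not Spark APIs, and a boolean flag stands in for Spark's real stage cancellation. The simulated task does not finish on its own until it has observed the cancellation signal (with a generous timeout as a safety net), so correctness no longer depends on the input merely being large enough. The shared field lives in an object and is only read by the task, in line with the suggestion above.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the suite's companion object. The flag is written
// once by the "listener" and only read by the "task"; nothing resets it mid-run.
object CancelSignal {
  val cancelled = new AtomicBoolean(false)
}

// Simulated long-running task. Rather than relying on a huge input to outlast
// the listener, it keeps running until it has observed the cancellation flag,
// giving up only after a generous timeout.
def runTask(started: CountDownLatch): String = {
  started.countDown() // plays the role of the onTaskStart event
  val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10)
  while (!CancelSignal.cancelled.get()) {
    if (System.nanoTime() > deadline) return "finished" // cancellation never arrived
    Thread.sleep(1)
  }
  "cancelled"
}

val started = new CountDownLatch(1)

// Stand-in for the SparkListener: waits for the task to start, then cancels it.
val listener = new Thread(new Runnable {
  def run(): Unit = {
    started.await()
    CancelSignal.cancelled.set(true)
  }
})
listener.start()

val outcome = runTask(started)
listener.join()
println(outcome)
```

Running it should print `cancelled`. The `CountDownLatch` makes the "task has started" event an explicit handshake instead of an assumption about timing, which is the property the current test only gets implicitly from the size of the range.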