Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21214#discussion_r185842584
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
    @@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
     
       test("Cancelling stage in a query with Range.") {
         val listener = new SparkListener {
    -      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    -        eventually(timeout(10.seconds), interval(1.millis)) {
    -          assert(DataFrameRangeSuite.stageToKill > 0)
    -        }
    -        sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
    +      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    +        sparkContext.cancelStage(taskStart.stageId)
           }
         }
     
         sparkContext.addSparkListener(listener)
         for (codegen <- Seq(true, false)) {
           withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
codegen.toString()) {
    -        DataFrameRangeSuite.stageToKill = -1
             val ex = intercept[SparkException] {
    -          spark.range(0, 100000000000L, 1, 1).map { x =>
    -            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
    -            x
    -          }.toDF("id").agg(sum("id")).collect()
    +          spark.range(0, 100000000000L, 1, 1)
    --- End diff --
    
    You'd probably need to use the object field for that, which would be fine, since you'd only be using it, not trying to overwrite it at any point.
    
    But the current code is probably OK too. It's unlikely the query will actually finish before the other threads have had a chance to run. I just wish that were a little more explicit in the code.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to