GitHub user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20888#discussion_r180515546
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
    @@ -164,10 +164,13 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
         sparkContext.addSparkListener(listener)
         for (codegen <- Seq(true, false)) {
           withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
    -        DataFrameRangeSuite.stageToKill = -1
    +        DataFrameRangeSuite.stageToKill = DataFrameRangeSuite.INVALID_STAGE_ID
             val ex = intercept[SparkException] {
               spark.range(0, 100000000000L, 1, 1).map { x =>
    -            DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
    +            val taskContext = TaskContext.get()
    +            if (!taskContext.isInterrupted()) {
    --- End diff --
    
    What if this runs after the stage is canceled in the listener but before the thread is interrupted?
    
    The more I look at this, the more it looks like SPARK-22764 (the SparkContextSuite flakiness). If you do the same thing here, I'm pretty sure it will work.
    
    - in the listener, wait until the task signals it's running
    - in the task, signal that it's running (by setting a boolean, like in SparkContextSuite, or through some other means), then go to sleep
    - in the listener, cancel the stage after the tasks signal they're running
    
    This ensures that the tasks have started but have not yet finished when the listener cancels the stage.
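
    The three steps above can be sketched without Spark at all. What follows is only an analogy under stated assumptions: a plain JVM thread stands in for the task, a second thread stands in for the listener callback, and `Thread.interrupt()` stands in for cancelling the stage. The names `CancelHandshakeSketch`, `runHandshake` and `taskStarted` are hypothetical and do not come from the PR or from SparkContextSuite.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, Spark-free illustration of the signal / sleep / cancel handshake.
object CancelHandshakeSketch {

  /** Returns true if the simulated task was cancelled while it was still running. */
  def runHandshake(): Boolean = {
    // Task -> listener signal: "I have started".
    val taskStarted = new CountDownLatch(1)
    val cancelledWhileRunning = new AtomicBoolean(false)

    // Stand-in for the task body.
    val task = new Thread(new Runnable {
      override def run(): Unit = {
        taskStarted.countDown()          // signal that the task is running
        try {
          Thread.sleep(60000L)           // then sleep, so it cannot finish early
        } catch {
          case _: InterruptedException =>
            cancelledWhileRunning.set(true)
        }
      }
    })
    task.setDaemon(true)                 // do not keep the JVM alive if never cancelled

    // Stand-in for the listener callback.
    val listener = new Thread(new Runnable {
      override def run(): Unit = {
        // Wait until the task signals it is running, and only then cancel.
        if (taskStarted.await(10, TimeUnit.SECONDS)) {
          task.interrupt()               // stand-in for cancelling the stage
        }
      }
    })

    task.start()
    listener.start()
    listener.join()
    task.join(10000L)
    cancelledWhileRunning.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"task cancelled while running: ${runHandshake()}")
  }
}
```

    Because the cancel can only happen after the latch is counted down, and the task does nothing but sleep afterwards, the cancel cannot arrive before the task starts or after it finishes. In the real test the signal would have to be visible to both the listener and the task, which is presumably why a shared boolean is suggested.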

