GitHub user gaborgsomogyi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20888#discussion_r182011978
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
    @@ -156,43 +156,52 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually {
       test("Cancelling stage in a query with Range.") {
         val slices = 10
     
    -    val listener = new SparkListener {
    -      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    -        eventually(timeout(10.seconds)) {
    -          assert(DataFrameRangeSuite.isTaskStarted)
    +    // Save and restore the value because SparkContext is shared
    +    val savedInterruptOnCancel = sparkContext
    +      .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
    +
    +    try {
    +      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
    +
    +      for (codegen <- Seq(true, false)) {
    +        val latch = new CountDownLatch(2)
    +
    +        val listener = new SparkListener {
    +          override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    +            sparkContext.cancelStage(taskStart.stageId)
    +            latch.countDown()
    +          }
             }
    -        sparkContext.cancelStage(taskStart.stageId)
    -        DataFrameRangeSuite.semaphore.release(slices)
    -      }
    -    }
     
    -    sparkContext.addSparkListener(listener)
    -    for (codegen <- Seq(true, false)) {
    -      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
    -        DataFrameRangeSuite.semaphore.drainPermits()
    -        DataFrameRangeSuite.isTaskStarted = false
    -        val ex = intercept[SparkException] {
    -          sparkContext.range(0, 10000L, numSlices = slices).mapPartitions { x =>
    -            DataFrameRangeSuite.isTaskStarted = true
    -            // Block waiting for the listener to cancel the stage.
    -            DataFrameRangeSuite.semaphore.acquire()
    -            x
    -          }.toDF("id").agg(sum("id")).collect()
    +        sparkContext.addSparkListener(listener)
    +        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
    +          val ex = intercept[SparkException] {
    +            sparkContext.range(0, 10000L, numSlices = slices).mapPartitions { x =>
    --- End diff --
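
    For reference, here is a minimal sketch of the save-and-restore pattern applied above (a simplification, not the exact test code; `sc` stands in for the suite's shared SparkContext). Because the context is shared across tests, the mutated local property has to be put back in a `finally` block so later tests see the original value:

    ```scala
    import org.apache.spark.SparkContext

    // Sketch only: save the current value, which may be null (unset).
    val saved = sc.getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
    try {
      // Make stage cancellation interrupt the running tasks.
      sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
      // ... body that relies on interrupt-on-cancel ...
    } finally {
      // Restore the previous value; setting null removes the property.
      sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, saved)
    }
    ```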
    
    If it is not set, the default is the number of cores on the local machine, which makes the test hardware dependent. It's better to define it explicitly for test reproducibility. On the other hand, the variable declaration is not required, so I removed it. A small sketch follows.
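
    To make the hardware dependence concrete, here is a small standalone sketch (a hypothetical app, not part of the PR) comparing the default and an explicit `numSlices`:

    ```scala
    import org.apache.spark.sql.SparkSession

    // Sketch only: shows why an explicit numSlices keeps the partition
    // count, and therefore the number of tasks the test races against,
    // identical on every machine.
    object NumSlicesSketch extends App {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("num-slices-sketch")
        .getOrCreate()
      val sc = spark.sparkContext

      // Without numSlices, sc.range falls back to sc.defaultParallelism,
      // which on local[*] equals the number of cores.
      println(sc.range(0, 10000L).getNumPartitions)

      // With an explicit numSlices the count is fixed at 10 everywhere.
      println(sc.range(0, 10000L, numSlices = 10).getNumPartitions)

      spark.stop()
    }
    ```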

