Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20993#discussion_r179722604
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -349,36 +350,38 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
           }
         })
     
    -    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
    +    // Explicitly disable interrupting the task thread on cancelling tasks, so the task thread
    +    // can only be interrupted by `InterruptibleIterator`.
    +    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
    +
    +    val f = sc.parallelize(1 to numElements).map { i => (i, i) }
           .repartitionAndSortWithinPartitions(new HashPartitioner(1))
           .mapPartitions { iter =>
             taskStartedSemaphore.release()
             iter
           }.foreachAsync { x =>
    -        if (x._1 >= 10) {
    -          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    -          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    -          // case, the maximum num of increment would be 10(|1...10|)
    -          taskCancelledSemaphore.acquire()
    -        }
    +        // Block this code from being executed, until the job gets cancelled. In this case, if the
    +        // source iterator is interruptible, the max number of increments should be under
    +        // `numElements`.
    +        taskCancelledSemaphore.acquire()
             executionOfInterruptibleCounter.getAndIncrement()
         }
     
         taskStartedSemaphore.acquire()
     // Job is cancelled when:
     // 1. task in reduce stage has been started, guaranteed by previous line.
    -    // 2. task in reduce stage is blocked after processing at most 10 records as
    -    //    taskCancelledSemaphore is not released until cancelTasks event is posted
    -    // After job being cancelled, task in reduce stage will be cancelled and no more iteration are
    -    // executed.
    +    // 2. task in reduce stage is blocked as taskCancelledSemaphore is not released until
    +    //    JobCancelled event is posted.
    +    // After job being cancelled, task in reduce stage will be cancelled asynchronously, thus
    +    // partial of the inputs should not get processed.
    --- End diff --
    
    `thus partial of the inputs should not get processed.` ->
    `It's very unlikely that Spark can process 10000 elements between when JobCancelled is posted and when the task is really killed.`
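    
    For context on why the interruptible case is bounded at all: Spark's `InterruptibleIterator` re-checks the task's kill flag on every `hasNext` call, so once the kill reaches the executor the wrapped iterator stops within one element. A simplified paraphrase of `org.apache.spark.InterruptibleIterator` (a sketch, not a verbatim copy of the Spark source):
    
    ```scala
    import org.apache.spark.TaskContext
    
    // Wraps a delegate iterator and re-checks the task's kill flag before each
    // element, so a killed task unwinds on the next hasNext() call instead of
    // relying on Thread.interrupt() (which this test explicitly disables via
    // SPARK_JOB_INTERRUPT_ON_CANCEL).
    class InterruptibleIteratorSketch[+T](context: TaskContext, delegate: Iterator[T])
      extends Iterator[T] {
    
      override def hasNext: Boolean = {
        // Throws TaskKilledException if the task has been marked as killed.
        context.killTaskIfInterrupted()
        delegate.hasNext
      }
    
      override def next(): T = delegate.next()
    }
    ```
    
    With thread interrupts disabled, this per-element check is the only cancellation path, and a few elements may still slip through between the JobCancelled post and the kill flag being observed. That's why the assertion can only claim the counter stays well below `numElements` rather than at an exact cutoff.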

