GitHub user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20993#discussion_r179794288
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -349,36 +350,39 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
           }
         })
     
    -    val f = sc.parallelize(1 to 1000).map { i => (i, i) }
    +    // Explicitly disable interrupt task thread on cancelling tasks, so the task thread can only be
    +    // interrupted by `InterruptibleIterator`.
    +    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
    +
    +    val f = sc.parallelize(1 to numElements).map { i => (i, i) }
           .repartitionAndSortWithinPartitions(new HashPartitioner(1))
           .mapPartitions { iter =>
             taskStartedSemaphore.release()
             iter
           }.foreachAsync { x =>
    -        if (x._1 >= 10) {
    -          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    -          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    -          // case, the maximum num of increment would be 10(|1...10|)
    -          taskCancelledSemaphore.acquire()
    -        }
    +        // Block this code from being executed, until the job get cancelled. In this case, if the
    +        // source iterator is interruptible, the max number of increment should be under
    +        // `numElements`.
    +        taskCancelledSemaphore.acquire()
             executionOfInterruptibleCounter.getAndIncrement()
         }
     
         taskStartedSemaphore.acquire()
         // Job is cancelled when:
         // 1. task in reduce stage has been started, guaranteed by previous line.
    -    // 2. task in reduce stage is blocked after processing at most 10 records as
    -    //    taskCancelledSemaphore is not released until cancelTasks event is posted
    -    // After job being cancelled, task in reduce stage will be cancelled and no more iteration are
    -    // executed.
    +    // 2. task in reduce stage is blocked as taskCancelledSemaphore is not released until
    +    //    JobCancelled event is posted.
    +    // After job being cancelled, task in reduce stage will be cancelled asynchronously, thus
    +    // partial of the inputs should not get processed (It's very unlikely that Spark can process
    +    // 10000 elements between JobCancelled is posted and task is really killed).
    --- End diff --
    
    I re-checked the `killTask` code. I believe there is still a possibility (though a very unlikely one) that the reduce task processes all of the input elements before the task is actually killed, in which case we cannot observe that the reduce task is interruptible.
    
    One way to reduce that possibility is to increase the number of input elements. So I believe we should add a comment on `val numElements = 10000` to let later readers know that we chose `10000` for a reason.
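
The race described above can be illustrated with a small standalone sketch. This is not Spark code: `CancellableIterator` and `simulate` are hypothetical names, and the iterator is only a rough stand-in for `InterruptibleIterator`. The sketch also sets the kill flag before releasing the semaphore, which makes the outcome deterministic. In the real test the kill arrives asynchronously, which is why a large `numElements` is needed to keep the "processed everything" case unlikely.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object InterruptibleSketch {
  // Hypothetical stand-in for an interruptible iterator: it stops yielding
  // elements as soon as the shared `killed` flag is set.
  final class CancellableIterator[T](underlying: Iterator[T], killed: AtomicBoolean)
      extends Iterator[T] {
    override def hasNext: Boolean = !killed.get() && underlying.hasNext
    override def next(): T = underlying.next()
  }

  // Returns how many elements the worker processed before it observed the kill flag.
  def simulate(numElements: Int): Int = {
    val killed = new AtomicBoolean(false)
    val taskStarted = new Semaphore(0)
    val taskCancelled = new Semaphore(0)
    val processed = new AtomicInteger(0)

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        taskStarted.release()
        val iter = new CancellableIterator((1 to numElements).iterator, killed)
        iter.foreach { _ =>
          taskCancelled.acquire() // block until the "job" has been cancelled
          processed.getAndIncrement()
        }
      }
    })
    worker.start()

    taskStarted.acquire()               // the worker thread is running
    killed.set(true)                    // assumption: flag is set before permits are released
    taskCancelled.release(numElements)  // unblock the worker for every element
    worker.join()
    processed.get()
  }
}
```

With the flag set first, the worker can process at most the one element it was already blocked on, so `InterruptibleSketch.simulate(10000)` stays far below `numElements`. If the flag were set some time after the permits were released, the count would depend on timing, which is the situation the comment on `numElements` is meant to document.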


---
