Github user advancedxy commented on a diff in the pull request:
https://github.com/apache/spark/pull/20993#discussion_r179794288
--- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
@@ -349,36 +350,39 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
}
})
- val f = sc.parallelize(1 to 1000).map { i => (i, i) }
+ // Explicitly disable interrupting the task thread on cancelling tasks, so the task thread
+ // can only be interrupted by `InterruptibleIterator`.
+ sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+
+ val f = sc.parallelize(1 to numElements).map { i => (i, i) }
.repartitionAndSortWithinPartitions(new HashPartitioner(1))
.mapPartitions { iter =>
taskStartedSemaphore.release()
iter
}.foreachAsync { x =>
- if (x._1 >= 10) {
-   // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
-   // next iteration will be cancelled if the source iterator is interruptible. Then in this
-   // case, the maximum num of increment would be 10(|1...10|)
-   taskCancelledSemaphore.acquire()
- }
+ // Block this code from being executed until the job gets cancelled. In this case, if the
+ // source iterator is interruptible, the max number of increments should be under
+ // `numElements`.
+ taskCancelledSemaphore.acquire()
executionOfInterruptibleCounter.getAndIncrement()
}
taskStartedSemaphore.acquire()
// Job is cancelled when:
// 1. task in reduce stage has been started, guaranteed by previous line.
- // 2. task in reduce stage is blocked after processing at most 10 records as
- // taskCancelledSemaphore is not released until cancelTasks event is posted
- // After job being cancelled, task in reduce stage will be cancelled and no more iteration are
- // executed.
+ // 2. task in reduce stage is blocked as taskCancelledSemaphore is not released until
+ // the JobCancelled event is posted.
+ // After the job is cancelled, the task in the reduce stage is cancelled asynchronously; thus
+ // not all of the inputs should get processed (it's very unlikely that Spark can process
+ // 10000 elements between the JobCancelled event being posted and the task really being killed).
--- End diff --
I re-checked the `killTask` code. I believe there is still a possibility (very unlikely) that the reduce task processes all of the input elements before the task is really killed, in which case we cannot observe the reduce task being interruptible.
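
For reference, here is a minimal standalone sketch (not the test itself; the object name, `local[2]` setup and no-op action body are just illustrative) of the async-cancel pattern the test relies on. The kill is delivered asynchronously, so the running task keeps draining its iterator until the kill actually reaches it:

```scala
import java.util.concurrent.Semaphore

import org.apache.spark.{SparkConf, SparkContext}

object CancelSketch {
  // Kept in a singleton object (like JobCancellationSuite's companion object) so the task,
  // running in the same local JVM, releases this exact instance and not a deserialized copy.
  val taskStarted = new Semaphore(0)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cancel-sketch"))
    // The property the diff sets via SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL:
    // rely on InterruptibleIterator instead of Thread.interrupt() to stop the task.
    sc.setLocalProperty("spark.job.interruptOnCancel", "false")

    val f = sc.parallelize(1 to 10000, 1).mapPartitions { iter =>
      CancelSketch.taskStarted.release()
      iter
    }.foreachAsync { _ => () }

    taskStarted.acquire() // a task is now running
    f.cancel()            // asynchronous: the task may process more elements before it is killed
    sc.stop()
  }
}
```

With a cheap per-element body like this, the task can easily finish all of its input before `cancel()` takes effect, which is exactly the race described above.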
One way to reduce that possibility would be to increase the number of input elements. So I believe we should add a comment at `val numElements = 10000` to let later readers know that we chose `10000` for a reason.
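Something like this, just a sketch of the wording:

```scala
// Use a large input on purpose: cancellation of the reduce task is asynchronous, so with a
// small input the task could drain its whole iterator before the kill lands and we could not
// observe it being interrupted.
val numElements = 10000
```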
---