Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20888#discussion_r181489845
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -152,22 +154,28 @@ class DataFrameRangeSuite extends QueryTest with
SharedSQLContext with Eventuall
}
test("Cancelling stage in a query with Range.") {
+ val slices = 10
+
val listener = new SparkListener {
- override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
- eventually(timeout(10.seconds), interval(1.millis)) {
- assert(DataFrameRangeSuite.stageToKill > 0)
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ eventually(timeout(10.seconds)) {
+ assert(DataFrameRangeSuite.isTaskStarted)
}
- sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+ sparkContext.cancelStage(taskStart.stageId)
+ DataFrameRangeSuite.semaphore.release(slices)
--- End diff --
If we do that, the stopped threads won't die. In the RDD iterator the
following code makes sure that the thread is killed:
```
private[spark] override def killTaskIfInterrupted(): Unit = {
  val reason = reasonIfKilled
  if (reason.isDefined) {
    throw new TaskKilledException(reason.get)
  }
}
```
If `SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL` is set then they will
die properly.
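
As a rough sketch of what enabling that looks like from the caller's side (the job-group id and description below are illustrative, not from this test): `SPARK_JOB_INTERRUPT_ON_CANCEL` is the local property `spark.job.interruptOnCancel`, and `SparkContext.setJobGroup` can set it via its `interruptOnCancel` flag.

```
// Mark subsequently submitted jobs so that cancellation interrupts the
// task threads; "range-test" / the description are made-up examples.
sparkContext.setJobGroup("range-test", "cancellable range query", interruptOnCancel = true)

// ... run the query ...

// A later cancellation now interrupts the running task threads, so
// killTaskIfInterrupted (above) throws TaskKilledException and the
// threads actually die instead of lingering.
sparkContext.cancelJobGroup("range-test")
```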
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]