Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20888#discussion_r182011978
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -156,43 +156,52 @@ class DataFrameRangeSuite extends QueryTest with
SharedSQLContext with Eventuall
test("Cancelling stage in a query with Range.") {
val slices = 10
- val listener = new SparkListener {
- override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
- eventually(timeout(10.seconds)) {
- assert(DataFrameRangeSuite.isTaskStarted)
+ // Save and restore the value because SparkContext is shared
+ val savedInterruptOnCancel = sparkContext
+ .getLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL)
+
+ try {
+
sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL,
"true")
+
+ for (codegen <- Seq(true, false)) {
+ val latch = new CountDownLatch(2)
+
+ val listener = new SparkListener {
+ override def onTaskStart(taskStart: SparkListenerTaskStart):
Unit = {
+ sparkContext.cancelStage(taskStart.stageId)
+ latch.countDown()
+ }
}
- sparkContext.cancelStage(taskStart.stageId)
- DataFrameRangeSuite.semaphore.release(slices)
- }
- }
- sparkContext.addSparkListener(listener)
- for (codegen <- Seq(true, false)) {
- withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key ->
codegen.toString()) {
- DataFrameRangeSuite.semaphore.drainPermits()
- DataFrameRangeSuite.isTaskStarted = false
- val ex = intercept[SparkException] {
- sparkContext.range(0, 10000L, numSlices = slices).mapPartitions
{ x =>
- DataFrameRangeSuite.isTaskStarted = true
- // Block waiting for the listener to cancel the stage.
- DataFrameRangeSuite.semaphore.acquire()
- x
- }.toDF("id").agg(sum("id")).collect()
+ sparkContext.addSparkListener(listener)
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key ->
codegen.toString()) {
+ val ex = intercept[SparkException] {
+ sparkContext.range(0, 10000L, numSlices =
slices).mapPartitions { x =>
--- End diff --
If `numSlices` is not set, the default is the number of cores on the local
machine, which makes the test hardware-dependent. It's better to set it
explicitly for test reproducibility. On the other hand, the `slices` variable
declaration is not required, so I removed it.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]