Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/21214#discussion_r185834096
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -153,23 +153,17 @@ class DataFrameRangeSuite extends QueryTest with
SharedSQLContext with Eventuall
test("Cancelling stage in a query with Range.") {
val listener = new SparkListener {
- override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
- eventually(timeout(10.seconds), interval(1.millis)) {
- assert(DataFrameRangeSuite.stageToKill > 0)
- }
- sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ sparkContext.cancelStage(taskStart.stageId)
}
}
sparkContext.addSparkListener(listener)
for (codegen <- Seq(true, false)) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key ->
codegen.toString()) {
- DataFrameRangeSuite.stageToKill = -1
val ex = intercept[SparkException] {
- spark.range(0, 100000000000L, 1, 1).map { x =>
- DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
- x
- }.toDF("id").agg(sum("id")).collect()
+ spark.range(0, 100000000000L, 1, 1)
--- End diff --
You could use `latch.await(timeout)` instead of just `latch.await()`, and
throw an exception if it times out. That would avoid the test blocking
indefinitely, and would add an explicit wait instead of using a large count to
emulate it.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]