Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21385#discussion_r190730661 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala --- @@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest { } test("blocks waiting for new rows") { - val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) + val rdd = new ContinuousShuffleReadRDD( + sparkContext, numPartitions = 1, checkpointIntervalMs = Long.MaxValue) + val epoch = rdd.compute(rdd.partitions(0), ctx) val readRowThread = new Thread { override def run(): Unit = { - // set the non-inheritable thread local - TaskContext.setTaskContext(ctx) - val epoch = rdd.compute(rdd.partitions(0), ctx) - epoch.next().getInt(0) + try { + epoch.next().getInt(0) + } catch { + case _: InterruptedException => // do nothing - expected at test ending + } } } try { readRowThread.start() eventually(timeout(streamingTimeout)) { - assert(readRowThread.getState == Thread.State.WAITING) + assert(readRowThread.getState == Thread.State.TIMED_WAITING) --- End diff -- I see. That's good to know!
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org