Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21385#discussion_r190684685
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleReadSuite.scala
---
@@ -160,25 +170,122 @@ class ContinuousShuffleReadSuite extends StreamTest {
}
test("blocks waiting for new rows") {
- val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
+ val rdd = new ContinuousShuffleReadRDD(
+ sparkContext, numPartitions = 1, checkpointIntervalMs =
Long.MaxValue)
+ val epoch = rdd.compute(rdd.partitions(0), ctx)
val readRowThread = new Thread {
override def run(): Unit = {
- // set the non-inheritable thread local
- TaskContext.setTaskContext(ctx)
- val epoch = rdd.compute(rdd.partitions(0), ctx)
- epoch.next().getInt(0)
+ try {
+ epoch.next().getInt(0)
+ } catch {
+ case _: InterruptedException => // do nothing - expected at test
ending
+ }
}
}
try {
readRowThread.start()
eventually(timeout(streamingTimeout)) {
- assert(readRowThread.getState == Thread.State.WAITING)
+ assert(readRowThread.getState == Thread.State.TIMED_WAITING)
--- End diff --
The expected thread state changed because we added a timeout. The JVM always
puts threads waiting without a timeout into WAITING, and threads waiting with a
timeout into TIMED_WAITING.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]