Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/21428#discussion_r191596882
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala
---
@@ -40,22 +60,129 @@ class ContinuousShuffleReadSuite extends StreamTest {
messages.foreach(endpoint.askSync[Unit](_))
}
- // In this unit test, we emulate that we're in the task thread where
- // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a
task context
- // thread local to be set.
- var ctx: TaskContextImpl = _
+ private def readRDDEndpoint(rdd: ContinuousShuffleReadRDD) = {
+ rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
+ }
- override def beforeEach(): Unit = {
- super.beforeEach()
- ctx = TaskContext.empty()
- TaskContext.setTaskContext(ctx)
+ private def readEpoch(rdd: ContinuousShuffleReadRDD) = {
+ rdd.compute(rdd.partitions(0), ctx).toSeq.map(_.getInt(0))
}
- override def afterEach(): Unit = {
- ctx.markTaskCompleted(None)
- TaskContext.unset()
- ctx = null
- super.afterEach()
+ test("one epoch") {
--- End diff --
Reordered.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]