Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21428#discussion_r191596882
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala
 ---
    @@ -40,22 +60,129 @@ class ContinuousShuffleReadSuite extends StreamTest {
         messages.foreach(endpoint.askSync[Unit](_))
       }
     
    -  // In this unit test, we emulate that we're in the task thread where
    -  // ContinuousShuffleReadRDD.compute() will be evaluated. This requires a 
task context
    -  // thread local to be set.
    -  var ctx: TaskContextImpl = _
    +  private def readRDDEndpoint(rdd: ContinuousShuffleReadRDD) = {
    +    rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
    +  }
     
    -  override def beforeEach(): Unit = {
    -    super.beforeEach()
    -    ctx = TaskContext.empty()
    -    TaskContext.setTaskContext(ctx)
    +  private def readEpoch(rdd: ContinuousShuffleReadRDD) = {
    +    rdd.compute(rdd.partitions(0), ctx).toSeq.map(_.getInt(0))
       }
     
    -  override def afterEach(): Unit = {
    -    ctx.markTaskCompleted(None)
    -    TaskContext.unset()
    -    ctx = null
    -    super.afterEach()
    +  test("one epoch") {
    --- End diff --
    
    Reordered.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to