Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17525#discussion_r109805306
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 ---
    @@ -207,80 +207,91 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     
         /** Custom MemoryStream that waits for manual clock to reach a time */
         val inputData = new MemoryStream[Int](0, sqlContext) {
    -      // Wait for manual clock to be 100 first time there is data
    +      // getOffset should take 50 ms the first time it is called
           override def getOffset: Option[Offset] = {
             val offset = super.getOffset
             if (offset.nonEmpty) {
    -          clock.waitTillTime(300)
    +          clock.waitTillTime(1050)
             }
             offset
           }
     
    -      // Wait for manual clock to be 300 first time there is data
    +      // getBatch should take 100 ms the first time it is called
           override def getBatch(start: Option[Offset], end: Offset): DataFrame 
= {
    -        clock.waitTillTime(600)
    +        if (start.isEmpty) clock.waitTillTime(1150)
             super.getBatch(start, end)
           }
         }
     
    -    // This is to make sure thatquery waits for manual clock to be 600 
first time there is data
    -    val mapped = inputData.toDS().as[Long].map { x =>
    -      clock.waitTillTime(1100)
    +    // query execution should take 350 ms the first time it is called
    +    val mapped = inputData.toDS.coalesce(1).as[Long].map { x =>
    +      clock.waitTillTime(1500)  // this will only wait the first time when 
clock < 1500
           10 / x
         }.agg(count("*")).as[Long]
     
    -    case class AssertStreamExecThreadToWaitForClock()
    +    case class AssertStreamExecThreadIsWaitingForTime(targetTime: Long)
           extends AssertOnQuery(q => {
             eventually(Timeout(streamingTimeout)) {
               if (q.exception.isEmpty) {
    -            
assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
    +            assert(clock.isStreamWaitingFor(targetTime))
               }
             }
             if (q.exception.isDefined) {
               throw q.exception.get
             }
             true
    -      }, "")
    +      }, "") {
    +      override def toString: String = 
s"AssertStreamExecThreadIsWaitingForTime($targetTime)"
    +    }
    +
    +    case class AssertClockTime(time: Long)
    +      extends AssertOnQuery(q => clock.getTimeMillis() === time, "") {
    +      override def toString: String = s"AssertClockTime($time)"
    +    }
     
         var lastProgressBeforeStop: StreamingQueryProgress = null
     
         testStream(mapped, OutputMode.Complete)(
    -      StartStream(ProcessingTime(100), triggerClock = clock),
    -      AssertStreamExecThreadToWaitForClock(),
    +      StartStream(ProcessingTime(1000), triggerClock = clock),
    --- End diff --
    
    This test needed fixing because this manual clock test was configured such 
that the first batch takes > 100 ms even though the trigger interval was 100 ms. 
This caused an additional batch to be automatically executed without waiting for 
the manual clock to be advanced, thus breaking certain assumptions in the test.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to