Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19239#discussion_r139028613
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 ---
    @@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
         )
       }
     
    +  test("watermark with 2 streams") {
    +    val first = MemoryStream[Int]
    +
    +    val firstAggregation = first.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "10 seconds")
    +      .select('value)
    +
    +    val second = MemoryStream[Int]
    +
    +    val secondAggregation = second.toDF()
    +      .withColumn("eventTime", $"value".cast("timestamp"))
    +      .withWatermark("eventTime", "10 seconds")
    +      .select('value)
    +
    +    val union = firstAggregation.union(secondAggregation)
    +      .writeStream
    +      .format("memory")
    +      .queryName("test")
    +      .start()
    +
    +    def populateNewWatermarkFromData(stream: MemoryStream[Int], data: 
Int*): Unit = {
    +      stream.addData(data)
    +      union.processAllAvailable()
    +      // add a dummy batch so lastExecution has the new watermark
    +      stream.addData(0)
    +      union.processAllAvailable()
    +    }
    +
    +    def assertQueryWatermark(watermark: Int): Unit = {
    +      assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
    +        .lastExecution.offsetSeqMetadata.batchWatermarkMs
    +        == watermark)
    +    }
    +
    +    populateNewWatermarkFromData(first, 11)
    --- End diff --
    
    The problem is that watermark recalculation happens at the beginning of 
each batch, and to sequence executions I have to call CheckData or 
CheckLastBatch. So that method ends up producing a test multiple times longer, 
since a single entry is:
    
    AddData(realData)
    CheckLastBatch
    AddData(0)
    CheckLastBatch
    AssertOnQuery


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to