Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19239#discussion_r139026379
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with
BeforeAndAfter with Matche
)
}
+ test("watermark with 2 streams") {
+ val first = MemoryStream[Int]
+
+ val firstAggregation = first.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .select('value)
+
+ val second = MemoryStream[Int]
+
+ val secondAggregation = second.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .select('value)
+
+ val union = firstAggregation.union(secondAggregation)
+ .writeStream
+ .format("memory")
+ .queryName("test")
+ .start()
+
+ def populateNewWatermarkFromData(stream: MemoryStream[Int], data:
Int*): Unit = {
+ stream.addData(data)
+ union.processAllAvailable()
+ // add a dummy batch so lastExecution has the new watermark
+ stream.addData(0)
+ union.processAllAvailable()
+ }
+
+ def assertQueryWatermark(watermark: Int): Unit = {
+ assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery
+ .lastExecution.offsetSeqMetadata.batchWatermarkMs
+ == watermark)
+ }
+
+ populateNewWatermarkFromData(first, 11)
--- End diff --
Also, I think you can use the `testStream(...)` / `AddData(...)` / `AssertOnQuery(...)`
pattern here instead; it's cleaner.
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala#L180
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]