Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19239#discussion_r139259901
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
---
@@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with
BeforeAndAfter with Matche
)
}
+ test("watermark with 2 streams") {
+ val first = MemoryStream[Int]
+
+ val firstDf = first.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .select('value)
+
+ val second = MemoryStream[Int]
+
+ val secondDf = second.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "5 seconds")
+ .select('value)
+
+ val union = firstDf.union(secondDf)
+ .writeStream
+ .format("memory")
+ .queryName("test")
+ .start()
+
+ def getWatermarkAfterData(
+ firstData: Seq[Int] = Seq.empty,
+ secondData: Seq[Int] = Seq.empty): Long = {
+ if (firstData.nonEmpty) first.addData(firstData)
+ if (secondData.nonEmpty) second.addData(secondData)
+ union.processAllAvailable()
+ // add a dummy batch so lastExecution has the new watermark
+ first.addData(0)
+ union.processAllAvailable()
+ // get last watermark
+ val lastExecution =
union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+ lastExecution.offsetSeqMetadata.batchWatermarkMs
+ }
+
+ // Global watermark starts at 0 until we get data from both sides
+ assert(getWatermarkAfterData(firstData = Seq(11)) == 0)
+ assert(getWatermarkAfterData(secondData = Seq(6)) == 1000)
+ // Global watermark stays at left watermark 1 when right watermark
moves to 2
+ assert(getWatermarkAfterData(secondData = Seq(8)) == 1000)
+ // Global watermark switches to right side value 2 when left watermark
goes higher
+ assert(getWatermarkAfterData(firstData = Seq(21)) == 3000)
+ // Global watermark goes back to left
+ assert(getWatermarkAfterData(secondData = Seq(17, 28, 39)) == 11000)
+ // Global watermark stays on left as long as it's below right
+ assert(getWatermarkAfterData(firstData = Seq(31)) == 21000)
+ assert(getWatermarkAfterData(firstData = Seq(41)) == 31000)
+ // Global watermark switches back to right again
+ assert(getWatermarkAfterData(firstData = Seq(51)) == 34000)
+
+ // Global watermark is updated correctly with simultaneous data from
both sides
+ assert(getWatermarkAfterData(firstData = Seq(100), secondData =
Seq(100)) == 90000)
+ assert(getWatermarkAfterData(firstData = Seq(120), secondData =
Seq(110)) == 105000)
+ assert(getWatermarkAfterData(firstData = Seq(130), secondData =
Seq(125)) == 120000)
+
+ // Global watermark doesn't decrement with simultaneous data
+ assert(getWatermarkAfterData(firstData = Seq(100), secondData =
Seq(100)) == 120000)
+ assert(getWatermarkAfterData(firstData = Seq(140), secondData =
Seq(100)) == 120000)
+ assert(getWatermarkAfterData(firstData = Seq(100), secondData =
Seq(135)) == 130000)
--- End diff --
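Please also test that the global watermark (the minimum of the two input streams' watermarks) is correctly recovered after the query is restarted.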
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]