Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19239#discussion_r139077656
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
---
@@ -300,6 +300,85 @@ class EventTimeWatermarkSuite extends StreamTest with
BeforeAndAfter with Matche
)
}
+ test("watermark with 2 streams") {
+ val first = MemoryStream[Int]
+
+ val firstDf = first.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "10 seconds")
+ .select('value)
+
+ val second = MemoryStream[Int]
+
+ val secondDf = second.toDF()
+ .withColumn("eventTime", $"value".cast("timestamp"))
+ .withWatermark("eventTime", "5 seconds")
+ .select('value)
+
+ val union = firstDf.union(secondDf)
+ .writeStream
+ .format("memory")
+ .queryName("test")
+ .start()
+
+ def generateAndAssertNewWatermark(
+ stream: MemoryStream[Int],
+ data: Seq[Int],
+ watermark: Int): Unit = {
+ stream.addData(data)
+ assertWatermark(watermark)
+ }
+
+ def assertWatermark(watermark: Int) {
+ union.processAllAvailable()
+ // add a dummy batch so lastExecution has the new watermark
+ first.addData(0)
+ union.processAllAvailable()
+
+ val lastExecution =
union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
+ assert(lastExecution.offsetSeqMetadata.batchWatermarkMs == watermark)
+ }
+
+ generateAndAssertNewWatermark(first, Seq(11), 1000)
--- End diff --
Instead of having two variations of the helper, one for a single input and one
for both inputs, you can do something like this:
```
/**
 * Feeds optional data to the two input streams, runs the query until it has
 * processed everything available, and returns the watermark recorded for the
 * most recent execution.
 *
 * After the real data is processed, a dummy batch (a single `0` on `first`) is
 * added and processed too, so that `lastExecution` carries the new watermark.
 *
 * @param firstData  rows to add to `first`; nothing is added when empty
 * @param secondData rows to add to `second`; nothing is added when empty
 * @return `offsetSeqMetadata.batchWatermarkMs` of the query's last execution
 */
def getWatermarkAfterData(
    firstData: Seq[Int] = Seq.empty,
    secondData: Seq[Int] = Seq.empty): Long = {
  if (firstData.nonEmpty) first.addData(firstData)
  if (secondData.nonEmpty) second.addData(secondData)
  union.processAllAvailable()
  // add a dummy batch so lastExecution has the new watermark
  first.addData(0)
  union.processAllAvailable()
  // get updated watermark
  val lastExecution =
    union.asInstanceOf[StreamingQueryWrapper].streamingQuery.lastExecution
  lastExecution.offsetSeqMetadata.batchWatermarkMs
}
// `Seq(...)` inputs and the expected watermark values are placeholders to fill in per case.
assert(getWatermarkAfterData(firstData = Seq(...)) === 10000)
assert(getWatermarkAfterData(secondData = Seq(...)) === 10000)
assert(getWatermarkAfterData(firstData = Seq(...), secondData = Seq(...)) === 10000)
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]