Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19239#discussion_r139028613 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala --- @@ -300,6 +300,67 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche ) } + test("watermark with 2 streams") { + val first = MemoryStream[Int] + + val firstAggregation = first.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .select('value) + + val second = MemoryStream[Int] + + val secondAggregation = second.toDF() + .withColumn("eventTime", $"value".cast("timestamp")) + .withWatermark("eventTime", "10 seconds") + .select('value) + + val union = firstAggregation.union(secondAggregation) + .writeStream + .format("memory") + .queryName("test") + .start() + + def populateNewWatermarkFromData(stream: MemoryStream[Int], data: Int*): Unit = { + stream.addData(data) + union.processAllAvailable() + // add a dummy batch so lastExecution has the new watermark + stream.addData(0) + union.processAllAvailable() + } + + def assertQueryWatermark(watermark: Int): Unit = { + assert(union.asInstanceOf[StreamingQueryWrapper].streamingQuery + .lastExecution.offsetSeqMetadata.batchWatermarkMs + == watermark) + } + + populateNewWatermarkFromData(first, 11) --- End diff -- The problem is that watermark recalculation happens at the beginning of each batch, and to sequence executions I have to call CheckData or CheckLastBatch. So that approach ends up producing a test several times longer, since a single entry becomes: AddData(realData), CheckLastBatch, AddData(0), CheckLastBatch, AssertOnQuery.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org