Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19239#discussion_r139070352
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
---
@@ -560,13 +567,25 @@ class StreamExecution(
}
if (hasNewData) {
var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
- // Update the eventTime watermark if we find one in the plan.
+ // Update the eventTime watermarks if we find any in the plan.
if (lastExecution != null) {
lastExecution.executedPlan.collect {
- case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
+ case e: EventTimeWatermarkExec => e
+ }.zipWithIndex.foreach {
+ case (e, index) if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
- e.eventTimeStats.value.max - e.delayMs
- }.headOption.foreach { newWatermarkMs =>
+ val newAttributeWatermarkMs = e.eventTimeStats.value.max - e.delayMs
+ val mappedWatermarkMs: Option[Long] = watermarkMsMap.get(index)
--- End diff --
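Suggest renaming `mappedWatermarkMs` to `previousWatermarkMs`. The new name is more semantically meaningful, because the value is the watermark previously recorded for this operator index.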
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]