sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1589359837
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala:
##########
@@ -49,26 +80,31 @@ case class EventTimeWatermark(
// logic here because we also maintain the compatibility flag. (See
// SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE for details.)
// TODO: Disallow updating the metadata once we remove the compatibility flag.
- override val output: Seq[Attribute] = child.output.map { a =>
- if (a semanticEquals eventTime) {
- val delayMs = EventTimeWatermark.getDelayMs(delay)
- val updatedMetadata = new MetadataBuilder()
- .withMetadata(a.metadata)
- .putLong(EventTimeWatermark.delayKey, delayMs)
- .build()
- a.withMetadata(updatedMetadata)
- } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
- // Remove existing watermark
- val updatedMetadata = new MetadataBuilder()
- .withMetadata(a.metadata)
- .remove(EventTimeWatermark.delayKey)
- .build()
- a.withMetadata(updatedMetadata)
- } else {
- a
- }
+ override val output: Seq[Attribute] = {
+ val delayMs = EventTimeWatermark.getDelayMs(delay)
+ updateEventTimeColumn(child.output, delayMs, eventTime)
}
override protected def withNewChildInternal(newChild: LogicalPlan):
EventTimeWatermark =
copy(child = newChild)
}
+
+/**
+ * Updates the event time column to [[eventTime]] in the child output.
+ *
+ * Any watermark calculations performed after this node will use the
+ * updated eventTimeColumn.
+ */
+case class UpdateEventTimeWatermarkColumn(
+ eventTime: Attribute,
+ delay: CalendarInterval,
+ child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = {
+ val delayMs = EventTimeWatermark.getDelayMs(delay)
+ updateEventTimeColumn(child.output, delayMs, eventTime)
+}
Review Comment:
Fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]