HeartSaVioR commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1452024043


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -1097,17 +1097,41 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   protected val extraOptionOnStateStore: Map[String, String] = Map.empty
 
-  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
-    allowMultipleEventTimeColumns = false).get
-  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
-  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+  // Below three variables are Option as attributes in child won't have an event time column
+  // in the canonicalized plan. The executed plan will eventually call doExecute() and we can
+  // defer assertion of the existence of event time column at that time.
+  private val eventTimeColOpt: Option[Attribute] = WatermarkSupport.findEventTimeColumn(
+    child.output, allowMultipleEventTimeColumns = false)
+  private val delayThresholdMsOpt: Option[Long] = eventTimeColOpt.map(
+    _.metadata.getLong(EventTimeWatermark.delayKey))
+  private val eventTimeColOrdinalOpt: Option[Int] = eventTimeColOpt.map(child.output.indexOf)
+
+  // Below three variables will be set lazily when doExecute() is called.

Review Comment:
   We already use a trait for watermark handling; see `WatermarkSupport`. I don't see a way to enforce this for operator-specific changes.
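
   For readers following the diff, here is a minimal sketch of the deferral pattern its comments describe: keep the event-time lookups as `Option` at construction time, and only assert their presence when execution starts. It is written without Spark on the classpath, so `Attr`, `DedupOperatorSketch`, and `execute()` are hypothetical stand-ins for `Attribute`, the operator, and `doExecute()`; the real operator may resolve these values differently.

```scala
// Hypothetical stand-in for a Spark Attribute; delayMs models the
// watermark delay stored in the attribute metadata.
final case class Attr(name: String, delayMs: Option[Long])

final class DedupOperatorSketch(output: Seq[Attr]) {
  // Evaluated eagerly, but safe on any plan: a plan without an
  // event time column simply yields None here.
  private val eventTimeColOpt: Option[Attr] = output.find(_.delayMs.isDefined)
  private val delayThresholdMsOpt: Option[Long] = eventTimeColOpt.flatMap(_.delayMs)
  private val eventTimeColOrdinalOpt: Option[Int] =
    eventTimeColOpt.map(col => output.indexOf(col))

  // Resolved on first access. In this sketch that only happens inside
  // execute(), so the existence check is deferred until then.
  private lazy val delayThresholdMs: Long = delayThresholdMsOpt.getOrElse(
    throw new IllegalStateException("no event time column in child output"))
  private lazy val eventTimeColOrdinal: Int = eventTimeColOrdinalOpt.getOrElse(
    throw new IllegalStateException("no event time column in child output"))

  // Stand-in for doExecute(): returns the resolved ordinal and delay.
  def execute(): (Int, Long) = (eventTimeColOrdinal, delayThresholdMs)
}
```

   Constructing the sketch with no event-time column succeeds, which is the property the canonicalized plan needs; only calling `execute()` on it fails.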



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

