anishshri-db commented on code in PR #44688:
URL: https://github.com/apache/spark/pull/44688#discussion_r1452016953


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -1097,17 +1097,41 @@ case class StreamingDeduplicateWithinWatermarkExec(
 
   protected val extraOptionOnStateStore: Map[String, String] = Map.empty
 
-  private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output,
-    allowMultipleEventTimeColumns = false).get
-  private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey)
-  private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol)
+  // Below three variables are Option as attributes in child won't have an event time column
+  // in the canonicalized plan. The executed plan will eventually call doExecute() and we can
+  // defer assertion of the existence of event time column at that time.
+  private val eventTimeColOpt: Option[Attribute] = WatermarkSupport.findEventTimeColumn(
+    child.output, allowMultipleEventTimeColumns = false)
+  private val delayThresholdMsOpt: Option[Long] = eventTimeColOpt.map(
+    _.metadata.getLong(EventTimeWatermark.delayKey))
+  private val eventTimeColOrdinalOpt: Option[Int] = eventTimeColOpt.map(child.output.indexOf)
+
+  // Below three variables will be set lazily when doExecute() is called.

Review Comment:
   I guess this is only possible for this operator. But do you think there is a way to enforce this at the operator level so that future operators won't miss this case?


