HeartSaVioR commented on code in PR #44688: URL: https://github.com/apache/spark/pull/44688#discussion_r1452024043
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -1097,17 +1097,41 @@ case class StreamingDeduplicateWithinWatermarkExec( protected val extraOptionOnStateStore: Map[String, String] = Map.empty - private val eventTimeCol: Attribute = WatermarkSupport.findEventTimeColumn(child.output, - allowMultipleEventTimeColumns = false).get - private val delayThresholdMs = eventTimeCol.metadata.getLong(EventTimeWatermark.delayKey) - private val eventTimeColOrdinal: Int = child.output.indexOf(eventTimeCol) + // Below three variables are Option as attributes in child won't have an event time column + // in the canonicalized plan. The executed plan will eventually call doExecute() and we can + // defer assertion of the existence of event time column at that time. + private val eventTimeColOpt: Option[Attribute] = WatermarkSupport.findEventTimeColumn( + child.output, allowMultipleEventTimeColumns = false) + private val delayThresholdMsOpt: Option[Long] = eventTimeColOpt.map( + _.metadata.getLong(EventTimeWatermark.delayKey)) + private val eventTimeColOrdinalOpt: Option[Int] = eventTimeColOpt.map(child.output.indexOf) + + // Below three variables will be set lazily when doExecute() is called. Review Comment: We actually already leverage a trait for watermark handling; see WatermarkSupport. I don't see a way to enforce this for operator-specific changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
