ninebigbig commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1103501006
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -324,6 +344,33 @@ object WatermarkSupport {
}
Some(evictionExpression)
}
+
+ def findEventTimeColumn(
+ attrs: Seq[Attribute],
+ useFirstOccurrence: Boolean): Option[Attribute] = {
+ val eventTimeCols =
attrs.filter(_.metadata.contains(EventTimeWatermark.delayKey))
+ if (!useFirstOccurrence) {
+ // There is a case projection leads the same column (same exprId) to
appear more than one
+ // time. Allowing them does not hurt the correctness of state row
eviction, hence let's start
+ // with allowing them.
+ val eventTimeColsSet = eventTimeCols.map(_.exprId).toSet
+ if (eventTimeColsSet.size > 1) {
+ throw new AnalysisException("More than one event time columns are
available. Please " +
+ "ensure there is at most one event time column per stream. event
time columns: " +
+ eventTimeCols.mkString("(", ",", ")"))
+ }
+
+ // With above check, even there are multiple columns in eventTimeCols,
all columns must be
+ // same.
Review Comment:
Typo: should this read "all columns must be the same"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]