anishshri-db commented on code in PR #53930:
URL: https://github.com/apache/spark/pull/53930#discussion_r2724983161
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala:
##########
@@ -684,6 +684,42 @@ object WatermarkSupport {
// pick the first element if exists
eventTimeCols.headOption
}
+
+ /**
+ * Find the index of the column which is marked as "event time" column.
+ *
+ * If there are multiple event time columns in given column list, the
behavior depends on the
+ * parameter `allowMultipleEventTimeColumns`. If it's set to true, the first
occurred column will
+ * be returned. If not, this method will throw an AnalysisException as it is
not allowed to have
+ * multiple event time columns.
+ */
+ def findEventTimeColumnIndex(
+ attrs: Seq[Attribute],
+ allowMultipleEventTimeColumns: Boolean): Option[Int] = {
+ val eventTimeCols = attrs.zipWithIndex
+ .filter(_._1.metadata.contains(EventTimeWatermark.delayKey))
+ if (!allowMultipleEventTimeColumns) {
+ // There is a case projection leads the same column (same exprId) to
appear more than one
+ // time. Allowing them does not hurt the correctness of state row
eviction, hence let's start
+ // with allowing them.
+ val eventTimeColsSet = eventTimeCols.map(_._1.exprId).toSet
+ if (eventTimeColsSet.size > 1) {
+ throw new AnalysisException(
+ errorClass = "_LEGACY_ERROR_TEMP_3077",
Review Comment:
Can we add a new, descriptively named error class for this instead of reusing `_LEGACY_ERROR_TEMP_3077`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]