HeartSaVioR commented on code in PR #38288:
URL: https://github.com/apache/spark/pull/38288#discussion_r1000120218
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -4201,6 +4219,73 @@ object SessionWindowing extends Rule[LogicalPlan] {
}
}
+/**
+ * Resolves the window_time expression which extracts the correct window time from the
+ * window column generated as the output of the window aggregating operators. The
+ * window column is of type struct { start: TimestampType, end: TimestampType }.
+ * The correct window time for further aggregations is window.end - 1.
+ */
+object ResolveWindowTime extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val windowTimeExpressions =
+        p.expressions.flatMap(_.collect { case w: WindowTime => w }).toSet
+
+      if (windowTimeExpressions.size == 1 &&
Review Comment:
Alex and I have discussed the complicated corner cases we would hit if we enable this... This should be perfectly feasible from SQL's point of view, but it is tricky in the context of streaming.
Btw I just figured out the simplest rationale for allowing only one. We reserve the output column name for the function (as "window_time"), the same as we do for the window()/session_window() functions. Otherwise we would have to name the resulting column something like `window_time(window)`. So it comes down to whether we want to keep treating the window-family functions as special or not.
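To make the `window.end - 1` semantics above concrete, here is a minimal sketch in plain Scala, not Spark's actual internal representation. It models the window struct as a case class over microsecond timestamps (matching TimestampType's microsecond precision) and shows why subtracting one microsecond from the exclusive end yields the last instant that still belongs to the window; the `TimeWindow` and `windowTime` names here are illustrative, not the real Spark classes.

```scala
// Hedged sketch: a stand-in for the struct { start, end } window column.
// Timestamps are microseconds since the epoch, as in Spark's TimestampType.
case class TimeWindow(start: Long, end: Long)

// window_time(window) = window.end - 1 microsecond.
// Windows are half-open intervals [start, end), so `end` itself is excluded;
// end - 1 is the latest event time that still falls inside the window.
def windowTime(w: TimeWindow): Long = w.end - 1
```

For example, a 10-minute window [0, 600000000) yields a window time of 599999999 microseconds, which still sorts inside the window and can be used as the event-time column for a further aggregation.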
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]