Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/12008#discussion_r57641524
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -1585,3 +1586,110 @@ object ResolveUpCast extends Rule[LogicalPlan] {
}
}
}
+
+/**
+ * Maps a time column to multiple time windows using the Expand operator.
Since it's non-trivial to
+ * figure out how many windows a time column can map to, we over-estimate
the number of windows and
+ * filter out the rows where the time column is not inside the time window.
+ */
+object TimeWindowing extends Rule[LogicalPlan] {
+
+ /**
+ * Builds a partial function that applies `f` to a TimeWindow expression. Depending on the
+ * operation, the TimeWindow expression may be wrapped in an Alias (in case of projections) or
+ * appear by itself (in case of groupBy). Both shapes are matched; the partial function is not
+ * defined for any other expression.
+ *
+ * @tparam E The result type of `f`
+ * @param f The function that we want to apply on the TimeWindow expression
+ * @return A partial function yielding the result of `f` applied on the TimeWindow expression
+ */
+ private def getWindowExpr[E](f: TimeWindow => E):
PartialFunction[Expression, E] = {
+ case Alias(windowExpr: TimeWindow, _) => f(windowExpr)
+ case windowExpr: TimeWindow => f(windowExpr)
+ }
+
+ /**
+ * Pass on other columns that are required either in a projection or grouping.
+ *
+ * Collects the references of every expression in the project list of a Project, or in the
+ * grouping expressions of an Aggregate. Any other operator yields an empty sequence.
+ *
+ * @param plan The logical plan to inspect
+ * @return The expressions referenced by the plan's projection or grouping expressions
+ */
+ private def getMissingAttributes(plan: LogicalPlan):
Seq[NamedExpression] = {
+ val expressions = plan match {
+ case p: Project => p.projectList
+ case a: Aggregate => a.groupingExpressions
+ case _ => Seq.empty
+ }
+ expressions.flatMap(_.references)
+ }
+
+ /**
+ * Generates the logical plan for generating window ranges on a timestamp column. Without
+ * knowing what the timestamp value is, it's non-trivial to figure out deterministically how
+ * many window ranges a timestamp will map to given all possible combinations of a window
+ * duration, slide duration and start time (offset). Therefore, we over-estimate the number of
+ * windows there may be, and filter the valid windows. We use a final Project operator to
+ * group the window columns into a struct so they can be accessed as `window.start` and
+ * `window.end`.
+ *
+ * The windows are calculated as below:
+ *   maxNumOverlapping <- ceil(windowDuration / slideDuration)
+ *   for (i <- 0 until maxNumOverlapping)
+ *     windowId <- ceil((timestamp - startTime) / slideDuration)
+ *     windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration
+ *       + startTime
+ *     windowEnd <- windowStart + windowDuration
+ *     return windowStart, windowEnd
+ *
+ * NOTE(review): the pseudo-code above iterates `0 until maxNumOverlapping`, while the
+ * implementation below builds `maxNumOverlapping + 1` projections. Confirm which one is
+ * intended and align the two.
+ *
+ * This behaves as follows for the given parameters for the time: 12:05. The valid windows
+ * are marked with a +, and invalid ones are marked with a x. The invalid ones are filtered
+ * using the Filter operator.
+ *   window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
+ *     11:55 - 12:07 +                      11:52 - 12:04 x
+ *     12:00 - 12:12 +                      11:57 - 12:09 +
+ *     12:05 - 12:17 +                      12:02 - 12:14 +
+ *
+ * The plan is built from the first TimeWindow expression found in `p.expressions` and is
+ * placed on top of the first child of `p`.
+ *
+ * @param p The logical plan
+ * @return the logical plan that will generate the time windows using the Expand operator,
+ *         with the Filter operator for correctness and Project for usability.
+ */
+ private def generateWindows(p: LogicalPlan): LogicalPlan = {
+ val windowExpr = p.expressions.collect(getWindowExpr[TimeWindow](e =>
e)).head
+ // get all expressions we need to pass on for projections
+ val otherExpressions =
+
getMissingAttributes(p).filterNot(_.children.exists(_.isInstanceOf[TimeWindow]))
+
+ val projections = Seq.tabulate(windowExpr.maxNumOverlapping + 1) { i =>
+ // windowId <- ceil((timestamp - startTime) / slideDuration)
+ val division = Ceil(Divide(Subtract(Cast(windowExpr.timeColumn,
LongType),
+ Literal(windowExpr.startTime)), Literal(windowExpr.slideDuration)))
+ // start <- (windowId + i - maxNumOverlapping) * slideDuration +
startTime
+ // the 1000000 is necessary for properly casting a LongType to a
TimestampType
+ val windowStart =
+ Multiply(
+ Add(
+ Multiply(
+ Add(division, Literal(i - windowExpr.maxNumOverlapping)),
+ Literal(windowExpr.slideDuration)),
+ Literal(windowExpr.startTime)),
+ Literal(1000000))
+ // windowEnd <- windowStart + windowDuration
+ val windowEnd =
+ Add(windowStart, Multiply(Literal(windowExpr.windowDuration),
Literal(1000000)))
+ windowStart :: windowEnd :: windowExpr.originalTimeColumn :: Nil ++
otherExpressions
+ }
+ val timeCol = windowExpr.originalTimeColumn.references.toSeq
+ // timestamp >= window.start && timestamp < window.end
+ val filterExpr = And(GreaterThanOrEqual(windowExpr.timeColumn,
windowExpr.windowStartCol),
+ LessThan(windowExpr.timeColumn, windowExpr.windowEndCol))
+ Project(windowExpr.output ++ Seq(windowExpr.outputColumn) ++ timeCol
++ otherExpressions,
+ Filter(filterExpr,
+ Expand(projections, windowExpr.output ++ timeCol ++
otherExpressions.map(_.toAttribute),
+ p.children.head)))
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+ case p: LogicalPlan if
p.expressions.collect(getWindowExpr[TimeWindow](e => e)).nonEmpty &&
+ p.children.length == 1 =>
+ val windowed = generateWindows(p)
+ val rewritten = p transformExpressions getWindowExpr { windowExpr =>
+ windowExpr.validate() match {
+ case Some(e) => throw new AnalysisException(e)
--- End diff --
I'm not sure whether it's because we do a substitution, but I couldn't get
`checkInputDataTypes` to be called on this expression, so I kept this explicit
`validate()` check for now.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]