Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12008#discussion_r57617913
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---
    @@ -1585,3 +1586,110 @@ object ResolveUpCast extends Rule[LogicalPlan] {
         }
       }
     }
    +
    +/**
    + * Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to
    + * figure out how many windows a time column can map to, we over-estimate the number of windows and
    + * filter out the rows where the time column is not inside the time window.
    + */
    +object TimeWindowing extends Rule[LogicalPlan] {
    +
    +  /**
    +   * Depending on the operation, the TimeWindow expression may be wrapped in an Alias (in case of
    +   * projections) or be simply by itself (in case of groupBy),
    +   * @param f The function that we want to apply on the TimeWindow expression
    +   * @return The user defined function applied on the TimeWindow expression
    +   */
    +  private def getWindowExpr[E](f: TimeWindow => E): PartialFunction[Expression, E] = {
    +    case Alias(windowExpr: TimeWindow, _) => f(windowExpr)
    +    case windowExpr: TimeWindow => f(windowExpr)
    +  }
    +
    +  /** Pass on other columns that are required either in a projection or grouping. */
    +  private def getMissingAttributes(plan: LogicalPlan): Seq[NamedExpression] = {
    +    val expressions = plan match {
    +      case p: Project => p.projectList
    +      case a: Aggregate => a.groupingExpressions
    +      case _ => Seq.empty
    +    }
    +    expressions.flatMap(_.references)
    +  }
    +
    +  /**
    +   * Generates the logical plan for generating window ranges on a timestamp column. Without
    +   * knowing what the timestamp value is, it's non-trivial to figure out deterministically how many
    +   * window ranges a timestamp will map to given all possible combinations of a window duration,
    +   * slide duration and start time (offset). Therefore, we express and over-estimate the number of
    +   * windows there may be, and filter the valid windows. We use last Project operator to group
    +   * the window columns into a struct so they can be accessed as `window.start` and `window.end`.
    +   *
    +   * The windows are calculated as below:
    +   * maxNumOverlapping <- ceil(windowDuration / slideDuration)
    +   * for (i <- 0 until maxNumOverlapping)
    +   *   windowId <- ceil((timestamp - startTime) / slideDuration)
    +   *   windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
    +   *   windowEnd <- windowStart + windowDuration
    +   *   return windowStart, windowEnd
    +   *
    +   * This behaves as follows for the given parameters for the time: 12:05. The valid windows are
    +   * marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the
    +   * Filter operator.
    +   * window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
    +   *     11:55 - 12:07 +                      11:52 - 12:04 x
    +   *     12:00 - 12:12 +                      11:57 - 12:09 +
    +   *     12:05 - 12:17 +                      12:02 - 12:14 +
    +   *
    +   * @param p The logical plan
    +   * @return the logical plan that will generate the time windows using the Expand operator, with
    +   *         the Filter operator for correctness and Project for usability.
    +   */
    +  private def generateWindows(p: LogicalPlan): LogicalPlan = {
    +    val windowExpr = p.expressions.collect(getWindowExpr[TimeWindow](e => e)).head
    +    // get all expressions we need to pass on for projections
    +    val otherExpressions =
    +      getMissingAttributes(p).filterNot(_.children.exists(_.isInstanceOf[TimeWindow]))
    +
    +    val projections = Seq.tabulate(windowExpr.maxNumOverlapping + 1) { i =>
    +      // windowId <- ceil((timestamp - startTime) / slideDuration)
    +      val division = Ceil(Divide(Subtract(Cast(windowExpr.timeColumn, LongType),
    +        Literal(windowExpr.startTime)), Literal(windowExpr.slideDuration)))
    +      // start <- (windowId + i - maxNumOverlapping) * slideDuration + startTime
    +      // the 1000000 is necessary for properly casting a LongType to a TimestampType
    +      val windowStart =
    +        Multiply(
    +          Add(
    +            Multiply(
    +              Add(division, Literal(i - windowExpr.maxNumOverlapping)),
    +              Literal(windowExpr.slideDuration)),
    +            Literal(windowExpr.startTime)),
    +          Literal(1000000))
    --- End diff --
    
    This might be easier to read if it was written with the dsl.
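
    To illustrate the readability point, here is a minimal, self-contained
    sketch. It does not use Catalyst: `Expr`, `Lit`, `TimeCol` and
    `WindowSketch` are hypothetical stand-ins, and the infix operators only
    mimic the kind of operators that the dsl in
    org.apache.spark.sql.catalyst.dsl offers on real expressions. It builds the
    window start once with nested constructors (the shape used in the diff,
    minus the final microsecond scaling) and once with operators, then
    evaluates the candidates for the 12:05 example from the doc comment. Times
    are minutes since midnight, and treating the window end as exclusive is an
    assumption of this sketch.

```scala
object WindowSketch {
  // Toy expression tree; hypothetical stand-in for Catalyst expressions.
  sealed trait Expr {
    // Infix operators in the spirit of a dsl: each one just builds a node.
    def +(other: Expr): Expr = Add(this, other)
    def -(other: Expr): Expr = Sub(this, other)
    def *(other: Expr): Expr = Mul(this, other)
    def /(other: Expr): Expr = Div(this, other)
  }
  case class Lit(value: Long) extends Expr
  case object TimeCol extends Expr // placeholder for the time column
  case class Add(l: Expr, r: Expr) extends Expr
  case class Sub(l: Expr, r: Expr) extends Expr
  case class Mul(l: Expr, r: Expr) extends Expr
  case class Div(l: Expr, r: Expr) extends Expr
  case class Ceil(child: Expr) extends Expr

  // Evaluate an expression for one timestamp, using fractional division.
  def eval(e: Expr, time: Long): Double = e match {
    case Lit(v)    => v.toDouble
    case TimeCol   => time.toDouble
    case Add(l, r) => eval(l, time) + eval(r, time)
    case Sub(l, r) => eval(l, time) - eval(r, time)
    case Mul(l, r) => eval(l, time) * eval(r, time)
    case Div(l, r) => eval(l, time) / eval(r, time)
    case Ceil(c)   => math.ceil(eval(c, time))
  }

  // Nested-constructor style, following the shape of the code in the diff.
  def windowStartNested(i: Int, maxOverlap: Int, slide: Long, start: Long): Expr =
    Add(
      Mul(
        Add(Ceil(Div(Sub(TimeCol, Lit(start)), Lit(slide))), Lit(i - maxOverlap)),
        Lit(slide)),
      Lit(start))

  // The same tree written with infix operators.
  def windowStartDsl(i: Int, maxOverlap: Int, slide: Long, start: Long): Expr = {
    val windowId = Ceil((TimeCol - Lit(start)) / Lit(slide))
    (windowId + Lit(i - maxOverlap)) * Lit(slide) + Lit(start)
  }

  // Enumerate maxOverlap + 1 candidates (as Seq.tabulate(... + 1) does in the
  // diff) and keep those containing the timestamp: start <= time < end.
  def validWindows(time: Long, window: Long, slide: Long, start: Long): Seq[(Long, Long)] = {
    val maxOverlap = math.ceil(window.toDouble / slide).toInt
    (0 to maxOverlap)
      .map { i =>
        val s = eval(windowStartDsl(i, maxOverlap, slide, start), time).toLong
        (s, s + window)
      }
      .filter { case (s, e) => s <= time && time < e }
  }
}
```

    Both builders produce structurally equal trees, so only the notation
    changes. For 12:05 (725 minutes) with a 12 minute window and 5 minute
    slide, `validWindows` keeps the windows starting at 11:55, 12:00 and 12:05
    for a start offset of 0, and those starting at 11:57 and 12:02 for an
    offset of 2, matching the rows marked + in the doc comment.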

