Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139895235
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---
    @@ -179,6 +167,31 @@ trait WatermarkSupport extends UnaryExecNode {
       }
     }
     
    +object WatermarkSupport {
    +
    +  /** Generate an expression on given attributes that matches data older than the watermark */
    +  def watermarkExpression(
    +      optionalWatermarkExpression: Option[Expression],
    +      optionalWatermarkMs: Option[Long]): Option[Expression] = {
    +    if (optionalWatermarkExpression.isEmpty || optionalWatermarkMs.isEmpty) return None
    +
    +    val watermarkAttribute = optionalWatermarkExpression.get
    +    // If we are evicting based on a window, use the end of the window. Otherwise just
    +    // use the attribute itself.
    +    val evictionExpression =
    +      if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
    --- End diff --
    
    Is this for the output of `window`? Checking only for `StructType` seems
    very fragile at the moment. Can we add special metadata marking the column
    as the output of a `window` expression? That would also let us check in the
    future that people actually perform windowed joins rather than arbitrary
    joins, etc.
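
    A rough sketch of what such a marker could look like, assuming spark-sql
    is on the classpath. The object name, the metadata key, and the helper
    `isWindowOutput` are hypothetical and are not taken from this pull request;
    only `MetadataBuilder`, `Metadata`, `StructField`, and `StructType` are
    existing Spark types.

```scala
import org.apache.spark.sql.types.{Metadata, MetadataBuilder, StructField, StructType, TimestampType}

object WindowMarkerSketch {
  // Hypothetical metadata key, chosen only for this illustration.
  val WindowMarkerKey = "example.isTimeWindow"

  // Metadata that would be attached to the struct column produced by `window`.
  val windowMetadata: Metadata =
    new MetadataBuilder().putBoolean(WindowMarkerKey, true).build()

  // Shape of a window column: a struct holding start and end timestamps.
  val windowType: StructType = StructType(Seq(
    StructField("start", TimestampType),
    StructField("end", TimestampType)))

  // Treat a field as window output only if it is a struct AND carries the marker,
  // instead of relying on the struct type alone.
  def isWindowOutput(field: StructField): Boolean =
    field.dataType.isInstanceOf[StructType] &&
      field.metadata.contains(WindowMarkerKey) &&
      field.metadata.getBoolean(WindowMarkerKey)
}
```

    With this, a field built as `StructField("window", windowType, nullable =
    false, windowMetadata)` would pass `isWindowOutput`, while an unrelated
    struct column of the same shape but without the metadata would not.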


---

