GitHub user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139895235 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -179,6 +167,31 @@ trait WatermarkSupport extends UnaryExecNode { } } +object WatermarkSupport { + + /** Generate an expression on given attributes that matches data older than the watermark */ + def watermarkExpression( + optionalWatermarkExpression: Option[Expression], + optionalWatermarkMs: Option[Long]): Option[Expression] = { + if (optionalWatermarkExpression.isEmpty || optionalWatermarkMs.isEmpty) return None + + val watermarkAttribute = optionalWatermarkExpression.get + // If we are evicting based on a window, use the end of the window. Otherwise just + // use the attribute itself. + val evictionExpression = + if (watermarkAttribute.dataType.isInstanceOf[StructType]) { --- End diff -- Is this for the output of `window`? Identifying it only by checking whether the attribute's data type is a `StructType` seems very fragile at the moment. Can we add special metadata that marks the attribute as the output of a `window` expression? That would also allow us, in the future, to check that people actually perform windowed joins rather than arbitrary joins, etc.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org