Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/19271#discussion_r139895235
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
---
@@ -179,6 +167,31 @@ trait WatermarkSupport extends UnaryExecNode {
}
}
+object WatermarkSupport {
+
+ /** Generate an expression on given attributes that matches data older than the watermark */
+ def watermarkExpression(
+ optionalWatermarkExpression: Option[Expression],
+ optionalWatermarkMs: Option[Long]): Option[Expression] = {
+ if (optionalWatermarkExpression.isEmpty || optionalWatermarkMs.isEmpty) return None
+
+ val watermarkAttribute = optionalWatermarkExpression.get
+ // If we are evicting based on a window, use the end of the window. Otherwise just
+ // use the attribute itself.
+ val evictionExpression =
+ if (watermarkAttribute.dataType.isInstanceOf[StructType]) {
--- End diff --
Is this for the output of `window`? It seems very fragile at the moment.
Can we add special metadata that marks this column as the output of a
`window` expression? That would also let us check in the future that people
actually perform windowed joins instead of arbitrary joins, etc.
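To illustrate the concern, here is a minimal, self-contained Scala model (not Spark's Catalyst API) contrasting the type-based check in the diff (`watermarkAttribute.dataType.isInstanceOf[StructType]`) with the suggested metadata marker. `Attr`, `WindowKey`, `isWindowByType`, `isWindowByMetadata` and `evictionPredicate` are hypothetical names, and the predicate is rendered as a string with `<=` purely as an assumption of this sketch. In Spark itself such a marker could be carried in an attribute's `Metadata` (built with `MetadataBuilder`), but the key name below is invented.

```scala
// Simplified stand-ins for column types; hypothetical, not Spark's classes.
sealed trait DataType
case object TimestampType extends DataType
final case class StructType(fields: Seq[String]) extends DataType

// A column with a name, a type and a flag-style metadata map (hypothetical model).
final case class Attr(name: String, dataType: DataType, metadata: Map[String, Boolean] = Map.empty)

object EvictionSketch {
  // Hypothetical metadata key that `window` would set on the column it produces.
  val WindowKey = "spark.isWindowOutput"

  // Approach in the diff: treat ANY struct-typed watermark column as a window.
  def isWindowByType(a: Attr): Boolean = a.dataType.isInstanceOf[StructType]

  // Suggested approach: only treat the column as a window if it was explicitly marked.
  def isWindowByMetadata(a: Attr): Boolean = a.metadata.getOrElse(WindowKey, false)

  // Mirrors the diff's shape: None if either input is missing; otherwise compare the
  // window end (for windows) or the attribute itself against the watermark.
  def evictionPredicate(
      attr: Option[Attr],
      watermarkMs: Option[Long],
      isWindow: Attr => Boolean): Option[String] =
    for {
      a  <- attr
      wm <- watermarkMs
    } yield {
      val column = if (isWindow(a)) s"${a.name}.end" else a.name
      s"$column <= $wm"
    }
}
```

With a user-defined struct such as `Attr("payload", StructType(Seq("id", "value")))`, the type-based check produces `payload.end <= 1000` even though `payload` has no `end` field, while the metadata-based check falls back to the attribute itself. Only a column explicitly tagged with `WindowKey` is evicted by its window end.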