GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r141934196
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -233,16 +234,54 @@ object UnsupportedOperationChecker {
           throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
         }
-      case LeftOuter | LeftSemi | LeftAnti =>
+      case LeftSemi | LeftAnti =>
         if (right.isStreaming) {
-          throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
-            "on the right is not supported")
+          throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
+            "on the right are not supported")
         }
-      case RightOuter =>
-        if (left.isStreaming) {
-          throwError("Right outer join with a streaming DataFrame/Dataset on the left is " +
-            "not supported")
+      // We support left and right outer streaming joins only in the stream+stream case.
+      case LeftOuter | RightOuter =>
+        if (joinType == LeftOuter && !left.isStreaming && right.isStreaming) {
+          throwError("Left outer join with a streaming DataFrame/Dataset " +
+            "on the right and non-streaming on the left is not supported")
+        }
+        if (joinType == RightOuter && left.isStreaming && !right.isStreaming) {
+          throwError("Right outer join with a streaming DataFrame/Dataset on the left and " +
+            "non-streaming on the right not supported")
+        }
+
+        if (left.isStreaming && right.isStreaming) {
+          val watermarkInJoinKeys = subPlan match {
+            case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) =>
+              val keySet = AttributeSet(leftKeys ++ rightKeys)
+              (leftKeys ++ rightKeys).exists {
+                case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+                case _ => false
+              }
+            case _ => false
+          }
+
+          // Check if the nullable side has a watermark, and there's a range condition which
+          // implies a state value watermark on the first side.
+          val hasValidWatermarkRange = joinType match {
+            case LeftOuter =>
+              // We provide a dummy watermark value 0 - we just want to check if the
+              // watermark predicate can be constructed.
+              StreamingJoinHelper.getStateValueWatermark(
--- End diff --
This needs to be refactored. The function should return an optional expression for calculating the state watermark, and it should return an expression only if there are one or more time-range conditions of the right type.

This refactoring shouldn't be too hard. In the helper method, instead of replacing `attributeWithWatermark` with the value of `eventTimeWatermark` and evaluating the equation, it should just create the expression `attributeWithWatermark + C`. This is the expression that calculates the state watermark. Note that there can be multiple conditions with the necessary directionality, so the result can be a list of expressions: `attributeWithWatermark + C1`, `attributeWithWatermark + C2`, `attributeWithWatermark + C3`, etc.
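
A minimal, Spark-free sketch of the extraction step, assuming a toy expression tree (`Attr`, `Lit`, `Add`) and a toy condition `GreaterThan` in place of Catalyst expressions; none of these names are Spark APIs, and `stateWatermarkExprs` is a hypothetical helper. It only recognizes the shape `stateAttr > watermarkAttr + c` and returns the unevaluated `watermarkAttr + c` for each match, so an empty result is the "no expression" case.

```scala
// Hypothetical, Spark-free model: Attr/Lit/Add stand in for Catalyst expressions,
// and GreaterThan is a toy range condition. None of these are Spark APIs.
object WatermarkExtraction {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Long) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Toy condition `left > right`.
  final case class GreaterThan(left: Expr, right: Expr)

  /** Builds one `watermarkAttr + c` expression per condition of the form
    * `stateAttr > watermarkAttr + c` (or `stateAttr > watermarkAttr`, i.e. c = 0).
    * No watermark value is substituted here. An empty result means no state
    * watermark can be derived from the conditions. */
  def stateWatermarkExprs(
      conditions: Seq[GreaterThan],
      stateAttr: Attr,
      watermarkAttr: Attr): Seq[Expr] =
    conditions.collect {
      case GreaterThan(`stateAttr`, Add(`watermarkAttr`, Lit(c))) =>
        Add(watermarkAttr, Lit(c))
      case GreaterThan(`stateAttr`, `watermarkAttr`) =>
        Add(watermarkAttr, Lit(0L))
      // Any other shape (e.g. `watermarkAttr > stateAttr`) only bounds state rows
      // from above, so it cannot be used to drop old state and is skipped.
    }
}
```

For example, with `leftTime > rightTime - 600` and `leftTime > rightTime`, this yields `rightTime + -600` and `rightTime + 0`, while `rightTime > leftTime` has the wrong directionality and is ignored.
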

In the unsupported operation checker, it should just check whether such expressions have been extracted. In the planner, a separate method should take these expressions and evaluate them.

A different function can then take these conditions, split them up again, and evaluate them.
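
A hypothetical, Spark-free sketch of the checker/planner split, using a toy expression tree (`Attr`, `Lit`, `Add`) in place of Catalyst expressions and a map from attribute name to current watermark value; `checkOuterJoin`, `eval`, and `stateWatermark` are illustrative names, not Spark APIs. The checker only tests whether any state-watermark expression was extracted, and the planner substitutes the real watermark and evaluates. Taking the maximum when several expressions exist is an assumption: if the range conditions are ANDed, a state row that falls below any single bound can never match again.

```scala
// Hypothetical, Spark-free sketch of the checker/planner split; not Spark's API.
object WatermarkEvaluation {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Long) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  /** Checker side: only asks whether any state-watermark expression was extracted. */
  def checkOuterJoin(stateWatermarkExprs: Seq[Expr]): Unit =
    if (stateWatermarkExprs.isEmpty) {
      throw new IllegalArgumentException(
        "Stream-stream outer join requires a watermark and a suitable time-range condition")
    }

  /** Evaluates an expression, substituting the current watermark for each attribute. */
  def eval(expr: Expr, watermarks: Map[String, Long]): Long = expr match {
    case Attr(name) => watermarks(name)
    case Lit(v)     => v
    case Add(l, r)  => eval(l, watermarks) + eval(r, watermarks)
  }

  /** Planner side: evaluates every candidate and keeps the largest bound
    * (assumes the range conditions are ANDed). None if nothing was extracted. */
  def stateWatermark(stateWatermarkExprs: Seq[Expr], watermarks: Map[String, Long]): Option[Long] =
    stateWatermarkExprs.map(eval(_, watermarks)).reduceOption(_ max _)
}
```

With a `rightTime` watermark of 10000, the candidates `rightTime + -600` and `rightTime + 0` evaluate to 9400 and 10000, so this sketch would use 10000 as the state watermark.
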