Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r140933568
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
---
@@ -233,16 +234,46 @@ object UnsupportedOperationChecker {
throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
}
- case LeftOuter | LeftSemi | LeftAnti =>
+ case LeftSemi | LeftAnti =>
if (right.isStreaming) {
- throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
- "on the right is not supported")
+ throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
+ "on the right are not supported")
}
- case RightOuter =>
- if (left.isStreaming) {
- throwError("Right outer join with a streaming DataFrame/Dataset on the left is " +
- "not supported")
+ // We support left and right outer streaming joins only in the stream+stream case.
+ case LeftOuter | RightOuter =>
+ if (joinType == LeftOuter && !left.isStreaming && right.isStreaming) {
+ throwError("Left outer join with a streaming DataFrame/Dataset " +
+ "on the right and non-streaming on the left is not supported")
+ }
+ if (joinType == RightOuter && left.isStreaming && !right.isStreaming) {
+ throwError("Right outer join with a streaming DataFrame/Dataset on the left and " +
+ "non-streaming on the right not supported")
+ }
+
+ if (left.isStreaming && right.isStreaming) {
+ val watermarkInJoinKeys = subPlan match {
+ case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) =>
+ val keySet = AttributeSet(leftKeys ++ rightKeys)
+ (leftKeys ++ rightKeys).exists {
+ case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+ case _ => false
+ }
+ case _ => false
+ }
+
+ val oppositeSideHasWatermark = joinType match {
+ case LeftOuter =>
+ right.output.find(_.metadata.contains(EventTimeWatermark.delayKey)).isDefined
+ case RightOuter =>
--- End diff --
This condition is not sufficient. In a left outer join, state can be dropped
from the left side only when both of the following conditions are satisfied:
1. `right.output` has a watermark.
2. The join condition has a time range condition such that the left side's
state value watermark can be defined from the right side's event time watermark.

`StreamingSymmetricHashJoinHelper.getOneSideStateWatermarkPredicate` currently
checks for (2), so you have to invoke it to find out whether the conditions are
sufficient for defining state value watermarks.

However, that helper is in the SQL project, not catalyst. So you may have to
move the helper class into catalyst and then use it from
`UnsupportedOperationChecker`. You will also have to rename the helper class to
whatever makes sense in its new location.
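
To illustrate the combined check, here is a minimal sketch in plain Scala. It does not use Spark's classes: `OuterJoinStateCheck`, `JoinSummary`, and `canDropOuterSideState` are hypothetical names, and the two `...StateBoundedByTimeRange` flags stand in for whatever a helper like `getOneSideStateWatermarkPredicate` would report. The right outer case is assumed to mirror the left outer case.

```scala
// Simplified model for illustration only; none of these are Spark classes.
object OuterJoinStateCheck {
  sealed trait OuterJoinType
  case object LeftOuter extends OuterJoinType
  case object RightOuter extends OuterJoinType

  // Hypothetical summary of what the checker knows about a stream+stream join.
  final case class JoinSummary(
      leftHasWatermark: Boolean,
      rightHasWatermark: Boolean,
      // Assumption: true when the join condition contains a time range that
      // lets a state value watermark be defined for that side's state.
      leftStateBoundedByTimeRange: Boolean,
      rightStateBoundedByTimeRange: Boolean)

  // State on the preserved (outer) side can be dropped only if the opposite
  // side has a watermark AND a time range condition bounds the preserved
  // side's state. Either condition alone is not enough.
  def canDropOuterSideState(joinType: OuterJoinType, j: JoinSummary): Boolean =
    joinType match {
      case LeftOuter  => j.rightHasWatermark && j.leftStateBoundedByTimeRange
      // Assumed by symmetry with the left outer case.
      case RightOuter => j.leftHasWatermark && j.rightStateBoundedByTimeRange
    }
}
```

With a watermark on the right side but no time range condition, `canDropOuterSideState(LeftOuter, ...)` returns `false` in this sketch, which is the case the check in the diff would let through.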