Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r141982780
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
---
@@ -233,16 +234,54 @@ object UnsupportedOperationChecker {
throwError("Full outer joins with streaming
DataFrames/Datasets are not supported")
}
- case LeftOuter | LeftSemi | LeftAnti =>
+ case LeftSemi | LeftAnti =>
if (right.isStreaming) {
- throwError("Left outer/semi/anti joins with a streaming
DataFrame/Dataset " +
- "on the right is not supported")
+ throwError("Left semi/anti joins with a streaming
DataFrame/Dataset " +
+ "on the right are not supported")
}
- case RightOuter =>
- if (left.isStreaming) {
- throwError("Right outer join with a streaming
DataFrame/Dataset on the left is " +
- "not supported")
+ // We support left and right outer streaming joins only in the
stream+stream case.
+ case LeftOuter | RightOuter =>
+ if (joinType == LeftOuter && !left.isStreaming &&
right.isStreaming) {
+ throwError("Left outer join with a streaming
DataFrame/Dataset " +
+ "on the right and non-streaming on the left is not
supported")
+ }
+ if (joinType == RightOuter && left.isStreaming &&
!right.isStreaming) {
+ throwError("Right outer join with a streaming
DataFrame/Dataset on the left and " +
+ "non-streaming on the right not supported")
+ }
+
+ if (left.isStreaming && right.isStreaming) {
+ val watermarkInJoinKeys = subPlan match {
+ case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _,
_) =>
+ val keySet = AttributeSet(leftKeys ++ rightKeys)
+ (leftKeys ++ rightKeys).exists {
+ case a: AttributeReference =>
a.metadata.contains(EventTimeWatermark.delayKey)
+ case _ => false
+ }
+ case _ => false
+ }
+
+ // Check if the nullable side has a watermark, and there's
a range condition which
+ // implies a state value watermark on the first side.
+ val hasValidWatermarkRange = joinType match {
+ case LeftOuter =>
+ // We provide a dummy watermark value 0 - we just want
to check if the
+ // watermark predicate can be constructed.
+ StreamingJoinHelper.getStateValueWatermark(
--- End diff --
Per our offline discussion, it's not worth the change. Just add a test to ensure
that this function does not convert the result `Some(negative)` to `None` (which may
happen when `eventTimeWatermark` is set to 0).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]