Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r142288587
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
---
@@ -233,16 +234,53 @@ object UnsupportedOperationChecker {
throwError("Full outer joins with streaming
DataFrames/Datasets are not supported")
}
- case LeftOuter | LeftSemi | LeftAnti =>
+ case LeftSemi | LeftAnti =>
if (right.isStreaming) {
- throwError("Left outer/semi/anti joins with a streaming
DataFrame/Dataset " +
- "on the right is not supported")
+ throwError("Left semi/anti joins with a streaming
DataFrame/Dataset " +
+ "on the right are not supported")
}
+ // We support streaming left outer joins with static on the right always, and with
+ // stream on both sides under the appropriate conditions.
+ case LeftOuter =>
+ if (!left.isStreaming && right.isStreaming) {
+ throwError("Left outer join with a streaming
DataFrame/Dataset " +
+ "on the right and a static DataFrame/Dataset on the left
is not supported")
+ } else if (left.isStreaming && right.isStreaming) {
+ val watermarkInJoinKeys =
StreamingJoinHelper.isWatermarkInJoinKeys(subPlan)
+
+ val hasValidWatermarkRange =
+ StreamingJoinHelper.getStateValueWatermark(
+ left.outputSet, right.outputSet, condition,
Some(1000000)).isDefined
+
--- End diff --
Nit: extra blank line here (the empty added line just above).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]