c21 commented on a change in pull request #30395:
URL: https://github.com/apache/spark/pull/30395#discussion_r530795959
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -298,8 +298,14 @@ object UnsupportedOperationChecker extends Logging {
// no further validations needed
case FullOuter =>
- if (left.isStreaming || right.isStreaming) {
- throwError("Full outer joins with streaming
DataFrames/Datasets are not supported")
+ if (left.isStreaming && !right.isStreaming) {
+ throwError(s"$FullOuter joins with streaming
DataFrames/Datasets on the left " +
Review comment:
@xuanyuanking - both ways work for me; changed to `FullOuter` here.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -315,40 +321,17 @@ object UnsupportedOperationChecker extends Logging {
throwError(s"$joinType join with a streaming DataFrame/Dataset
" +
"on the right and a static DataFrame/Dataset on the left is
not supported")
} else if (left.isStreaming && right.isStreaming) {
- val watermarkInJoinKeys =
StreamingJoinHelper.isWatermarkInJoinKeys(subPlan)
-
- val hasValidWatermarkRange =
- StreamingJoinHelper.getStateValueWatermark(
- left.outputSet, right.outputSet, condition,
Some(1000000)).isDefined
-
- if (!watermarkInJoinKeys && !hasValidWatermarkRange) {
- throwError(
- s"Stream-stream $joinType join between two streaming
DataFrame/Datasets " +
- "is not supported without a watermark in the join keys, or
a watermark on " +
- "the nullable side and an appropriate range condition")
- }
+ checkForStreamStreamJoinWatermark(j)
}
// We support streaming right outer joins with static on the left
always, and with
// stream on both sides under the appropriate conditions.
case RightOuter =>
if (left.isStreaming && !right.isStreaming) {
- throwError("Right outer join with a streaming
DataFrame/Dataset on the left and " +
+ throwError(s"$RightOuter join with a streaming
DataFrame/Dataset on the left and " +
Review comment:
@xuanyuanking - sure, updated.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -438,4 +421,32 @@ object UnsupportedOperationChecker extends Logging {
throw new AnalysisException(
msg, operator.origin.line, operator.origin.startPosition, Some(operator))
}
+
+ private def checkForStreamStreamJoinWatermark(join: Join): Unit = {
+ val watermarkInJoinKeys = StreamingJoinHelper.isWatermarkInJoinKeys(join)
+
+ val hasValidWatermarkRange = join.joinType match {
Review comment:
@xuanyuanking - added back. Sorry, I missed that in the first place.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -333,13 +339,23 @@ case class StreamingSymmetricHashJoinExec(
stateFormatVersion match {
case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key,
kv.value))
case 2 => kv.matched
- case _ =>
- throw new IllegalStateException("Unexpected state format
version! " +
- s"version $stateFormatVersion")
+ case _ => throwBadStateFormatVersionException()
}
}.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
hashJoinOutputIter ++ outerOutputIter
+ case FullOuter =>
+ lazy val isKeyToValuePairMatched = (kv: KeyToValuePair) =>
+ stateFormatVersion match {
+ case 2 => kv.matched
+ case _ => throwBadStateFormatVersionException()
+ }
+ val leftSideOutputIter =
leftSideJoiner.removeOldState().filterNot(isKeyToValuePairMatched)
Review comment:
@xuanyuanking - yeah, I agree this is a nit; updated anyway.
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -382,6 +398,7 @@ case class StreamingSymmetricHashJoinExec(
leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
case LeftOuter => rightSideJoiner.removeOldState()
case RightOuter => leftSideJoiner.removeOldState()
+ case FullOuter => Iterator.empty
Review comment:
@xuanyuanking - agree, updated.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]