xuanyuanking commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509130528
##########
File path:
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.join(_, joinType = LeftSemi),
streamStreamSupported = false,
batchStreamSupported = false,
- expectedMsg = "left semi/anti joins")
+ expectedMsg = "LeftSemi join")
+
+ // Left semi joins: update and complete mode not allowed
+ assertNotSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and update mode",
+ streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attribute === attribute)),
+ OutputMode.Update(),
+ Seq("is not supported in Update output mode"))
+ assertNotSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and complete mode",
+ Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attribute === attribute))),
+ OutputMode.Complete(),
+ Seq("is not supported in Complete output mode"))
+
+ // Left semi joins: stream-stream allowed with join on watermark attribute
+ // Note that the attribute need not be watermarked on both sides.
+ assertSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and join on attribute with left watermark",
+ streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attributeWithWatermark === attribute)),
+ OutputMode.Append())
+ assertSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and join on attribute with right watermark",
+ streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attribute === attributeWithWatermark)),
+ OutputMode.Append())
+ assertNotSupportedInStreamingPlan(
+ "left semi join with stream-stream relations and join on non-watermarked attribute",
+ streamRelation.join(streamRelation, joinType = LeftSemi,
+ condition = Some(attribute === attribute)),
+ OutputMode.Append(),
+ Seq("watermark in the join keys"))
Review comment:
Personally, I'd prefer to change the match string to `without a watermark
in the join keys`. The original tests use the same string, so maybe we can
change them together in this PR.
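To make the suggestion concrete, here is a simplified, hypothetical Scala model of the stream-stream left semi join rules the tests above exercise: Update and Complete output modes are rejected, and Append is allowed when the join key on either side carries a watermark. `Mode`, `KeyAttr`, `LeftSemiStreamCheck`, and the exact message texts are assumptions for illustration, not Spark's `UnsupportedOperationChecker`. Matching on the longer substring `without a watermark in the join keys` ties the assertion to the rejection message rather than to any text that merely mentions a watermark.

```scala
// Hypothetical, simplified model of the checks exercised by the tests above;
// this is NOT Spark's UnsupportedOperationChecker, and the messages are illustrative.
sealed trait Mode
object Mode {
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode
}

// A join key attribute; `hasWatermark` marks an event-time column with a watermark.
final case class KeyAttr(name: String, hasWatermark: Boolean)

object LeftSemiStreamCheck {
  def check(leftKey: KeyAttr, rightKey: KeyAttr, mode: Mode): Either[String, Unit] =
    mode match {
      case Mode.Update =>
        Left("Stream-stream LeftSemi join is not supported in Update output mode")
      case Mode.Complete =>
        Left("Stream-stream LeftSemi join is not supported in Complete output mode")
      case Mode.Append if leftKey.hasWatermark || rightKey.hasWatermark =>
        // A watermark on the join key of either side is enough.
        Right(())
      case Mode.Append =>
        Left("Stream-stream LeftSemi join is not supported without a watermark in the join keys")
    }
}
```

With this model, a watermarked key on only one side in Append mode returns `Right(())`, while two non-watermarked keys yield a `Left` whose message contains the suggested substring.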
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -291,17 +291,17 @@ object UnsupportedOperationChecker extends Logging {
throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
}
- case LeftSemi | LeftAnti =>
+ case LeftAnti =>
if (right.isStreaming) {
- throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
+ throwError("Left anti joins with a streaming DataFrame/Dataset " +
"on the right are not supported")
}
// We support streaming left outer joins with static on the right always, and with
Review comment:
nit: Also change this comment?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
/**
* Get all the matched values for given join condition, with marking matched.
* This method is designed to mark joined rows properly without exposing internal index of row.
+ *
+ * @param excludeRowsAlreadyMatched Do not join with rows already matched previously.
+ *                                  This is used for right side of left semi join in
+ *                                  [[StreamingSymmetricHashJoinExec]] only.
*/
def getJoinedRows(
key: UnsafeRow,
generateJoinedRow: InternalRow => JoinedRow,
- predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+ predicate: JoinedRow => Boolean,
+ excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = {
val numValues = keyToNumValues.get(key)
- keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+ keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>
Review comment:
It makes more sense to add this filter logic to the `predicate`
param (i.e. `postJoinFilter` for `OneSideHashJoiner`) for `rightSideJoiner` only,
corresponding to the comment at
https://github.com/apache/spark/pull/30076/files#diff-6cd66da710d8d54025c1edf658bbec5230e8b4e748f9f2f884a60b1ba1efed42R264
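A minimal sketch of this alternative, in a hypothetical toy model (`StoredRow`, `ToyJoinState`, and `LeftSemiRightSide` are illustrative names, not Spark classes): `getJoinedRows` takes no extra flag, and only the right-side joiner passes a predicate that also skips stored left rows already marked as matched, so each left row is emitted at most once. Unlike the real `predicate: JoinedRow => Boolean`, the toy predicate is assumed to see the stored row's `matched` flag directly.

```scala
import scala.collection.mutable

// Hypothetical toy model of one side's join state; NOT Spark's
// SymmetricHashJoinStateManager. Each stored row carries a `matched` flag.
final class StoredRow(val value: String) {
  var matched: Boolean = false
}

final class ToyJoinState {
  private val rowsByKey = mutable.Map.empty[Int, mutable.ArrayBuffer[StoredRow]]

  def append(key: Int, value: String): Unit =
    rowsByKey.getOrElseUpdate(key, mutable.ArrayBuffer.empty[StoredRow]) += new StoredRow(value)

  // Returns stored values for `key` that pass `predicate`, marking each as matched.
  // There is no exclusion flag: any "skip already matched" logic lives in `predicate`.
  def getJoinedRows(key: Int, predicate: StoredRow => Boolean): List[String] =
    rowsByKey.get(key).map(_.toList).getOrElse(Nil).filter(predicate).map { row =>
      row.matched = true
      row.value
    }
}

object LeftSemiRightSide {
  // Stand-in for the join's post-join filter condition (always true here).
  val postJoinFilter: StoredRow => Boolean = _ => true

  // Assumed right-side predicate: skip left rows that were already emitted.
  val rightSidePredicate: StoredRow => Boolean =
    row => !row.matched && postJoinFilter(row)

  // Stores two left rows, then joins each incoming right-side key against them.
  def run(rightInputKeys: Seq[Int], predicate: StoredRow => Boolean): List[String] = {
    val leftState = new ToyJoinState
    leftState.append(1, "L1")
    leftState.append(2, "L2")
    rightInputKeys.toList.flatMap(k => leftState.getJoinedRows(k, predicate))
  }
}
```

In this model, `run(Seq(1, 1, 2), rightSidePredicate)` yields `List(L1, L2)`, while passing `postJoinFilter` alone yields `List(L1, L1, L2)`: a duplicate left row, which a left semi join must not produce.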
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.