xuanyuanking commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509273007
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
/**
* Get all the matched values for given join condition, with marking matched.
* This method is designed to mark joined rows properly without exposing internal index of row.
+ *
+ * @param excludeRowsAlreadyMatched Do not join with rows already matched previously.
+ * This is used for right side of left semi join in
+ * [[StreamingSymmetricHashJoinExec]] only.
*/
def getJoinedRows(
key: UnsafeRow,
generateJoinedRow: InternalRow => JoinedRow,
- predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+ predicate: JoinedRow => Boolean,
+ excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = {
val numValues = keyToNumValues.get(key)
- keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+ keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>
Review comment:
Yes, after taking a further look, the `joinedRow` has already dropped the
`matched` information, so it's hard to do now. +1 for the current change.
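
For readers following the thread, here is a minimal sketch of the filtering idea in the hunk above. It is not the Spark implementation: `StoredValue`, `SemiJoinSketch`, and `joinedValues` are hypothetical stand-ins, and the body of the `filterNot` (which the hunk cuts off) is an assumption. The point it tries to illustrate is that the `matched` flag is only visible on the stored value, so the exclusion presumably has to happen before the joined row is produced.

```scala
// Simplified stand-in for a state store entry; NOT the real Spark class.
final case class StoredValue(value: String, var matched: Boolean)

object SemiJoinSketch {
  // Hypothetical analogue of getJoinedRows: returns the stored values that
  // satisfy `predicate` and marks every returned value as matched.
  // When excludeRowsAlreadyMatched is true, values matched earlier are skipped
  // before the predicate is evaluated (assumed behaviour of the filterNot).
  def joinedValues(
      stored: Seq[StoredValue],
      predicate: String => Boolean,
      excludeRowsAlreadyMatched: Boolean = false): Iterator[String] = {
    stored.iterator
      .filterNot(v => excludeRowsAlreadyMatched && v.matched)
      .filter(v => predicate(v.value))
      .map { v =>
        v.matched = true // remember the match; the returned value no longer carries the flag
        v.value
      }
  }
}
```

With two stored values where one is already matched, calling `joinedValues(..., excludeRowsAlreadyMatched = true)` yields only the unmatched one, and a second identical call yields nothing, which is the at-most-once behaviour a left semi join needs.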