xuanyuanking commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509273007



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param excludeRowsAlreadyMatched Do not join with rows already matched previously.
+   *                                  This is used for right side of left semi join in
+   *                                  [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
-    keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+    keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>

Review comment:
       Yes, after taking a further look, the `joinedRow` no longer carries the `matched` information by that point, so it's hard to filter on it later. +1 for the change as it is now.
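
       To illustrate the point, here is a minimal, self-contained sketch of why the filter has to run on the stored entries rather than on the joined rows. `ValueAndMatched`, `Joined` and `FilterBeforeJoinSketch` are hypothetical stand-ins, not the actual Spark classes, and the body is an assumption about the general shape of the logic, not the code in this PR.

```scala
// Hypothetical stand-ins for the stored state entry and the joined row.
// Note that Joined has no `matched` field, mirroring the comment above.
final case class ValueAndMatched(value: String, matched: Boolean)
final case class Joined(left: String, right: String)

object FilterBeforeJoinSketch {
  def joinedRows(
      stored: Seq[ValueAndMatched],
      generateJoinedRow: String => Joined,
      predicate: Joined => Boolean,
      excludeRowsAlreadyMatched: Boolean = false): Iterator[Joined] = {
    stored.iterator
      // The matched flag is only visible here, before the joined row is built.
      .filterNot(entry => excludeRowsAlreadyMatched && entry.matched)
      // After this step the flag is gone, so it cannot be filtered on later.
      .map(entry => generateJoinedRow(entry.value))
      .filter(predicate)
  }
}
```

       With the flag left at its default of `false`, every stored entry is still joined, so existing callers in this sketch see no change in behaviour.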



