HeartSaVioR commented on code in PR #54769:
URL: https://github.com/apache/spark/pull/54769#discussion_r2922904732


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -856,6 +959,108 @@ abstract class SymmetricHashJoinStateManagerBase(
     keyToNumValues.put(key, numExistingValues + 1)
   }
 
+  /**
+   * Find the first non-null value index starting from the end and going up to stopIndex.
+   * Used by swap-with-last compaction in both [[getJoinedRowsAndRemoveMatched]] and
+   * [[evictAndReturnByValueCondition]].
+   */
+  protected def getRightMostNonNullIndex(
+      key: UnsafeRow, stopIndex: Long, numValues: Long): Option[Long] = {
+    (numValues - 1 to stopIndex by -1).find { idx =>
+      keyWithIndexToValue.get(key, idx) != null
+    }
+  }
+
+  /**

Review Comment:
   For reviewer: This is a refactor extracted from evictAndReturnByValueCondition, since we now use this in both methods.
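
   For readers unfamiliar with swap-with-last compaction, here is a rough sketch of how a scan like getRightMostNonNullIndex can support it. This is not the PR's code: ToyValueStore, removeBySwapWithLast and the String keys are hypothetical, an in-memory map stands in for keyWithIndexToValue, and a missing entry plays the role of a null slot. How the real state manager updates its counts is an assumption here.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for keyWithIndexToValue: (key, index) -> value.
// A missing entry plays the role of a null slot in the real store.
final class ToyValueStore {
  private val slots = mutable.Map.empty[(String, Long), String]

  def get(key: String, idx: Long): Option[String] = slots.get((key, idx))

  def put(key: String, idx: Long, value: String): Unit = {
    slots((key, idx)) = value
  }

  def remove(key: String, idx: Long): Unit = {
    slots.remove((key, idx))
    ()
  }

  // Same scan shape as in the diff: walk from numValues - 1 down to
  // stopIndex (inclusive) and return the first index holding a live value.
  def rightMostNonNullIndex(key: String, stopIndex: Long, numValues: Long): Option[Long] =
    (numValues - 1 to stopIndex by -1).find(idx => get(key, idx).isDefined)

  // Remove the value at idx by moving the right-most live value into its slot.
  // Returns the value count for the key after compaction (an assumption of
  // this sketch: every slot at or above the returned count is empty).
  def removeBySwapWithLast(key: String, idx: Long, numValues: Long): Long =
    rightMostNonNullIndex(key, idx, numValues) match {
      case Some(last) if last != idx =>
        // Overwrite the removed slot with the last live value, then drop the tail.
        put(key, idx, get(key, last).get)
        remove(key, last)
        last
      case Some(_) =>
        // idx itself is the right-most live value, so there is nothing to swap.
        remove(key, idx)
        idx
      case None =>
        // No live value at or after idx.
        idx
    }
}
```

   The scan stops at stopIndex because slots before the one being removed are not candidates for the swap, which is why the helper takes the removal index as its lower bound in this sketch.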



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]