nyaapa commented on code in PR #54769:
URL: https://github.com/apache/spark/pull/54769#discussion_r2926573892


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -856,6 +959,108 @@ abstract class SymmetricHashJoinStateManagerBase(
     keyToNumValues.put(key, numExistingValues + 1)
   }
 
+  /**
+   * Find the first non-null value index starting from the end and going up to 
stopIndex.
+   * Used by swap-with-last compaction in both 
[[getJoinedRowsAndRemoveMatched]] and
+   * [[evictAndReturnByValueCondition]].
+   */
+  protected def getRightMostNonNullIndex(
+      key: UnsafeRow, stopIndex: Long, numValues: Long): Option[Long] = {
+    (numValues - 1 to stopIndex by -1).find { idx =>
+      keyWithIndexToValue.get(key, idx) != null
+    }
+  }
+
+  /**
+   * Remove the value at the given index using swap-with-last compaction: the 
last element is
+   * moved into the hole, trailing nulls are cleaned up, and the logical array 
is shortened.
+   *
+   * @return the updated numValues after removal
+   */
+  protected def removeValueAtIndex(key: UnsafeRow, index: Long, numValues: 
Long): Long = {
+    var currentNumValues = numValues
+    if (index != currentNumValues - 1) {
+      val valuePairAtMaxIndex = keyWithIndexToValue.get(key, currentNumValues 
- 1)
+      if (valuePairAtMaxIndex != null) {
+        keyWithIndexToValue.put(key, index, valuePairAtMaxIndex.value,
+          valuePairAtMaxIndex.matched)
+      } else {
+        val nonNullIndex = getRightMostNonNullIndex(key, index + 1, 
currentNumValues)
+          .getOrElse(index)
+        if (nonNullIndex != index) {
+          val valuePair = keyWithIndexToValue.get(key, nonNullIndex)
+          keyWithIndexToValue.put(key, index, valuePair.value, 
valuePair.matched)
+        }
+
+        if (nonNullIndex != currentNumValues - 1) {

Review Comment:
   nit: aren't we in this `else` branch precisely because the check
   ```
         val valuePairAtMaxIndex = keyWithIndexToValue.get(key, currentNumValues - 1)
         if (valuePairAtMaxIndex != null) {
   ```
   is false, i.e. the condition `nonNullIndex != currentNumValues - 1` will always be true here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala:
##########
@@ -856,6 +959,108 @@ abstract class SymmetricHashJoinStateManagerBase(
     keyToNumValues.put(key, numExistingValues + 1)
   }
 
+  /**
+   * Find the first non-null value index starting from the end and going up to 
stopIndex.
+   * Used by swap-with-last compaction in both 
[[getJoinedRowsAndRemoveMatched]] and
+   * [[evictAndReturnByValueCondition]].
+   */
+  protected def getRightMostNonNullIndex(
+      key: UnsafeRow, stopIndex: Long, numValues: Long): Option[Long] = {
+    (numValues - 1 to stopIndex by -1).find { idx =>
+      keyWithIndexToValue.get(key, idx) != null
+    }
+  }
+
+  /**
+   * Remove the value at the given index using swap-with-last compaction: the 
last element is
+   * moved into the hole, trailing nulls are cleaned up, and the logical array 
is shortened.
+   *
+   * @return the updated numValues after removal
+   */
+  protected def removeValueAtIndex(key: UnsafeRow, index: Long, numValues: 
Long): Long = {
+    var currentNumValues = numValues
+    if (index != currentNumValues - 1) {
+      val valuePairAtMaxIndex = keyWithIndexToValue.get(key, currentNumValues 
- 1)
+      if (valuePairAtMaxIndex != null) {
+        keyWithIndexToValue.put(key, index, valuePairAtMaxIndex.value,
+          valuePairAtMaxIndex.matched)
+      } else {
+        val nonNullIndex = getRightMostNonNullIndex(key, index + 1, 
currentNumValues)
+          .getOrElse(index)
+        if (nonNullIndex != index) {
+          val valuePair = keyWithIndexToValue.get(key, nonNullIndex)
+          keyWithIndexToValue.put(key, index, valuePair.value, 
valuePair.matched)
+        }
+
+        if (nonNullIndex != currentNumValues - 1) {

Review Comment:
   nit: aren't we in this `else` branch precisely because the check
   ```
         val valuePairAtMaxIndex = keyWithIndexToValue.get(key, currentNumValues - 1)
         if (valuePairAtMaxIndex != null) {
   ```
   is `false`, i.e. the condition `nonNullIndex != currentNumValues - 1` will always be true here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to