anishshri-db commented on code in PR #53583:
URL: https://github.com/apache/spark/pull/53583#discussion_r2644235871


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -2401,6 +2401,80 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  test("multiGet - batch retrieval of multiple keys") {
+    tryWithProviderResource(newStoreProvider(useColumnFamilies = false)) { provider =>
+      val store = provider.getStore(0)
+      try {
+        // Put multiple key-value pairs
+        put(store, "a", 1, 10)
+        put(store, "b", 2, 20)
+        put(store, "c", 3, 30)
+        put(store, "d", 4, 40)
+
+        // Create keys array for multiGet
+        val keys = Array(
+          dataToKeyRow("a", 1),
+          dataToKeyRow("b", 2),
+          dataToKeyRow("c", 3),
+          dataToKeyRow("nonexistent", 999) // Key that doesn't exist
+        )
+
+        // Perform multiGet
+        // Note: multiGet returns an iterator that reuses the underlying UnsafeRow,
+        // so we must copy each row when collecting to an array
+        val results = store.multiGet(keys, StateStore.DEFAULT_COL_FAMILY_NAME)
+          .map(row => if (row != null) row.copy() else null).toArray
+
+        // Verify results
+        assert(results.length === 4)
+        assert(valueRowToData(results(0)) === 10)
+        assert(valueRowToData(results(1)) === 20)
+        assert(valueRowToData(results(2)) === 30)
+        assert(results(3) === null) // Non-existent key should return null
+      } finally {
+        if (!store.hasCommitted) store.abort()
+      }
+    }
+  }
+
+  test("deleteRange - bulk deletion of keys in range") {
+    tryWithProviderResource(
+      newStoreProvider(
+        keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),

Review Comment:
   Should we enforce in the code that deleteRange only works with the range scan encoder?
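
A minimal sketch of what such a guard could look like, assuming the store keeps the `keyStateEncoderSpec` it was initialized with; the helper name and error message are illustrative, not part of the PR:

```scala
// Hypothetical guard a deleteRange implementation could call up front.
// Assumes `keyStateEncoderSpec` is the spec passed to the provider's init().
private def validateRangeScanEncoder(operation: String): Unit = {
  keyStateEncoderSpec match {
    case _: RangeKeyScanStateEncoderSpec =>
      // Keys are encoded in an order-preserving layout, so a range delete is well defined.
    case other =>
      throw new UnsupportedOperationException(
        s"$operation is only supported with RangeKeyScanStateEncoderSpec, found $other")
  }
}
```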



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1022,6 +1022,35 @@ class RocksDB(
     }
   }
 
+  /**
+   * Get the values for multiple keys in a single batch operation.
+   * Uses RocksDB's native multiGet for efficient batch lookups.
+   *
+   * @param keys   Array of keys to look up
+   * @param cfName The column family name
+   * @return Array of values corresponding to the keys (null for keys that don't exist)
+   */
+  def multiGet(
+      keys: Array[Array[Byte]],
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Array[Byte]] = {
+    updateMemoryUsageIfNeeded()
+    // Prepare keys
+    val finalKeys =
+      if (useColumnFamilies) {
+        keys.map(encodeStateRowWithPrefix(_, cfName))
+      } else {
+        keys
+      }
+
+    val keysList = java.util.Arrays.asList(finalKeys: _*)
+
+    // Call RocksDB multiGet
+    val valuesList = db.multiGetAsList(readOptions, keysList)
+
+    // Convert the java.util.List returned by RocksDB into an Array[Array[Byte]]
+    valuesList.toArray(new Array[Array[Byte]](valuesList.size()))

Review Comment:
   This will incur another copy? Can we leave it as a `java.util.List`?
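
A sketch of the suggested shape, returning the list from `multiGetAsList` as-is instead of copying it into an array; the surrounding fields (`db`, `readOptions`, `useColumnFamilies`, `encodeStateRowWithPrefix`) are assumed to be those already used in the PR's version, only the return type changes:

```scala
def multiGet(
    keys: Array[Array[Byte]],
    cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): java.util.List[Array[Byte]] = {
  updateMemoryUsageIfNeeded()
  // Prefix keys with the column family id only when virtual column families are used.
  val finalKeys =
    if (useColumnFamilies) keys.map(encodeStateRowWithPrefix(_, cfName)) else keys
  // multiGetAsList already returns a java.util.List[Array[Byte]] with null entries
  // for missing keys, so no extra toArray copy is needed.
  db.multiGetAsList(readOptions, java.util.Arrays.asList(finalKeys: _*))
}
```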



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -115,6 +115,10 @@ trait ReadStateStore {
       key: UnsafeRow,
       colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): UnsafeRow
 
+  def multiGet(keys: Array[UnsafeRow], colFamilyName: String): Iterator[UnsafeRow] = {
+    keys.iterator.map(key => get(key, colFamilyName))

Review Comment:
   Why not just do the override in HDFSProvider for this?
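
If the intent is to keep the trait method abstract, one possible shape is the following, with the per-key fallback living in the HDFS-backed store; class names and surrounding structure are simplified here for illustration:

```scala
// In the trait: no default body, every store must implement it.
def multiGet(keys: Array[UnsafeRow], colFamilyName: String): Iterator[UnsafeRow]

// In HDFSBackedStateStoreProvider's store implementation: the map for the loaded
// version is already in memory, so a plain per-key lookup is the whole implementation.
override def multiGet(keys: Array[UnsafeRow], colFamilyName: String): Iterator[UnsafeRow] = {
  keys.iterator.map(key => get(key, colFamilyName))
}
```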


