zeruibao commented on code in PR #54298:
URL: https://github.com/apache/spark/pull/54298#discussion_r2829523336


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -2547,6 +2547,82 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  test("deleteRange - changelog checkpointing records and replays range 
deletions") {
+    // useColumnFamilies = true is required to get changelog writer V2 which 
supports
+    // DELETE_RANGE_RECORD. V1 (used when useColumnFamilies = false) does not 
support it.
+    withSQLConf(
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + 
".changelogCheckpointing.enabled" -> "true",
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "100") {
+      val storeId = StateStoreId(newDir(), Random.nextInt(), 0)
+      val keyEncoderSpec = 
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0))
+      val cfName = "testColFamily"
+
+      // Create provider and commit version 1 with some data and a deleteRange
+      tryWithProviderResource(
+        newStoreProvider(storeId, keyEncoderSpec,
+          keySchema = keySchemaWithRangeScan,
+          useColumnFamilies = true)) { provider =>
+        val store = provider.getStore(0)
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+
+        // Put keys: (1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")
+        store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10), 
cfName)
+        store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20), 
cfName)
+        store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30), 
cfName)
+        store.put(dataToKeyRowWithRangeScan(4L, "d"), dataToValueRow(40), 
cfName)
+        store.put(dataToKeyRowWithRangeScan(5L, "e"), dataToValueRow(50), 
cfName)
+        store.commit()
+
+        // Version 2: deleteRange [2, 4) - should delete keys 2 and 3
+        val store2 = provider.getStore(1)
+        store2.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+        val beginKey = dataToKeyRowWithRangeScan(2L, "")
+        val endKey = dataToKeyRowWithRangeScan(4L, "")
+        store2.deleteRange(beginKey, endKey, cfName)
+        store2.commit()
+      }
+
+      // Reload from a fresh provider (same storeId) to force changelog replay
+      tryWithProviderResource(
+        newStoreProvider(storeId, keyEncoderSpec,
+          keySchema = keySchemaWithRangeScan,
+          useColumnFamilies = true)) { reloadedProvider =>
+        val reloadedStore = reloadedProvider.getStore(2)
+        try {
+          reloadedStore.createColFamilyIfAbsent(cfName,
+            keySchemaWithRangeScan, valueSchema,
+            RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+          val remainingKeys = reloadedStore.iterator(cfName).map { kv =>
+            keyRowWithRangeScanToData(kv.key)
+          }.toSeq
+
+          // Keys 1, 4, 5 should remain; keys 2, 3 should have been deleted 
via replay
+          assert(remainingKeys.length === 3)
+          assert(remainingKeys.map(_._1).toSet === Set(1L, 4L, 5L))
+        } finally {
+          if (!reloadedStore.hasCommitted) reloadedStore.abort()
+        }
+
+        // Verify that the change data reader returns DELETE_RANGE_RECORD with 
beginKey,
+        // null value, and endKey in the dedicated end_key field.
+        val reader = reloadedProvider.asInstanceOf[SupportsFineGrainedReplay]

Review Comment:
   Hmm, this test mainly verifies that changelog checkpointing records and 
replays range deletions, so I would prefer to keep it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to