anishshri-db commented on code in PR #54083:
URL: https://github.com/apache/spark/pull/54083#discussion_r2766741799


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -127,7 +127,18 @@ class RocksDB(
   rocksDbOptions.setMaxOpenFiles(conf.maxOpenFiles)
   rocksDbOptions.setAllowFAllocate(conf.allowFAllocate)
   rocksDbOptions.setAvoidFlushDuringShutdown(true)
-  rocksDbOptions.setMergeOperator(new StringAppendOperator())
+  // Set merge operator based on version for backward compatibility
+  // Version 1: comma delimiter ",", Version 2: empty string ""

Review Comment:
   Should we note that this not documented but supported in RocksDB ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -2498,6 +2512,17 @@ object RocksDBConf {
   private val VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF =
     SQLConfEntry(VERIFY_NON_EMPTY_FILES_IN_ZIP_CONF_KEY, "true")
 
+  // Configuration to set the merge operator version for backward 
compatibility.
+  // Version 1 (default): Uses comma "," as delimiter for StringAppendOperator
+  // Version 2: Uses empty string "" as delimiter (no delimiter, direct 
concatenation)
+  //
+  // Note: this is also defined in 
`SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION`.
+  // These two places should be updated together.

Review Comment:
   Why do we need this in 2 places ?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3045,6 +3045,21 @@ object SQLConf {
       // 5 is the default table format version for RocksDB 6.20.3.
       .createWithDefault(5)
 
+  /**
+   * Note: this is defined in `RocksDBConf.MERGE_OPERATOR_VERSION_CONF`. These 
two places should
+   * be updated together.
+   */
+  val STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION =
+    buildConf("spark.sql.streaming.stateStore.rocksdb.mergeOperatorVersion")
+      .internal()
+      .doc("Set the RocksDB merge operator version. This will be stored in the 
checkpoint when " +
+        "starting a streaming query. The checkpoint will use this merge 
operator version in the " +

Review Comment:
   nit: `for the entire`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -127,7 +127,18 @@ class RocksDB(
   rocksDbOptions.setMaxOpenFiles(conf.maxOpenFiles)
   rocksDbOptions.setAllowFAllocate(conf.allowFAllocate)
   rocksDbOptions.setAvoidFlushDuringShutdown(true)
-  rocksDbOptions.setMergeOperator(new StringAppendOperator())
+  // Set merge operator based on version for backward compatibility
+  // Version 1: comma delimiter ",", Version 2: empty string ""
+  val mergeDelimiter = conf.mergeOperatorVersion match {
+    case 1 => ","
+    case 2 => ""
+    case v => throw new IllegalArgumentException(
+      s"Invalid merge operator version: $v. Supported versions are 1 and 2")
+  }
+  rocksDbOptions.setMergeOperator(new StringAppendOperator(mergeDelimiter))

Review Comment:
   This is little different than using default constructor we were using 
earlier ? Will this be equivalent ?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1766,6 +1778,41 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  test("validate rocksdb values iterator correctness - blind merge with 
operator version 2") {
+    withSQLConf(
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1",
+      SQLConf.STATE_STORE_ROCKSDB_MERGE_OPERATOR_VERSION.key -> "2") {
+
+      tryWithProviderResource(newStoreProvider(useColumnFamilies = true,
+        useMultipleValuesPerKey = true)) { provider =>
+        val store = provider.getStore(0)
+        // We do blind merge than put against non-existing key.
+        // Note that this is only safe with merge operator version 2.
+        merge(store, "a", 0, 1)
+
+        val iterator0 = store.valuesIterator(dataToKeyRow("a", 0))
+
+        assert(iterator0.hasNext)
+        assert(valueRowToData(iterator0.next()) === 1)
+        assert(!iterator0.hasNext)
+
+        merge(store, "a", 0, 2)

Review Comment:
   Can we also add a test for some combination of `put` and `merge` followed by 
`get` as needed to verify the result for both merge delimiter versions ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRowChecksum.scala:
##########
@@ -276,14 +278,14 @@ object KeyValueChecksumEncoder {
       // skip the checksum (first 4 bytes)
       currentPosition += java.lang.Integer.BYTES
       val valueRowSize = Platform.getInt(valueBytes, currentPosition)
-      // move to the next value and skip the delimiter character used for 
rocksdb merge
-      currentPosition += java.lang.Integer.BYTES + valueRowSize + 1
+      // move to the next value and skip the delimiter bytes used for rocksdb 
merge
+      currentPosition += java.lang.Integer.BYTES + valueRowSize + delimiterSize
       resultSize += valueRowSize
       numValues += 1
     }
 
     // include the number of delimiters used for merge
-    resultSize += numValues - 1
+    resultSize += (numValues - 1) * delimiterSize

Review Comment:
   For empty delimiter, `resultSize` would be `0` then ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to