anishshri-db commented on code in PR #47107:
URL: https://github.com/apache/spark/pull/47107#discussion_r1664893115


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -91,71 +96,104 @@ private[sql] class RocksDBStateStoreProvider
      */
     override def valuesIterator(key: UnsafeRow, colFamilyName: String): 
Iterator[UnsafeRow] = {
       verify(key != null, "Key cannot be null")
+      ColumnFamilyUtils.verifyColFamilyOperations("valuesIterator", 
colFamilyName)
 
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val valueEncoder = kvEncoder._2
       val keyEncoder = kvEncoder._1
 
       verify(valueEncoder.supportsMultipleValuesPerKey, "valuesIterator 
requires a encoder " +
       "that supports multiple values for a single key.")
-      val encodedKey = rocksDB.get(keyEncoder.encodeKey(key), colFamilyName)
-      valueEncoder.decodeValues(encodedKey)
+
+      val encodedKey = keyEncoder.encodeKey(key, 
Option(colFamilyToIdMap.get(colFamilyName)))
+      val encodedValues = rocksDB.get(encodedKey)
+      valueEncoder.decodeValues(encodedValues)
     }
 
     override def merge(key: UnsafeRow, value: UnsafeRow,
         colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
       verify(state == UPDATING, "Cannot merge after already committed or 
aborted")
+      ColumnFamilyUtils.verifyColFamilyOperations("merge", colFamilyName)
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val keyEncoder = kvEncoder._1
       val valueEncoder = kvEncoder._2
       verify(valueEncoder.supportsMultipleValuesPerKey, "Merge operation 
requires an encoder" +
         " which supports multiple values for a single key")
       verify(key != null, "Key cannot be null")
       require(value != null, "Cannot merge a null value")
-      rocksDB.merge(keyEncoder.encodeKey(key), 
valueEncoder.encodeValue(value), colFamilyName)
+
+      val encodedKey = keyEncoder.encodeKey(key, 
Option(colFamilyToIdMap.get(colFamilyName)))
+      rocksDB.merge(encodedKey, valueEncoder.encodeValue(value))
     }
 
     override def put(key: UnsafeRow, value: UnsafeRow, colFamilyName: String): 
Unit = {
       verify(state == UPDATING, "Cannot put after already committed or 
aborted")
       verify(key != null, "Key cannot be null")
       require(value != null, "Cannot put a null value")
+      ColumnFamilyUtils.verifyColFamilyOperations("put", colFamilyName)
+
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
-      rocksDB.put(kvEncoder._1.encodeKey(key),
-        kvEncoder._2.encodeValue(value), colFamilyName)
+      val encodedKey = kvEncoder._1.encodeKey(key, 
Option(colFamilyToIdMap.get(colFamilyName)))
+      rocksDB.put(encodedKey, kvEncoder._2.encodeValue(value))
     }
 
     override def remove(key: UnsafeRow, colFamilyName: String): Unit = {
       verify(state == UPDATING, "Cannot remove after already committed or 
aborted")
       verify(key != null, "Key cannot be null")
+      ColumnFamilyUtils.verifyColFamilyOperations("remove", colFamilyName)
+
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
-      rocksDB.remove(kvEncoder._1.encodeKey(key), colFamilyName)
+      val encodedKey = kvEncoder._1.encodeKey(key, 
Option(colFamilyToIdMap.get(colFamilyName)))
+      rocksDB.remove(encodedKey)
     }
 
     override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = {
+      // Note this verify function only verify on the colFamilyName being 
valid,
+      // we are actually doing prefix when useColumnFamilies,
+      // but pass "iterator" to throw correct error message
+      ColumnFamilyUtils.verifyColFamilyOperations("iterator", colFamilyName)
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val rowPair = new UnsafeRowPair()
-      rocksDB.iterator(colFamilyName).map { kv =>
-        rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
-          kvEncoder._2.decodeValue(kv.value))
-        if (!isValidated && rowPair.value != null && !useColumnFamilies) {
-          StateStoreProvider.validateStateRowFormat(
-            rowPair.key, keySchema, rowPair.value, valueSchema, storeConf)
-          isValidated = true
+
+      if (useColumnFamilies) {

Review Comment:
   Can we add a small comment here explaining the `useColumnFamilies` branch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to