HeartSaVioR commented on code in PR #47107:
URL: https://github.com/apache/spark/pull/47107#discussion_r1665117986


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -493,57 +365,41 @@ class RocksDB(
    *
    * @note This update is not committed to disk until commit() is called.
    */
-  def merge(
-      key: Array[Byte],
-      value: Array[Byte],
-      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    if (!useColumnFamilies) {
-      throw StateStoreErrors.unsupportedOperationException("merge",
-        multColFamiliesDisabledStr)
-    }
-    verifyColFamilyOperations("merge", colFamilyName)
+  def merge(key: Array[Byte], value: Array[Byte]): Unit = {
 
     if (conf.trackTotalNumberOfRows) {
-      val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), 
readOptions, key)
+      val oldValue = db.get(readOptions, key)
       if (oldValue == null) {
         numKeysOnWritingVersion += 1
       }
     }
-    db.merge(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
+    db.merge(writeOptions, key, value)
 
-    changelogWriter.foreach(_.merge(key, value, colFamilyName))
+    changelogWriter.foreach(_.merge(key, value))
   }
 
   /**
    * Remove the key if present.
    * @note This update is not committed to disk until commit() is called.
    */
-  def remove(
-      key: Array[Byte],
-      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
-    verifyColFamilyOperations("remove", colFamilyName)
+  def remove(key: Array[Byte]): Unit = {
     if (conf.trackTotalNumberOfRows) {
-      val value = db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, 
key)
+      val value = db.get(readOptions, key)
       if (value != null) {
         numKeysOnWritingVersion -= 1
       }
     }
-    db.delete(colFamilyNameToHandleMap(colFamilyName), writeOptions, key)
-    if (useColumnFamilies) {
-      changelogWriter.foreach(_.delete(key, colFamilyName))
-    } else {
-      changelogWriter.foreach(_.delete(key))
-    }
+    db.delete(writeOptions, key)
+    changelogWriter.foreach(_.delete(key))
   }
 
   /**
    * Get an iterator of all committed and uncommitted key-value pairs.
    */
-  def iterator(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME):
+  def iterator():

Review Comment:
   nit: could be a single liner



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -591,10 +446,9 @@ class RocksDB(
     }
   }
 
-  def prefixScan(prefix: Array[Byte], colFamilyName: String = 
StateStore.DEFAULT_COL_FAMILY_NAME):
+  def prefixScan(prefix: Array[Byte]):

Review Comment:
   nit: could be a single liner



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -91,70 +96,106 @@ private[sql] class RocksDBStateStoreProvider
      */
     override def valuesIterator(key: UnsafeRow, colFamilyName: String): 
Iterator[UnsafeRow] = {
       verify(key != null, "Key cannot be null")
+      ColumnFamilyUtils.verifyColFamilyOperations("valuesIterator", 
colFamilyName)
 
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val valueEncoder = kvEncoder._2
       val keyEncoder = kvEncoder._1
 
       verify(valueEncoder.supportsMultipleValuesPerKey, "valuesIterator 
requires a encoder " +
       "that supports multiple values for a single key.")
-      val encodedKey = rocksDB.get(keyEncoder.encodeKey(key), colFamilyName)
-      valueEncoder.decodeValues(encodedKey)
+
+      val encodedKey = keyEncoder.encodeKey(key, 
Option(colFamilyNameToIdMap.get(colFamilyName)))
+      val encodedValues = rocksDB.get(encodedKey)
+      valueEncoder.decodeValues(encodedValues)
     }
 
     override def merge(key: UnsafeRow, value: UnsafeRow,
         colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
       verify(state == UPDATING, "Cannot merge after already committed or 
aborted")
+      ColumnFamilyUtils.verifyColFamilyOperations("merge", colFamilyName)

Review Comment:
   nit: empty line after this line, to be consistent with the above methods



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -2375,8 +2006,8 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
 
   def toStr(kv: ByteArrayPair): (String, String) = (toStr(kv.key), 
toStr(kv.value))
 
-  def iterator(db: RocksDB, colFamilyName: String = "default"):
-    Iterator[(String, String)] = db.iterator(colFamilyName).map(toStr)
+  def iterator(db: RocksDB):

Review Comment:
   nit: could fit to one liner?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -91,70 +96,106 @@ private[sql] class RocksDBStateStoreProvider
      */
     override def valuesIterator(key: UnsafeRow, colFamilyName: String): 
Iterator[UnsafeRow] = {
       verify(key != null, "Key cannot be null")
+      ColumnFamilyUtils.verifyColFamilyOperations("valuesIterator", 
colFamilyName)
 
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val valueEncoder = kvEncoder._2
       val keyEncoder = kvEncoder._1
 
       verify(valueEncoder.supportsMultipleValuesPerKey, "valuesIterator 
requires a encoder " +
       "that supports multiple values for a single key.")
-      val encodedKey = rocksDB.get(keyEncoder.encodeKey(key), colFamilyName)
-      valueEncoder.decodeValues(encodedKey)
+
+      val encodedKey = keyEncoder.encodeKey(key, 
Option(colFamilyNameToIdMap.get(colFamilyName)))
+      val encodedValues = rocksDB.get(encodedKey)
+      valueEncoder.decodeValues(encodedValues)
     }
 
     override def merge(key: UnsafeRow, value: UnsafeRow,
         colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
       verify(state == UPDATING, "Cannot merge after already committed or 
aborted")
+      ColumnFamilyUtils.verifyColFamilyOperations("merge", colFamilyName)
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val keyEncoder = kvEncoder._1
       val valueEncoder = kvEncoder._2
       verify(valueEncoder.supportsMultipleValuesPerKey, "Merge operation 
requires an encoder" +
         " which supports multiple values for a single key")
       verify(key != null, "Key cannot be null")
       require(value != null, "Cannot merge a null value")
-      rocksDB.merge(keyEncoder.encodeKey(key), 
valueEncoder.encodeValue(value), colFamilyName)
+
+      val encodedKey = keyEncoder.encodeKey(key, 
Option(colFamilyNameToIdMap.get(colFamilyName)))
+      rocksDB.merge(encodedKey, valueEncoder.encodeValue(value))
     }
 
     override def put(key: UnsafeRow, value: UnsafeRow, colFamilyName: String): 
Unit = {
       verify(state == UPDATING, "Cannot put after already committed or 
aborted")
       verify(key != null, "Key cannot be null")
       require(value != null, "Cannot put a null value")
+      ColumnFamilyUtils.verifyColFamilyOperations("put", colFamilyName)
+
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
-      rocksDB.put(kvEncoder._1.encodeKey(key),
-        kvEncoder._2.encodeValue(value), colFamilyName)
+      val encodedKey = kvEncoder._1.encodeKey(key, 
Option(colFamilyNameToIdMap.get(colFamilyName)))
+      rocksDB.put(encodedKey, kvEncoder._2.encodeValue(value))
     }
 
     override def remove(key: UnsafeRow, colFamilyName: String): Unit = {
       verify(state == UPDATING, "Cannot remove after already committed or 
aborted")
       verify(key != null, "Key cannot be null")
+      ColumnFamilyUtils.verifyColFamilyOperations("remove", colFamilyName)
+
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
-      rocksDB.remove(kvEncoder._1.encodeKey(key), colFamilyName)
+      val encodedKey = kvEncoder._1.encodeKey(key, 
Option(colFamilyNameToIdMap.get(colFamilyName)))
+      rocksDB.remove(encodedKey)
     }
 
     override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = {
+      // Note this verify function only verify on the colFamilyName being 
valid,
+      // we are actually doing prefix when useColumnFamilies,
+      // but pass "iterator" to throw correct error message
+      ColumnFamilyUtils.verifyColFamilyOperations("iterator", colFamilyName)
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val rowPair = new UnsafeRowPair()
-      rocksDB.iterator(colFamilyName).map { kv =>
-        rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
-          kvEncoder._2.decodeValue(kv.value))
-        if (!isValidated && rowPair.value != null && !useColumnFamilies) {
-          StateStoreProvider.validateStateRowFormat(
-            rowPair.key, keySchema, rowPair.value, valueSchema, storeConf)
-          isValidated = true
+
+      // As Virtual Column Family attaches a column family prefix to the key 
row,
+      // we'll need to do prefixScan on the default column family with the 
same column
+      // family id prefix to get all rows stored in a given virtual column 
family
+      if (useColumnFamilies) {
+        val cfId: Short = colFamilyNameToIdMap.get(colFamilyName)

Review Comment:
   With the above proposal, we could let the encoder provide the CF ID and the 
bytes of the CF ID.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -961,45 +804,17 @@ class RocksDB(
 
   private def getDBProperty(property: String): Long = {
     // get cumulative sum across all available column families

Review Comment:
   nit: obsolete code comment?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -365,6 +435,10 @@ private[sql] class RocksDBStateStoreProvider
   private val keyValueEncoderMap = new 
java.util.concurrent.ConcurrentHashMap[String,
     (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)]
 
+  private val colFamilyNameToIdMap = new 
java.util.concurrent.ConcurrentHashMap[String, Short]
+  // TODO SPARK-48796 load column family id from state schema when restarting

Review Comment:
   This sounds like we can't release before addressing SPARK-48796 - do I 
understand correctly? If so, I'd need to mark the ticket as a blocker for Spark 
4.0.0.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -392,12 +463,126 @@ private[sql] class RocksDBStateStoreProvider
       case e: Throwable => throw QueryExecutionErrors.cannotLoadStore(e)
     }
   }
+
+  private object ColumnFamilyUtils {
+    private val multColFamiliesDisabledStr = "multiple column families 
disabled in " +
+      "RocksDBStateStoreProvider"
+
+    def getVcfIdBytes(id: Short): Array[Byte] = {
+      val encodedBytes = new Array[Byte](VIRTUAL_COL_FAMILY_PREFIX_BYTES)
+      Platform.putShort(encodedBytes, Platform.BYTE_ARRAY_OFFSET, id)

Review Comment:
   We can just consider that part of the API spec of Platform: get/put require a 
starting offset, and when reading from a byte array they require 
Platform.BYTE_ARRAY_OFFSET to represent the starting offset.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -365,6 +435,10 @@ private[sql] class RocksDBStateStoreProvider
   private val keyValueEncoderMap = new 
java.util.concurrent.ConcurrentHashMap[String,
     (RocksDBKeyStateEncoder, RocksDBValueStateEncoder)]
 
+  private val colFamilyNameToIdMap = new 
java.util.concurrent.ConcurrentHashMap[String, Short]
+  // TODO SPARK-48796 load column family id from state schema when restarting

Review Comment:
   Also do we have a path forward on storing column family id? Will SPARK-48796 
address this altogether?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -24,15 +24,16 @@ import java.nio.{ByteBuffer, ByteOrder}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, 
UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
-import 
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES,
 STATE_ENCODING_VERSION}
+import 
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES,
 STATE_ENCODING_VERSION, VIRTUAL_COL_FAMILY_PREFIX_BYTES}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 
 sealed trait RocksDBKeyStateEncoder {
   def supportPrefixKeyScan: Boolean
-  def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte]
-  def encodeKey(row: UnsafeRow): Array[Byte]
+  def encodePrefixKey(prefixKey: UnsafeRow, vcfId: Option[Short]): Array[Byte]
+  def encodeKey(row: UnsafeRow, vcfId: Option[Short]): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
+  def offSetForColFamilyPrefix: Int

Review Comment:
   That said, I'll review the change in Encoder file once the proposal is 
reflected or we decide not to do so.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -24,15 +24,16 @@ import java.nio.{ByteBuffer, ByteOrder}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, JoinedRow, 
UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
-import 
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES,
 STATE_ENCODING_VERSION}
+import 
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.{STATE_ENCODING_NUM_VERSION_BYTES,
 STATE_ENCODING_VERSION, VIRTUAL_COL_FAMILY_PREFIX_BYTES}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 
 sealed trait RocksDBKeyStateEncoder {
   def supportPrefixKeyScan: Boolean
-  def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte]
-  def encodeKey(row: UnsafeRow): Array[Byte]
+  def encodePrefixKey(prefixKey: UnsafeRow, vcfId: Option[Short]): Array[Byte]
+  def encodeKey(row: UnsafeRow, vcfId: Option[Short]): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
+  def offSetForColFamilyPrefix: Int

Review Comment:
   nit: off`s`et, not off`Set`
   
   Btw, I'd propose two things;
   
   1. encode(Prefix)Key and decodeKey are no longer symmetric. 
encode(Prefix)Key is dealing with vcfId but in decodeKey there is no way for us 
to get the vcfId back. 
   
   I see it could be less performant (though I feel like it doesn't matter 
much) if we always have to read the vcfId part and return it, even in the case 
where we already know the vcfId. But it's probably better to have a new method 
whose name explicitly calls out that it skips the vcfId and assumes the caller 
already knows it.
   
   2. It looks like the column family prefix is applied across all encoder 
implementations, for which I think we can provide a better abstraction. This 
may be a good time to have a base (abstract) implementation of 
RocksDBKeyStateEncoder that handles the column family prefix.
   
   Something like the following:
   
   ```
   abstract class RocksDBKeyStateEncoderBase(useColumnFamilies: Boolean)
     extends RocksDBKeyStateEncoder {
   
     protected def encodeColumnFamilyPrefix(
         numBytes: Int,
         vcfId: Option[Short],
         useColumnFamilies: Boolean): (Array[Byte], Int) = {
       if (useColumnFamilies) {
         val encodedBytes = new Array[Byte](numBytes + 
VIRTUAL_COL_FAMILY_PREFIX_BYTES)
         Platform.putShort(encodedBytes, Platform.BYTE_ARRAY_OFFSET, vcfId.get)
         (encodedBytes, Platform.BYTE_ARRAY_OFFSET + 
VIRTUAL_COL_FAMILY_PREFIX_BYTES)
       } else {
         val encodedBytes = new Array[Byte](numBytes)
         (encodedBytes, Platform.BYTE_ARRAY_OFFSET)
       }
     }
   
     protected def decodeColumnFamilyPrefix(keyBytes: Array[Byte]): 
(Option[Short], Int) = {
       if (useColumnFamilies) {
         val vcfId = Platform.getShort(keyBytes, Platform.BYTE_ARRAY_OFFSET)
         (Some(vcfId), Platform.BYTE_ARRAY_OFFSET + 
VIRTUAL_COL_FAMILY_PREFIX_BYTES)
       } else {
         (None, Platform.BYTE_ARRAY_OFFSET)
       }
     }
   
     // Only if we want to skip over reading CF prefix...
     protected def decodeKeyStartOffset: Int = {
       if (useColumnFamilies) {
         Platform.BYTE_ARRAY_OFFSET + VIRTUAL_COL_FAMILY_PREFIX_BYTES
       } else Platform.BYTE_ARRAY_OFFSET
     }
   }
   ```
   
   (The method names can be changed as preferred; I'm not talented at naming.)
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -330,7 +330,8 @@ trait StateStoreProvider {
    *                         A value not greater than 0 means the operator 
doesn't activate prefix
    *                         key, and the operator should not call prefixScan 
method in StateStore.
    * @param useColumnFamilies Whether the underlying state store uses a single 
or multiple column
-   *                          families
+   *                          families; by default we'll use virtual column 
family if this parameter

Review Comment:
   I feel like we shouldn't mention this - it's an implementation detail, and 
one specific to the RocksDB implementation. We define the "interface" here, and 
it's up to the provider implementation how to deal with column families.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -261,7 +313,21 @@ private[sql] class RocksDBStateStoreProvider
     /** Remove column family if exists */
     override def removeColFamilyIfExists(colFamilyName: String): Boolean = {
       verify(useColumnFamilies, "Column families are not supported in this 
store")
-      val result = rocksDB.removeColFamilyIfExists(colFamilyName)
+      val result = {
+        val colFamilyExists = 
ColumnFamilyUtils.checkColFamilyExists(colFamilyName)
+        if (colFamilyExists) {
+          ColumnFamilyUtils.verifyColFamilyOperations("prefixScan", 
colFamilyName)
+          val idPrefix = ColumnFamilyUtils.getVcfIdBytes(
+            colFamilyNameToIdMap.get(colFamilyName)
+          )
+          rocksDB.prefixScan(idPrefix).foreach { kv =>
+            ColumnFamilyUtils.verifyColFamilyOperations("remove", 
colFamilyName)

Review Comment:
   Is this needed for each state row in the CF? It doesn't seem to be coupled 
with the actual state row at all.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -57,20 +60,22 @@ private[sql] class RocksDBStateStoreProvider
         keyStateEncoderSpec: KeyStateEncoderSpec,
         useMultipleValuesPerKey: Boolean = false,
         isInternal: Boolean = false): Unit = {
-      verify(colFamilyName != StateStore.DEFAULT_COL_FAMILY_NAME,
-        s"Failed to create column family with reserved_name=$colFamilyName")
-      verify(useColumnFamilies, "Column families are not supported in this 
store")
-      rocksDB.createColFamilyIfAbsent(colFamilyName, isInternal)
+      ColumnFamilyUtils.createColFamilyIfAbsent(colFamilyName, isInternal)
+
       keyValueEncoderMap.putIfAbsent(colFamilyName,
-        (RocksDBStateEncoder.getKeyEncoder(keyStateEncoderSpec),
+        (RocksDBStateEncoder.getKeyEncoder(keyStateEncoderSpec, 
useColumnFamilies),

Review Comment:
   Wait, we have encoder per column family. Do we have a case where we assign 
different IDs to the same CF during lifecycle of the encoder instance? 
   
   If we don't, we can require the encoder to maintain the CF ID it is in charge 
of, so that the caller no longer needs to look up twice (once to find the 
encoder, once to find the CF ID). This can be done along with the above 
proposal - the abstract class can handle this. If this is done, then we can 
remove the vcfId param and leave the base trait unchanged.
   
   Even if we do, if we can re-initialize the encoder for the new CF ID, then 
the above can still be applied.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -91,70 +96,106 @@ private[sql] class RocksDBStateStoreProvider
      */
     override def valuesIterator(key: UnsafeRow, colFamilyName: String): 
Iterator[UnsafeRow] = {
       verify(key != null, "Key cannot be null")
+      ColumnFamilyUtils.verifyColFamilyOperations("valuesIterator", 
colFamilyName)
 
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val valueEncoder = kvEncoder._2
       val keyEncoder = kvEncoder._1
 
       verify(valueEncoder.supportsMultipleValuesPerKey, "valuesIterator 
requires a encoder " +
       "that supports multiple values for a single key.")
-      val encodedKey = rocksDB.get(keyEncoder.encodeKey(key), colFamilyName)
-      valueEncoder.decodeValues(encodedKey)
+
+      val encodedKey = keyEncoder.encodeKey(key, 
Option(colFamilyNameToIdMap.get(colFamilyName)))
+      val encodedValues = rocksDB.get(encodedKey)
+      valueEncoder.decodeValues(encodedValues)
     }
 
     override def merge(key: UnsafeRow, value: UnsafeRow,
         colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
       verify(state == UPDATING, "Cannot merge after already committed or 
aborted")
+      ColumnFamilyUtils.verifyColFamilyOperations("merge", colFamilyName)
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val keyEncoder = kvEncoder._1
       val valueEncoder = kvEncoder._2
       verify(valueEncoder.supportsMultipleValuesPerKey, "Merge operation 
requires an encoder" +
         " which supports multiple values for a single key")
       verify(key != null, "Key cannot be null")
       require(value != null, "Cannot merge a null value")
-      rocksDB.merge(keyEncoder.encodeKey(key), 
valueEncoder.encodeValue(value), colFamilyName)
+
+      val encodedKey = keyEncoder.encodeKey(key, 
Option(colFamilyNameToIdMap.get(colFamilyName)))
+      rocksDB.merge(encodedKey, valueEncoder.encodeValue(value))
     }
 
     override def put(key: UnsafeRow, value: UnsafeRow, colFamilyName: String): 
Unit = {
       verify(state == UPDATING, "Cannot put after already committed or 
aborted")
       verify(key != null, "Key cannot be null")
       require(value != null, "Cannot put a null value")
+      ColumnFamilyUtils.verifyColFamilyOperations("put", colFamilyName)
+
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
-      rocksDB.put(kvEncoder._1.encodeKey(key),
-        kvEncoder._2.encodeValue(value), colFamilyName)
+      val encodedKey = kvEncoder._1.encodeKey(key, 
Option(colFamilyNameToIdMap.get(colFamilyName)))
+      rocksDB.put(encodedKey, kvEncoder._2.encodeValue(value))
     }
 
     override def remove(key: UnsafeRow, colFamilyName: String): Unit = {
       verify(state == UPDATING, "Cannot remove after already committed or 
aborted")
       verify(key != null, "Key cannot be null")
+      ColumnFamilyUtils.verifyColFamilyOperations("remove", colFamilyName)
+
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
-      rocksDB.remove(kvEncoder._1.encodeKey(key), colFamilyName)
+      val encodedKey = kvEncoder._1.encodeKey(key, 
Option(colFamilyNameToIdMap.get(colFamilyName)))
+      rocksDB.remove(encodedKey)
     }
 
     override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = {
+      // Note this verify function only verify on the colFamilyName being 
valid,
+      // we are actually doing prefix when useColumnFamilies,
+      // but pass "iterator" to throw correct error message
+      ColumnFamilyUtils.verifyColFamilyOperations("iterator", colFamilyName)
       val kvEncoder = keyValueEncoderMap.get(colFamilyName)
       val rowPair = new UnsafeRowPair()
-      rocksDB.iterator(colFamilyName).map { kv =>
-        rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
-          kvEncoder._2.decodeValue(kv.value))
-        if (!isValidated && rowPair.value != null && !useColumnFamilies) {
-          StateStoreProvider.validateStateRowFormat(
-            rowPair.key, keySchema, rowPair.value, valueSchema, storeConf)
-          isValidated = true
+
+      // As Virtual Column Family attaches a column family prefix to the key 
row,
+      // we'll need to do prefixScan on the default column family with the 
same column
+      // family id prefix to get all rows stored in a given virtual column 
family
+      if (useColumnFamilies) {
+        val cfId: Short = colFamilyNameToIdMap.get(colFamilyName)
+        rocksDB.prefixScan(ColumnFamilyUtils.getVcfIdBytes(cfId)).map { kv =>
+          rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
+            kvEncoder._2.decodeValue(kv.value))
+          if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+            StateStoreProvider.validateStateRowFormat(
+              rowPair.key, keySchema, rowPair.value, valueSchema, storeConf)
+            isValidated = true
+          }
+          rowPair
+        }
+      } else {
+        rocksDB.iterator().map { kv =>
+          rowPair.withRows(kvEncoder._1.decodeKey(kv.key),
+            kvEncoder._2.decodeValue(kv.value))
+          if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+            StateStoreProvider.validateStateRowFormat(
+              rowPair.key, keySchema, rowPair.value, valueSchema, storeConf)
+            isValidated = true
+          }
+          rowPair
         }
-        rowPair
       }
     }
 
     override def prefixScan(prefixKey: UnsafeRow, colFamilyName: String):
       Iterator[UnsafeRowPair] = {
+      ColumnFamilyUtils.verifyColFamilyOperations("prefixScan", colFamilyName)

Review Comment:
   nit: empty line after this line, to be consistent with the above methods



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to