HeartSaVioR commented on code in PR #49304:
URL: https://github.com/apache/spark/pull/49304#discussion_r1957541951


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -678,7 +654,7 @@ object RocksDBStateStoreProvider {
   // Version as a single byte that specifies the encoding of the row data in 
RocksDB
   val STATE_ENCODING_NUM_VERSION_BYTES = 1
   val STATE_ENCODING_VERSION: Byte = 0
-  val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2

Review Comment:
   This should be here, rather than in StateStore. I think it was already in the 
right place.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -847,35 +828,47 @@ class RocksDBStateStoreChangeDataReader(
     endVersion: Long,
     compressionCodec: CompressionCodec,
     keyValueEncoderMap:
-      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)],
+      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder, Short)],
     colFamilyNameOpt: Option[String] = None)
   extends StateStoreChangeDataReader(
     fm, stateLocation, startVersion, endVersion, compressionCodec, 
colFamilyNameOpt) {
 
   override protected var changelogSuffix: String = "changelog"
 
-  private def getColFamilyIdBytes: Option[Array[Byte]] = {
-    if (colFamilyNameOpt.isDefined) {
-      val colFamilyName = colFamilyNameOpt.get
-      if (!keyValueEncoderMap.containsKey(colFamilyName)) {
-        throw new IllegalStateException(
-          s"Column family $colFamilyName not found in the key value encoder 
map")
-      }
-      Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
-    } else {
-      None
-    }
+  /**
+   * Encodes a virtual column family ID into a byte array suitable for RocksDB.
+   *
+   * This method creates a fixed-size byte array prefixed with the virtual 
column family ID,
+   * which is used to partition data within RocksDB.
+   *
+   * @param virtualColFamilyId The column family identifier to encode
+   * @return A byte array containing the encoded column family ID
+   */
+  private def getColumnFamilyIdBytes(virtualColFamilyId: Short): Array[Byte] = 
{
+    val encodedBytes = new 
Array[Byte](StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES)
+    Platform.putShort(encodedBytes, Platform.BYTE_ARRAY_OFFSET, 
virtualColFamilyId)
+    encodedBytes
   }
 
-  private val colFamilyIdBytesOpt: Option[Array[Byte]] = getColFamilyIdBytes
+  private def getExtractedKey(data: Array[Byte]): Array[Byte] = {

Review Comment:
   ditto



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1814,18 +1663,17 @@ class NoPrefixKeyStateEncoder(
         rowBytes.length
       )
 
-      encodeStateRowWithPrefix(dataWithVersion)
+      dataWithVersion
     }
   }
 
   override def decodeKey(keyBytes: Array[Byte]): UnsafeRow = {
     if (!useColumnFamilies) {
-      dataEncoder.decodeKey(decodeStateRowData(keyBytes))
+      dataEncoder.decodeKey(keyBytes)

Review Comment:
   Ditto, if the above turns out to be a bug.
   
   If the above is a bug, this cannot read existing checkpoints from prior 
Spark versions.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -656,31 +813,75 @@ class RocksDB(
    *
    * @note This update is not committed to disk until commit() is called.
    */
-  def merge(key: Array[Byte], value: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val oldValue = db.get(readOptions, key)
-      if (oldValue == null) {
-        numKeysOnWritingVersion += 1
+  def merge(
+      key: Array[Byte],
+      value: Array[Byte],
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    if (useColumnFamilies) {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue == null) {
+          val cfInfo = getColumnFamilyInfo(cfName)
+          if (cfInfo.isInternal) {
+            numInternalKeysOnWritingVersion += 1
+          } else {
+            numKeysOnWritingVersion += 1
+          }
+        }
+      }
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }
       }
     }
-    db.merge(writeOptions, key, value)
 
-    changelogWriter.foreach(_.merge(key, value))
+    db.merge(writeOptions, keyWithPrefix, value)
+    changelogWriter.foreach(_.merge(keyWithPrefix, value))
   }
 
   /**
    * Remove the key if present.
    * @note This update is not committed to disk until commit() is called.
    */
-  def remove(key: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val value = db.get(readOptions, key)
-      if (value != null) {
-        numKeysOnWritingVersion -= 1
+  def remove(key: Array[Byte], cfName: String = 
StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    if (useColumnFamilies) {

Review Comment:
   nit: the method extracted above can handle both directions (putting and 
removing) with a flag. I'd be OK with having two separate extracted methods if 
that is clearer.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -918,18 +968,34 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       }
       // Check that snapshots and changelogs get purged correctly.
       db.doMaintenance()
-      assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
+
+      if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
+        assert(snapshotVersionsPresent(remoteDir) === Seq(31, 60, 60))
+      } else {
+        assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
+      }
       if (enableStateStoreCheckpointIds) {
         // recommit version 60 creates another changelog file with different 
unique id
-        assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60)
+        if (colFamiliesEnabled) {
+          assert(changelogVersionsPresent(remoteDir) === (31 to 60) :+ 60)
+        } else {
+          assert(changelogVersionsPresent(remoteDir) === (30 to 60) :+ 60)
+        }
       } else {
         assert(changelogVersionsPresent(remoteDir) === (30 to 60))
       }
 
       // Verify the content of retained versions.
-      for (version <- 30 to 60) {
-        db.load(version, versionToUniqueId.get(version), readOnly = true)
-        assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
+      if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
+        for (version <- 31 to 60) {

Review Comment:
   nit: please leave a comment explaining why version 30 does not work, and what 
the state value in version 30 is in this case. From reading the test code, I'm 
not sure why there is a difference.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -847,35 +828,47 @@ class RocksDBStateStoreChangeDataReader(
     endVersion: Long,
     compressionCodec: CompressionCodec,
     keyValueEncoderMap:
-      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder)],
+      ConcurrentHashMap[String, (RocksDBKeyStateEncoder, 
RocksDBValueStateEncoder, Short)],
     colFamilyNameOpt: Option[String] = None)
   extends StateStoreChangeDataReader(
     fm, stateLocation, startVersion, endVersion, compressionCodec, 
colFamilyNameOpt) {
 
   override protected var changelogSuffix: String = "changelog"
 
-  private def getColFamilyIdBytes: Option[Array[Byte]] = {
-    if (colFamilyNameOpt.isDefined) {
-      val colFamilyName = colFamilyNameOpt.get
-      if (!keyValueEncoderMap.containsKey(colFamilyName)) {
-        throw new IllegalStateException(
-          s"Column family $colFamilyName not found in the key value encoder 
map")
-      }
-      Some(keyValueEncoderMap.get(colFamilyName)._1.getColumnFamilyIdBytes())
-    } else {
-      None
-    }
+  /**
+   * Encodes a virtual column family ID into a byte array suitable for RocksDB.
+   *
+   * This method creates a fixed-size byte array prefixed with the virtual 
column family ID,
+   * which is used to partition data within RocksDB.
+   *
+   * @param virtualColFamilyId The column family identifier to encode
+   * @return A byte array containing the encoded column family ID
+   */
+  private def getColumnFamilyIdBytes(virtualColFamilyId: Short): Array[Byte] = 
{

Review Comment:
   We now have two different places that handle cfId: here and RocksDB. 
Shall we consolidate them, or extract the logic into a new class/object if there 
is no good way to consolidate?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -1834,7 +1834,6 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       store = provider.getRocksDBStateStore(2)
       store.createColFamilyIfAbsent(colFamily3, keySchema, valueSchema,
         NoPrefixKeyStateEncoderSpec(keySchema))
-      assert(store.getColumnFamilyId(colFamily3) == 3)

Review Comment:
   I'm not sure this test is still valid after you removed the check for the 
column family ID. What's the rationale for removing this?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -1791,16 +1644,12 @@ class RangeKeyScanStateEncoder(
 class NoPrefixKeyStateEncoder(
     dataEncoder: RocksDBDataEncoder,
     keySchema: StructType,
-    useColumnFamilies: Boolean = false,
-    columnFamilyInfo: Option[ColumnFamilyInfo] = None)
-  extends StateRowPrefixEncoder(
-    useColumnFamilies,
-    columnFamilyInfo
-  ) with RocksDBKeyStateEncoder with Logging {
+    useColumnFamilies: Boolean = false)
+  extends RocksDBKeyStateEncoder with Logging {
 
   override def encodeKey(row: UnsafeRow): Array[Byte] = {
     if (!useColumnFamilies) {

Review Comment:
   I was wondering why useColumnFamilies is still needed for KeyStateEncoder now 
that we are moving the virtual column family out of the encoder... and found this 
possible bug.
   
   My understanding is that in the else branch (useColumnFamilies = true), we 
write the version byte "twice", because `dataEncoder.encodeKey(row)` already puts 
the version byte. If this is a bug and you just need to call 
`dataEncoder.encodeKey(row)`, then `useColumnFamilies` isn't used by any 
implementation of KeyStateEncoder and you can remove it.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -467,29 +492,62 @@ class RocksDB(
     this
   }
 
+  /**
+   * Function to check if col family is internal or not based on information 
recorded in
+   * checkpoint metadata.
+   * @param cfName - column family name
+   * @param metadata - checkpoint metadata
+   * @return - type of column family (internal or otherwise)
+   */
+  private def checkColFamilyType(

Review Comment:
   nit: since this returns a Boolean, it'd be ideal to name it isXXX so that 
people can understand what true and false each map to. Just my 2 cents; it's OK 
to leave it as it is if the name would not be clear that way.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -918,18 +968,34 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
       }
       // Check that snapshots and changelogs get purged correctly.
       db.doMaintenance()
-      assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
+
+      if (enableStateStoreCheckpointIds && colFamiliesEnabled) {

Review Comment:
   nit: isn't it duplicated with the next if statement?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -629,7 +627,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
 
       if (isChangelogCheckpointingEnabled) {
         assert(changelogVersionsPresent(remoteDir) === (1 to 50))
-        assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 
5))
+        if (colFamiliesEnabled) {
+          assert(snapshotVersionsPresent(remoteDir) ===
+            Seq(1, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50))

Review Comment:
   Wait, we should already have been setting the flag to force a snapshot on a 
column family change before this fix. Does this mean we had a bug?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -621,28 +700,106 @@ class RocksDB(
     }
   }
 
+  /**
+   * Function to encode state row with virtual col family id prefix
+   * @param data - passed byte array to be stored in state store
+   * @param cfName - name of column family
+   * @return - encoded byte array with virtual column family id prefix
+   */
+  private def encodeStateRowWithPrefix(
+      data: Array[Byte],
+      cfName: String): Array[Byte] = {
+    // Create result array big enough for all prefixes plus data
+    val result = new Array[Byte](StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES + 
data.length)
+    val offset = Platform.BYTE_ARRAY_OFFSET + 
StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES
+
+    val cfInfo = getColumnFamilyInfo(cfName)
+    Platform.putShort(result, Platform.BYTE_ARRAY_OFFSET, cfInfo.cfId)
+
+    // Write the actual data
+    Platform.copyMemory(
+      data, Platform.BYTE_ARRAY_OFFSET,
+      result, offset,
+      data.length
+    )
+
+    result
+  }
+
+  /**
+   * Function to decode state row with virtual col family id prefix
+   * @param data - passed byte array retrieved from state store
+   * @return - pair of decoded byte array without virtual column family id 
prefix
+   *           and name of column family
+   */
+  private def decodeStateRowWithPrefix(data: Array[Byte]): (Array[Byte], 
String) = {
+    val cfId = Platform.getShort(data, Platform.BYTE_ARRAY_OFFSET)
+    val cfName = getColumnFamilyNameForId(cfId)
+    val offset = Platform.BYTE_ARRAY_OFFSET + 
StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES
+
+    val key = new Array[Byte](data.length - 
StateStore.VIRTUAL_COL_FAMILY_PREFIX_BYTES)
+    Platform.copyMemory(
+      data, offset,
+      key, Platform.BYTE_ARRAY_OFFSET,
+      key.length
+    )
+
+    (key, cfName)
+  }
+
   /**
    * Get the value for the given key if present, or null.
    * @note This will return the last written value even if it was uncommitted.
    */
-  def get(key: Array[Byte]): Array[Byte] = {
-    db.get(readOptions, key)
+  def get(
+      key: Array[Byte],
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    db.get(readOptions, keyWithPrefix)
   }
 
   /**
    * Put the given value for the given key.
    * @note This update is not committed to disk until commit() is called.
    */
-  def put(key: Array[Byte], value: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val oldValue = db.get(readOptions, key)
-      if (oldValue == null) {
-        numKeysOnWritingVersion += 1
+  def put(
+      key: Array[Byte],
+      value: Array[Byte],
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    if (useColumnFamilies) {

Review Comment:
   nit: This is same as merge, extract?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -178,55 +183,77 @@ class RocksDB(
   // This is accessed and updated only between load and commit
   // which means it is implicitly guarded by acquireLock
   @GuardedBy("acquireLock")
-  private val colFamilyNameToIdMap = new ConcurrentHashMap[String, Short]()
+  private val colFamilyNameToInfoMap = new ConcurrentHashMap[String, 
ColumnFamilyInfo]()
+
+  @GuardedBy("acquireLock")
+  private val colFamilyIdToNameMap = new ConcurrentHashMap[Short, String]()
 
   @GuardedBy("acquireLock")
   private val maxColumnFamilyId: AtomicInteger = new AtomicInteger(-1)
 
   @GuardedBy("acquireLock")
   private val shouldForceSnapshot: AtomicBoolean = new AtomicBoolean(false)
 
-  /**
-   * Check whether the column family name is for internal column families.
-   *
-   * @param cfName - column family name
-   * @return - true if the column family is for internal use, false otherwise
-   */
-  private def checkInternalColumnFamilies(cfName: String): Boolean = 
cfName.charAt(0) == '_'
+  private def getColumnFamilyInfo(cfName: String): ColumnFamilyInfo = {
+    colFamilyNameToInfoMap.get(cfName)
+  }
+
+  private def getColumnFamilyNameForId(cfId: Short): String = {
+    colFamilyIdToNameMap.get(cfId)
+  }
 
-  // Methods to fetch column family mapping for this State Store version
-  def getColumnFamilyMapping: Map[String, Short] = {
-    colFamilyNameToIdMap.asScala
+  private def addToColFamilyMaps(cfName: String, cfId: Short, isInternal: 
Boolean): Unit = {
+    colFamilyNameToInfoMap.putIfAbsent(cfName, ColumnFamilyInfo(cfId, 
isInternal))
+    colFamilyIdToNameMap.putIfAbsent(cfId, cfName)
+  }
+
+  private def removeFromColFamilyMaps(cfName: String): Unit = {
+    val colFamilyInfo = colFamilyNameToInfoMap.get(cfName)
+    if (colFamilyInfo != null) {
+      colFamilyNameToInfoMap.remove(cfName)
+      colFamilyIdToNameMap.remove(colFamilyInfo.cfId)
+    }
   }
 
-  def getColumnFamilyId(cfName: String): Short = {
-    colFamilyNameToIdMap.get(cfName)
+  private def clearColFamilyMaps(): Unit = {
+    colFamilyNameToInfoMap.clear()
+    colFamilyIdToNameMap.clear()
   }
 
   /**
-   * Create RocksDB column family, if not created already
+   * Check if the column family exists with given name and create one if it 
doesn't. Users can
+   * create external column families storing user facing data as well as 
internal column families
+   * such as secondary indexes. Metrics for both of these types are tracked 
separately.
+   *
+   * @param colFamilyName - column family name
+   * @param isInternal - whether the column family is for internal use or not
+   * @return - virtual column family id
    */
-  def createColFamilyIfAbsent(colFamilyName: String): Short = {
+  def createColFamilyIfAbsent(colFamilyName: String, isInternal: Boolean): 
Short = {
     if (!checkColFamilyExists(colFamilyName)) {
       val newColumnFamilyId = maxColumnFamilyId.incrementAndGet().toShort
-      colFamilyNameToIdMap.putIfAbsent(colFamilyName, newColumnFamilyId)
+      addToColFamilyMaps(colFamilyName, newColumnFamilyId, isInternal)
       shouldForceSnapshot.set(true)
       newColumnFamilyId
     } else {
-      colFamilyNameToIdMap.get(colFamilyName)
+      colFamilyNameToInfoMap.get(colFamilyName).cfId
     }
   }
 
   /**
    * Remove RocksDB column family, if exists
    * @return columnFamilyId if it exists, else None
    */
-  def removeColFamilyIfExists(colFamilyName: String): Option[Short] = {
+  def removeColFamilyIfExists(colFamilyName: String): Boolean = {
     if (checkColFamilyExists(colFamilyName)) {
       shouldForceSnapshot.set(true)
-      Some(colFamilyNameToIdMap.remove(colFamilyName))
+      prefixScan(Array.empty[Byte], colFamilyName).foreach { kv =>

Review Comment:
   As I commented above, `iterator(colFamilyName)` would be a lot easier to 
understand (it is a higher-level operation).



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -381,9 +381,7 @@ class RocksDBStateEncoderSuite extends SparkFunSuite {
       keyStateEncoderSpec,
       valueSchema,
       Some(testProvider),
-      Some(ColumnFamilyInfo(StateStore.DEFAULT_COL_FAMILY_NAME, 0))
-    )
-    new AvroStateEncoder(keyStateEncoderSpec, valueSchema, None, None)

Review Comment:
   Nice finding!



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -629,7 +627,12 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
 
       if (isChangelogCheckpointingEnabled) {
         assert(changelogVersionsPresent(remoteDir) === (1 to 50))
-        assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 
5))
+        if (colFamiliesEnabled) {
+          assert(snapshotVersionsPresent(remoteDir) ===
+            Seq(1, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50))

Review Comment:
   nit: Probably easier to see the diff with `Seq(1) ++ Range.inclusive(5, 50, 
5)`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -700,7 +901,13 @@ class RocksDB(
     new NextIterator[ByteArrayPair] {
       override protected def getNext(): ByteArrayPair = {
         if (iter.isValid) {
-          byteArrayPair.set(iter.key, iter.value)
+          val key = if (useColumnFamilies) {
+            decodeStateRowWithPrefix(iter.key)._1

Review Comment:
   This concerns me a lot: not giving the column family ID/name back to the 
caller makes the result useless in the multiple column family case. Say column 
families cfA and cfB somehow have the same key keyA; the caller cannot tell them 
apart. This makes a global scan meaningless.
   
   That's why I left the comment above.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -1074,14 +1140,25 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         db.load(version, versionToUniqueId.get(version))
         assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
       }
+
       for (version <- 31 to 60) {
         db.load(version - 1, versionToUniqueId.get(version - 1))
         db.put(version.toString, version.toString)
         db.remove((version - 1).toString)
         db.commit()
       }
       assert(changelogVersionsPresent(remoteDir) === (1 to 30))
-      assert(snapshotVersionsPresent(remoteDir) === (31 to 60))
+
+      var result: Seq[Long] = if (colFamiliesEnabled) {
+        Seq(1)
+      } else {
+        Seq.empty
+      }
+
+      (31 to 60).foreach { i =>

Review Comment:
   nit: doesn't this work? `result ++ (31 to 60)`



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -902,7 +946,13 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
         db.remove((version - 1).toString)
         db.commit()
       }
-      assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
+
+      if (enableStateStoreCheckpointIds && colFamiliesEnabled) {
+        assert(snapshotVersionsPresent(remoteDir) === (1 to 30) :+ 30 :+ 31)

Review Comment:
   Is this because 30 is executed two times and snapshot does not overwrite in 
checkpoint v2?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -702,6 +696,8 @@ object StateStore extends Logging {
 
   val DEFAULT_COL_FAMILY_NAME = "default"
 
+  val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2

Review Comment:
   This is bound only to a StateStore "implementation" 
(RocksDBStateStoreProvider); it is not used in the StateStore interface or the 
base implementation.
   
   Move this to RocksDBStateStoreProvider, or to some utility class which is 
referenced by both RocksDB and RocksDBStateStoreProvider.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -656,31 +803,75 @@ class RocksDB(
    *
    * @note This update is not committed to disk until commit() is called.
    */
-  def merge(key: Array[Byte], value: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val oldValue = db.get(readOptions, key)
-      if (oldValue == null) {
-        numKeysOnWritingVersion += 1
+  def merge(
+      key: Array[Byte],
+      value: Array[Byte],
+      cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    if (useColumnFamilies) {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue == null) {
+          val cfInfo = getColumnFamilyInfo(cfName)
+          if (cfInfo.isInternal) {
+            numInternalKeysOnWritingVersion += 1
+          } else {
+            numKeysOnWritingVersion += 1
+          }
+        }
+      }
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }
       }
     }
-    db.merge(writeOptions, key, value)
 
-    changelogWriter.foreach(_.merge(key, value))
+    db.merge(writeOptions, keyWithPrefix, value)
+    changelogWriter.foreach(_.merge(keyWithPrefix, value))
   }
 
   /**
    * Remove the key if present.
    * @note This update is not committed to disk until commit() is called.
    */
-  def remove(key: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val value = db.get(readOptions, key)
-      if (value != null) {
-        numKeysOnWritingVersion -= 1
+  def remove(key: Array[Byte], cfName: String = 
StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    val keyWithPrefix = if (useColumnFamilies) {
+      encodeStateRowWithPrefix(key, cfName)
+    } else {
+      key
+    }
+
+    if (useColumnFamilies) {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, keyWithPrefix)
+        if (oldValue != null) {
+          val cfInfo = getColumnFamilyInfo(cfName)
+          if (cfInfo.isInternal) {
+            numInternalKeysOnWritingVersion -= 1
+          } else {
+            numKeysOnWritingVersion -= 1
+          }
+        }
+      }
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val value = db.get(readOptions, keyWithPrefix)
+        if (value != null) {
+          numKeysOnWritingVersion -= 1
+        }
       }
     }
-    db.delete(writeOptions, key)
-    changelogWriter.foreach(_.delete(key))
+
+    db.delete(writeOptions, keyWithPrefix)
+    changelogWriter.foreach(_.delete(keyWithPrefix))
   }
 
   /**

Review Comment:
   I'm saying that the UX has changed for all the other methods but not for 
this one. Most methods are changed to be column family aware, while this one 
isn't.
   
   For the multiple column families case, we do not expect a global scan over the 
entire key space, so this method is mostly unused. Callers now have to use 
`prefixScan()` where they were using `iterator()`, which relies on the 
implementation details of how we construct virtual column families. I'd rather 
say it'd be better to have a column family aware version of `iterator()`.
   
   I'll leave it to you to decide which is better: 1) having both `iterator()` 
and `iterator(cfName: String)`, leaving `iterator()` to perform a full scan 
regardless of multiple column families, or 2) only having `iterator(cfName: 
String)`, which either does a full scan or delegates to `prefixScan(cfName)` 
based on the internal flag (multiple column families).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to