sahnib commented on code in PR #43961:
URL: https://github.com/apache/spark/pull/43961#discussion_r1431766707


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -604,17 +734,56 @@ class RocksDB(
   }
 
   private def getDBProperty(property: String): Long = {
-    db.getProperty(property).toLong
+    if (useColumnFamilies) {
+      // get cumulative sum across all available column families
+      assert(!colFamilyNameToHandleMap.isEmpty)
+      colFamilyNameToHandleMap
+        .values
+        .map(handle => db.getProperty(handle, property).toLong)
+        .sum
+    } else {
+      db.getProperty(property).toLong
+    }
   }
 
   private def openDB(): Unit = {
     assert(db == null)
-    db = NativeRocksDB.open(dbOptions, workingDir.toString)
-    logInfo(s"Opened DB with conf ${conf}")
+    if (useColumnFamilies) {
+      val colFamilies = NativeRocksDB.listColumnFamilies(dbOptions, 
workingDir.toString)
+
+      var colFamilyDescriptors: Seq[ColumnFamilyDescriptor] = 
Seq.empty[ColumnFamilyDescriptor]
+      // populate the list of available col family descriptors
+      colFamilies.asScala.toList.foreach { family =>
+        val descriptor = new ColumnFamilyDescriptor(family, 
columnFamilyOptions)
+        colFamilyDescriptors = colFamilyDescriptors :+ descriptor

Review Comment:
   Is `:+` O(1) operation on `Seq`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+
+import org.apache.commons.lang3.SerializationUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.types._
+
+/**
+ * Class that provides a concrete implementation for a single value state 
associated with state
+ * variables used in the streaming transformWithState operator.
+ * @param store - reference to the StateStore instance to be used for storing 
state
+ * @param stateName - name of logical state partition
+ * @tparam S - data type of object that will be stored
+ */
+class ValueStateImpl[S](
+    store: StateStore,
+    stateName: String) extends ValueState[S] with Logging {
+
+  // TODO: validate places that are trying to encode the key and check if we 
can eliminate/
+  // add caching for some of these calls.
+  private def encodeKey(): UnsafeRow = {
+    val keyOption = ImplicitKeyTracker.getImplicitKeyOption
+    if (!keyOption.isDefined) {
+      throw new UnsupportedOperationException("Implicit key not found for 
operation on" +
+        s"stateName=$stateName")
+    }
+
+    val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+    val keyByteArr = 
SerializationUtils.serialize(keyOption.get.asInstanceOf[Serializable])
+    val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+    val keyRow = keyEncoder(InternalRow(keyByteArr))
+    keyRow
+  }
+
+  private def encodeValue(value: S): UnsafeRow = {
+    val schemaForValueRow: StructType = new StructType().add("value", 
BinaryType)
+    val valueByteArr = 
SerializationUtils.serialize(value.asInstanceOf[Serializable])
+    val valueEncoder = UnsafeProjection.create(schemaForValueRow)
+    val valueRow = valueEncoder(InternalRow(valueByteArr))
+    valueRow
+  }
+
+  /** Function to check if state exists. Returns true if present and false 
otherwise */
+  override def exists(): Boolean = {
+    getImpl() != null
+  }
+
+  /** Function to return Option of value if exists and None otherwise */
+  override def getOption(): Option[S] = {
+    val retRow = getImpl()
+    if (retRow != null) {
+      val resState = SerializationUtils
+        .deserialize(retRow.getBinary(0))
+        .asInstanceOf[S]

Review Comment:
   Just FYI, in List State PR I extracted out the key/value encoding to a 
separate object for ease of use across different state types.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -153,12 +265,70 @@ class StateStoreChangelogReader(
       val valueSize = input.readInt()
       if (valueSize < 0) {
         // A deletion record
-        (keyBuffer, null)
+        (RecordType.DELETE_RECORD, keyBuffer, null, 
StateStore.DEFAULT_COL_FAMILY_NAME)
       } else {
         val valueBuffer = new Array[Byte](valueSize)
         ByteStreams.readFully(input, valueBuffer, 0, valueSize)
         // A put record.
-        (keyBuffer, valueBuffer)
+        (RecordType.PUT_RECORD, keyBuffer, valueBuffer, 
StateStore.DEFAULT_COL_FAMILY_NAME)
+      }
+    }
+  }
+}
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(recordType: RecordType.Value,
+ *  key: Array[Byte], value: Array[Byte], colFamilyName: String)
+ * A put record is returned as a ByteArrayPair(recordType, key, value, 
colFamilyName)
+ * A delete record is return as a ByteArrayPair(recordType, key, null, 
colFamilyName)
+ */
+class StateStoreChangelogReaderV2(
+    fm: CheckpointFileManager,
+    fileToRead: Path,
+    compressionCodec: CompressionCodec)
+  extends StateStoreChangelogReader(fm, fileToRead, compressionCodec) {
+
+  private def parseBuffer(input: DataInputStream): Array[Byte] = {
+    val blockSize = input.readInt()
+    val blockBuffer = new Array[Byte](blockSize)
+    ByteStreams.readFully(input, blockBuffer, 0, blockSize)
+    blockBuffer
+  }
+
+  override def getNext(): (RecordType.Value, Array[Byte], Array[Byte], String) 
= {
+    val recordTypeSize = input.readInt()
+    // A -1 key size mean end of file.
+    if (recordTypeSize == -1) {
+      finished = true
+      null
+    } else if (recordTypeSize < 0) {
+      throw new IOException(
+        s"Error reading streaming state file $fileToRead: " +
+        s"record type size cannot be $recordTypeSize")
+    } else {
+      val recordTypeBuffer = new Array[Byte](recordTypeSize)
+      ByteStreams.readFully(input, recordTypeBuffer, 0, recordTypeSize)

Review Comment:
   Should we use org.apache.spark.unsafe.Platform.copyMemory to copy these 
bytes? Not sure what ByteStreams does under the hood, but unsafe copy probably 
would be more efficient.  



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -604,17 +734,56 @@ class RocksDB(
   }
 
   private def getDBProperty(property: String): Long = {
-    db.getProperty(property).toLong
+    if (useColumnFamilies) {
+      // get cumulative sum across all available column families
+      assert(!colFamilyNameToHandleMap.isEmpty)
+      colFamilyNameToHandleMap
+        .values
+        .map(handle => db.getProperty(handle, property).toLong)
+        .sum

Review Comment:
   Curious, are all RocksDB properties additive across column families? (I 
guess sstables, and stats are - but I have not looked at exhaustive list). 



##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import java.io.Serializable
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+/**
+ * Represents the operation handle provided to the stateful processor used in 
the
+ * arbitrary state API v2.
+ */
+@Experimental
+@Evolving
+trait StatefulProcessorHandle extends Serializable {
+
+  /**
+   * Function to create new or return existing single value state variable of 
given type
+   * The user must ensure to call this function only within the `init()` 
method of the
+   * StatefulProcessor.
+   * @param stateName - name of the state variable
+   * @tparam T - type of state variable
+   * @return - instance of ValueState of type T that can be used to store 
state persistently
+   */
+  def getValueState[T](stateName: String): ValueState[T]
+
+  /** Function to return queryInfo for currently running task */
+  def getQueryInfo(): QueryInfo

Review Comment:
   How do we expect users to use the QueryInfo? Do we need to expose this in 
the processor handle? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -50,13 +51,15 @@ import org.apache.spark.util.{NextIterator, Utils}
  * @param localRootDir Root directory in local disk that is used to working 
and checkpointing dirs
  * @param hadoopConf   Hadoop configuration for talking to the remote file 
system
  * @param loggingId    Id that will be prepended in logs for isolating 
concurrent RocksDBs
+ * @param useColumnFamilies Used to determine whether a single or multiple 
column families are used
  */
 class RocksDB(
     dfsRootDir: String,
     val conf: RocksDBConf,
     localRootDir: File = Utils.createTempDir(),
     hadoopConf: Configuration = new Configuration,
-    loggingId: String = "") extends Logging {
+    loggingId: String = "",
+    useColumnFamilies: Boolean = false) extends Logging {

Review Comment:
   Given that we don't use column families in Spark today, and every get/put 
operation defaults to ColumnFamily `DEFAULT`, do we need the additional 
`useColumnFamilies` guard? Not having the guard prevents a bunch of additional 
validation across changelog and RocksDB, so I am curious if its absolutely 
necessary to have this flag.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -204,12 +211,18 @@ class RocksDB(
     for (v <- loadedVersion + 1 to endVersion) {
       var changelogReader: StateStoreChangelogReader = null
       try {
-        changelogReader = fileManager.getChangelogReader(v)
-        changelogReader.foreach { case (key, value) =>
-          if (value != null) {
-            put(key, value)
-          } else {
-            remove(key)
+        changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
+        changelogReader.foreach { case (recordType, key, value, colFamilyName) 
=>

Review Comment:
   Do we write the actual recordType string in changelog file (`put_record', 
'delete_record`)? If yes, can we encode it to an Int (cons: mapping order 
matters) to save some I/O?  



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -604,17 +734,56 @@ class RocksDB(
   }
 
   private def getDBProperty(property: String): Long = {
-    db.getProperty(property).toLong
+    if (useColumnFamilies) {
+      // get cumulative sum across all available column families
+      assert(!colFamilyNameToHandleMap.isEmpty)
+      colFamilyNameToHandleMap
+        .values
+        .map(handle => db.getProperty(handle, property).toLong)
+        .sum
+    } else {
+      db.getProperty(property).toLong
+    }
   }
 
   private def openDB(): Unit = {
     assert(db == null)
-    db = NativeRocksDB.open(dbOptions, workingDir.toString)
-    logInfo(s"Opened DB with conf ${conf}")
+    if (useColumnFamilies) {
+      val colFamilies = NativeRocksDB.listColumnFamilies(dbOptions, 
workingDir.toString)
+
+      var colFamilyDescriptors: Seq[ColumnFamilyDescriptor] = 
Seq.empty[ColumnFamilyDescriptor]
+      // populate the list of available col family descriptors
+      colFamilies.asScala.toList.foreach { family =>
+        val descriptor = new ColumnFamilyDescriptor(family, 
columnFamilyOptions)
+        colFamilyDescriptors = colFamilyDescriptors :+ descriptor
+      }
+
+      if (colFamilyDescriptors.isEmpty) {
+        colFamilyDescriptors = colFamilyDescriptors :+
+          new ColumnFamilyDescriptor(NativeRocksDB.DEFAULT_COLUMN_FAMILY, 
columnFamilyOptions)

Review Comment:
   If column families are present, we do not need the `Default` handle anymore? 
No one would ever query/modify anything against the Default handle? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -78,9 +78,9 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
 
     override def id: StateStoreId = 
HDFSBackedStateStoreProvider.this.stateStoreId
 
-    override def get(key: UnsafeRow): UnsafeRow = map.get(key)
+    override def get(key: UnsafeRow, colFamilyName: String): UnsafeRow = 
map.get(key)
 
-    override def iterator(): Iterator[UnsafeRowPair] = {
+    override def iterator(colFamilyName: String): Iterator[UnsafeRowPair] = {

Review Comment:
   I guess we never expect `HDFSBackedStateStoreProvider`'s get or iterator to 
be called with a colFamilyName as non-empty, but should we assert that 
colFamilyName is empty (as technically its not supported)?  



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+
+import org.apache.commons.lang3.SerializationUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.types._
+
+/**
+ * Class that provides a concrete implementation for a single value state 
associated with state
+ * variables used in the streaming transformWithState operator.
+ * @param store - reference to the StateStore instance to be used for storing 
state
+ * @param stateName - name of logical state partition
+ * @tparam S - data type of object that will be stored
+ */
+class ValueStateImpl[S](
+    store: StateStore,
+    stateName: String) extends ValueState[S] with Logging {
+
+  // TODO: validate places that are trying to encode the key and check if we 
can eliminate/
+  // add caching for some of these calls.
+  private def encodeKey(): UnsafeRow = {
+    val keyOption = ImplicitKeyTracker.getImplicitKeyOption
+    if (!keyOption.isDefined) {
+      throw new UnsupportedOperationException("Implicit key not found for 
operation on" +
+        s"stateName=$stateName")
+    }
+
+    val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+    val keyByteArr = 
SerializationUtils.serialize(keyOption.get.asInstanceOf[Serializable])
+    val keyEncoder = UnsafeProjection.create(schemaForKeyRow)
+    val keyRow = keyEncoder(InternalRow(keyByteArr))
+    keyRow
+  }
+
+  private def encodeValue(value: S): UnsafeRow = {
+    val schemaForValueRow: StructType = new StructType().add("value", 
BinaryType)
+    val valueByteArr = 
SerializationUtils.serialize(value.asInstanceOf[Serializable])
+    val valueEncoder = UnsafeProjection.create(schemaForValueRow)
+    val valueRow = valueEncoder(InternalRow(valueByteArr))
+    valueRow
+  }
+
+  /** Function to check if state exists. Returns true if present and false 
otherwise */
+  override def exists(): Boolean = {
+    getImpl() != null
+  }
+
+  /** Function to return Option of value if exists and None otherwise */
+  override def getOption(): Option[S] = {
+    val retRow = getImpl()
+    if (retRow != null) {
+      val resState = SerializationUtils
+        .deserialize(retRow.getBinary(0))
+        .asInstanceOf[S]
+      Some(resState)
+    } else {
+      None
+    }
+  }
+
+  /** Function to return associated value with key if exists and null 
otherwise */
+  override def get(): S = {
+    val retRow = getImpl()
+    if (retRow != null) {
+      val resState = SerializationUtils
+        .deserialize(retRow.getBinary(0))
+        .asInstanceOf[S]
+      resState
+    } else {
+      null.asInstanceOf[S]

Review Comment:
   [nit] do we need `asInstanceOf` here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -219,49 +233,127 @@ class RocksDB(
     loadedVersion = endVersion
   }
 
+  private def checkColFamilyExists(colFamilyName: String): Boolean = {
+    colFamilyNameToHandleMap.contains(colFamilyName)
+  }
+
+  /**
+   * Create RocksDB column family, if not created already
+   */
+  def createColFamilyIfAbsent(colFamilyName: String): Unit = {
+    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+      throw new UnsupportedOperationException("Failed to create column family 
with reserved " +
+        s"name=$colFamilyName")
+    }
+
+    if (!checkColFamilyExists(colFamilyName)) {
+      assert(db != null)
+      val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes, 
columnFamilyOptions)
+      val handle = db.createColumnFamily(descriptor)
+      colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
+    }
+  }
+
   /**
    * Get the value for the given key if present, or null.
    * @note This will return the last written value even if it was uncommitted.
    */
-  def get(key: Array[Byte]): Array[Byte] = {
-    db.get(readOptions, key)
+  def get(
+      key: Array[Byte],
+      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] 
= {
+    if (useColumnFamilies) {
+      // if col family is not created, throw an exception
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw new RuntimeException(s"Column family with name=$colFamilyName 
does not exist")
+      }
+      db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
+    } else {
+      db.get(readOptions, key)
+    }
   }
 
   /**
    * Put the given value for the given key.
    * @note This update is not committed to disk until commit() is called.
    */
-  def put(key: Array[Byte], value: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val oldValue = db.get(readOptions, key)
-      if (oldValue == null) {
-        numKeysOnWritingVersion += 1
+  def put(
+      key: Array[Byte],
+      value: Array[Byte],
+      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    if (useColumnFamilies) {
+      // if col family is not created, throw an exception
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw new RuntimeException(s"Column family with name=$colFamilyName 
does not exist")
+      }
+
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), 
readOptions, key)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }
+      }
+      db.put(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
+      changelogWriter.foreach(_.put(key, value, colFamilyName))
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, key)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }

Review Comment:
   Addition of column family ends up in similar code across both `if` and 
`else` branches. One way to avoid duplication would be to add additional 
`doGet`, `doPut`, `writeChangelog` operations which would check for column 
family handle and call the appropriate RocksDB operation. (cons: It adds 
additional if..else checks in the same code-base, as an example in this code - 
we would validate 3 times instead of 1). 
   
   Do you think its worth the additional validation overhead for code 
simplification here? 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -109,18 +119,94 @@ class StateStoreChangelogWriter(
   }
 }
 
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriterV1(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec)
+    extends StateStoreChangelogWriter(fm, file, compressionCodec) {
+
+  override def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  override def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+}
 
 /**
- * Read an iterator of change record from the changelog file.
- * A record is represented by ByteArrayPair(key: Array[Byte], value: 
Array[Byte])
- * A put record is returned as a ByteArrayPair(key, value)
- * A delete record is return as a ByteArrayPair(key, null)
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | record type | key length
+ *    | key content | value length | value content | col family name length | 
col family name | -1 |
+ * A delete record is written as: | record type | key length | key content | -1
+ *    | col family name length | col family name | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriterV2(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec)
+    extends StateStoreChangelogWriter(fm, file, compressionCodec) {
+
+  override def put(key: Array[Byte], value: Array[Byte], colFamilyName: 
String): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(RecordType.PUT_RECORD.toString.getBytes.size)

Review Comment:
   +1 to numeric code. I gave the same comment above, just doing +1 here.  



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -204,12 +211,18 @@ class RocksDB(
     for (v <- loadedVersion + 1 to endVersion) {
       var changelogReader: StateStoreChangelogReader = null
       try {
-        changelogReader = fileManager.getChangelogReader(v)
-        changelogReader.foreach { case (key, value) =>
-          if (value != null) {
-            put(key, value)
-          } else {
-            remove(key)
+        changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
+        changelogReader.foreach { case (recordType, key, value, colFamilyName) 
=>

Review Comment:
   Looked further downstream, and seems like we write the entire string. 
   
   ```
       compressedStream.writeInt(RecordType.PUT_RECORD.toString.getBytes.size)
       compressedStream.write(RecordType.PUT_RECORD.toString.getBytes)
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to