HeartSaVioR commented on code in PR #43961:
URL: https://github.com/apache/spark/pull/43961#discussion_r1430858354


##########
sql/api/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TransformWithStateTimeoutModes.scala:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.streaming.TimeoutMode
+
+/** Types of timeouts used in transformWithState operator */
+case object noTimeouts extends TimeoutMode

Review Comment:
   nit: use CamelCase for the object name (e.g. `NoTimeouts`)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -719,6 +719,36 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     }
   }
 
+  /**
+   * Strategy to convert [[TransformWithState]] logical operator to physical 
operator
+   * in streaming plans.
+   */
+   object TransformWithStateStrategy extends Strategy {
+    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

Review Comment:
   nit: indent with 2 spaces (this looks to be off by 1 space)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.streaming.{QueryInfo, StatefulProcessorHandle, 
ValueState}
+import org.apache.spark.util.Utils
+
+/**
+ * Object used to assign/retrieve/remove grouping key passed implicitly for 
various state
+ * manipulation actions using the store handle.
+ */
+object ImplicitKeyTracker {
+  val implicitKey: InheritableThreadLocal[Any] = new 
InheritableThreadLocal[Any]
+
+  def getImplicitKeyOption: Option[Any] = Option(implicitKey.get())
+
+  def setImplicitKey(key: Any): Unit = implicitKey.set(key)
+
+  def removeImplicitKey(): Unit = implicitKey.remove()
+}
+
+/**
+ * Enum used to track valid states for the StatefulProcessorHandle
+ */
+object StatefulProcessorHandleState extends Enumeration {
+  type StatefulProcessorHandleState = Value
+  val CREATED, INITIALIZED, DATA_PROCESSED, CLOSED = Value
+}
+
+class QueryInfoImpl(
+    val queryId: UUID,
+    val runId: UUID,
+    val batchId: Long,
+    val operatorId: Long,
+    val partitionId: Int) extends QueryInfo {
+
+  override def getQueryId: UUID = queryId
+
+  override def getRunId: UUID = runId
+
+  override def getBatchId: Long = batchId
+
+  override def getOperatorId: Long = operatorId
+
+  override def getPartitionId: Int = partitionId
+
+  override def toString: String = {
+    s"QueryInfo(queryId=$queryId, runId=$runId, batchId=$batchId, 
operatorId=$operatorId, " +
+      s"partitionId=$partitionId)"
+  }
+}
+
+/**
+ * Class that provides a concrete implementation of a StatefulProcessorHandle. 
Note that we keep
+ * track of valid transitions as various functions are invoked to track object 
lifecycle.
+ * @param store - instance of state store
+ */
+class StatefulProcessorHandleImpl(store: StateStore, runId: UUID)

Review Comment:
   If we can pass the params via the constructor or similar, let's do that. You can 
even add a companion object with an apply method to do the magic with TaskContext. 
Let's avoid having the instance itself tied to the thread.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -33,12 +33,20 @@ import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.Cancellabl
 import org.apache.spark.util.NextIterator
 
 /**
- * Write changes to the key value state store instance to a changelog file.
- * There are 2 types of records, put and delete.
- * A put record is written as: | key length | key content | value length | 
value content |
- * A delete record is written as: | key length | key content | -1 |
- * Write an Int -1 to signal the end of file.
- * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ * Enum used to write record types to changelog files used with 
RocksDBStateStoreProvider.
+ */
+object RecordType extends Enumeration {
+  type RecordType = Value
+
+  val PUT_RECORD = Value("put_record")

Review Comment:
   Maybe you no longer need the string representation if we go with a 1-byte 
marker.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -503,11 +634,79 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
     val entries = changelogReader.toSeq
-    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, 
i.toString.getBytes)) ++
-      (2 to 4).map(j => (j.toString.getBytes, null))
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with default col family") 
{
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      StateStore.DEFAULT_COL_FAMILY_NAME)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, 
StateStore.DEFAULT_COL_FAMILY_NAME)
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with multiple col 
families") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val testColFamily1: String = "testColFamily1"
+    val testColFamily2: String = "testColFamily2"
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      testColFamily1)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, testColFamily1)
+
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      testColFamily2)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, testColFamily2)
+
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntriesForColFamily1 = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, testColFamily1)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -503,11 +634,79 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
     val entries = changelogReader.toSeq
-    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, 
i.toString.getBytes)) ++
-      (2 to 4).map(j => (j.toString.getBytes, null))
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with default col family") 
{
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      StateStore.DEFAULT_COL_FAMILY_NAME)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, 
StateStore.DEFAULT_COL_FAMILY_NAME)
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with multiple col 
families") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val testColFamily1: String = "testColFamily1"
+    val testColFamily2: String = "testColFamily2"
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      testColFamily1)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, testColFamily1)
+
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      testColFamily2)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, testColFamily2)
+
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntriesForColFamily1 = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, testColFamily1)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, testColFamily1))
+
+    val expectedEntriesForColFamily2 = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, testColFamily2)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -503,11 +634,79 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
     val entries = changelogReader.toSeq
-    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, 
i.toString.getBytes)) ++
-      (2 to 4).map(j => (j.toString.getBytes, null))
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with default col family") 
{
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      StateStore.DEFAULT_COL_FAMILY_NAME)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, 
StateStore.DEFAULT_COL_FAMILY_NAME)
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -896,63 +918,69 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
   protected val keySchema: StructType = StateStoreTestsHelper.keySchema
   protected val valueSchema: StructType = StateStoreTestsHelper.valueSchema
 
-  testWithAllCodec("get, put, remove, commit, and all data iterator") {
-    tryWithProviderResource(newStoreProvider()) { provider =>
-      // Verify state before starting a new set of updates
-      assert(getLatestData(provider).isEmpty)
-
-      val store = provider.getStore(0)
-      assert(!store.hasCommitted)
-      assert(get(store, "a", 0) === None)
-      assert(store.iterator().isEmpty)
-      assert(store.metrics.numKeys === 0)
-
-      // Verify state after updating
-      put(store, "a", 0, 1)
-      assert(get(store, "a", 0) === Some(1))
-
-      assert(store.iterator().nonEmpty)
-      assert(getLatestData(provider).isEmpty)
-
-      // Make updates, commit and then verify state
-      put(store, "b", 0, 2)
-      put(store, "aa", 0, 3)
-      remove(store, _._1.startsWith("a"))
-      assert(store.commit() === 1)
-
-      assert(store.hasCommitted)
-      assert(rowPairsToDataSet(store.iterator()) === Set(("b", 0) -> 2))
-      assert(getLatestData(provider) === Set(("b", 0) -> 2))
-
-      // Trying to get newer versions should fail
-      var e = intercept[SparkException] {
-        provider.getStore(2)
-      }
-      assert(e.getCause.isInstanceOf[SparkException])
-      assert(e.getCause.getMessage.contains("does not exist"))
-
-      e = intercept[SparkException] {
-        getData(provider, 2)
-      }
-      assert(e.getCause.isInstanceOf[SparkException])
-      assert(e.getCause.getMessage.contains("does not exist"))
+  Seq(true, false).foreach { useColumnFamilies =>

Review Comment:
   Ideally we should figure out how to set up proper matrix testing here, but let's 
put this aside for now.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -503,11 +634,79 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
     val entries = changelogReader.toSeq
-    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, 
i.toString.getBytes)) ++
-      (2 to 4).map(j => (j.toString.getBytes, null))
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with default col family") 
{
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      StateStore.DEFAULT_COL_FAMILY_NAME)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, 
StateStore.DEFAULT_COL_FAMILY_NAME)
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with multiple col 
families") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val testColFamily1: String = "testColFamily1"
+    val testColFamily2: String = "testColFamily2"
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,

Review Comment:
   ditto for following 4 lines



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.execution.streaming.{ImplicitKeyTracker, 
StatefulProcessorHandleImpl}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests for single value ValueState types used in arbitrary 
stateful
+ * operators such as transformWithState
+ */
+class ValueStateSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def newStoreProviderWithValueState(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithValueState(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithValueState(
+    storeId: StateStoreId,
+    numColsPrefixKey: Int,
+    sqlConf: Option[SQLConf] = None,
+    conf: Configuration = new Configuration,
+    useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+    val provider = new RocksDBStateStoreProvider()
+    provider.init(
+      storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = 
numColsPrefixKey,
+      useColumnFamilies,
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+    provider
+  }
+
+  private def tryWithProviderResource[T](
+    provider: StateStoreProvider)(f: StateStoreProvider => T): T = {
+    try {
+      f(provider)
+    } finally {
+      provider.close()
+    }
+  }
+
+  test("Implicit key operations") {
+    tryWithProviderResource(newStoreProviderWithValueState(true)) { provider =>
+      val store = provider.getStore(0)
+      val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID())
+      assert(handle.getQueryInfo().getPartitionId === 0)
+
+      val testState: ValueState[Long] = handle.getValueState[Long]("testState")
+      assert(ImplicitKeyTracker.getImplicitKeyOption.isEmpty)
+      val ex = intercept[Exception] {
+        testState.update(123)
+      }
+
+      assert(ex.isInstanceOf[UnsupportedOperationException])
+      assert(ex.getMessage.contains("Implicit key not found"))
+      ImplicitKeyTracker.setImplicitKey("test_key")
+      assert(ImplicitKeyTracker.getImplicitKeyOption.isDefined)
+      testState.update(123)

Review Comment:
   Should we read the value back to verify the update?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.execution.streaming.{ImplicitKeyTracker, 
StatefulProcessorHandleImpl}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests for single value ValueState types used in arbitrary 
stateful
+ * operators such as transformWithState
+ */
+class ValueStateSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def newStoreProviderWithValueState(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithValueState(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithValueState(
+    storeId: StateStoreId,

Review Comment:
   nit: indent the parameters by 4 spaces (relative to `def`)



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.execution.streaming._
+import 
org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled,
 RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+
+class RunningCountStatefulProcessor extends StatefulProcessor[String, String, 
(String, String)]
+  with Logging {
+  @transient private var _countState: ValueState[Long] = _
+  @transient var _processorHandle: StatefulProcessorHandle = _
+
+  override def init(handle: StatefulProcessorHandle,
+    outputMode: OutputMode) : Unit = {
+    _processorHandle = handle
+    assert(handle.getQueryInfo().getBatchId >= 0)
+    assert(handle.getQueryInfo().getOperatorId == 0)
+    assert(handle.getQueryInfo().getPartitionId >= 0 && 
handle.getQueryInfo().getPartitionId < 5)
+    _countState = _processorHandle.getValueState[Long]("countState")
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[String],
+      timerValues: TimerValues): Iterator[(String, String)] = {
+    val count = _countState.getOption().getOrElse(0L) + inputRows.size
+    if (count == 3) {
+      _countState.remove()
+      Iterator.empty
+    } else {
+      _countState.update(count)
+      Iterator((key, count.toString))
+    }
+  }
+
+  override def close(): Unit = {}
+}
+
+class RunningCountStatefulProcessorWithError extends 
RunningCountStatefulProcessor {
+  @transient private var _tempState: ValueState[Long] = _
+
+  override def handleInputRows(key: String,

Review Comment:
   nit: follow the multi-line parameter style (one parameter per line)



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -109,18 +119,94 @@ class StateStoreChangelogWriter(
   }
 }
 
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriterV1(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec)
+    extends StateStoreChangelogWriter(fm, file, compressionCodec) {
+
+  override def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  override def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+}
 
 /**
- * Read an iterator of change record from the changelog file.
- * A record is represented by ByteArrayPair(key: Array[Byte], value: 
Array[Byte])
- * A put record is returned as a ByteArrayPair(key, value)
- * A delete record is return as a ByteArrayPair(key, null)
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | record type | key length
+ *    | key content | value length | value content | col family name length | 
col family name | -1 |
+ * A delete record is written as: | record type | key length | key content | -1
+ *    | col family name length | col family name | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriterV2(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec)
+    extends StateStoreChangelogWriter(fm, file, compressionCodec) {
+
+  override def put(key: Array[Byte], value: Array[Byte], colFamilyName: 
String): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(RecordType.PUT_RECORD.toString.getBytes.size)

Review Comment:
   Discussed offline: decided to use a 1 byte marker.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -294,8 +297,8 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     // Now enable changelog checkpointing in a checkpoint created by a state 
store
     // that disable changelog checkpointing.
     val enableChangelogCheckpointingConf =
-      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 
30,
-        minDeltasForSnapshot = 1)
+    dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 30,

Review Comment:
   nit: the previous indentation seems more appropriate



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -503,11 +634,79 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
     val entries = changelogReader.toSeq
-    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, 
i.toString.getBytes)) ++
-      (2 to 4).map(j => (j.toString.getBytes, null))
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with default col family") 
{
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      StateStore.DEFAULT_COL_FAMILY_NAME)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, 
StateStore.DEFAULT_COL_FAMILY_NAME)
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with multiple col 
families") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val testColFamily1: String = "testColFamily1"
+    val testColFamily2: String = "testColFamily2"
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      testColFamily1)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, testColFamily1)
+
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      testColFamily2)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, testColFamily2)
+
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntriesForColFamily1 = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, testColFamily1)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, testColFamily1))
+
+    val expectedEntriesForColFamily2 = (1 to 5).map(i =>

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -503,11 +634,79 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
     val entries = changelogReader.toSeq
-    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, 
i.toString.getBytes)) ++
-      (2 to 4).map(j => (j.toString.getBytes, null))
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with default col family") 
{
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      StateStore.DEFAULT_COL_FAMILY_NAME)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, 
StateStore.DEFAULT_COL_FAMILY_NAME)
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map(i =>

Review Comment:
   nit: use braces for a multi-line lambda, i.e. `{ i =>`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -219,49 +233,127 @@ class RocksDB(
     loadedVersion = endVersion
   }
 
+  private def checkColFamilyExists(colFamilyName: String): Boolean = {
+    colFamilyNameToHandleMap.contains(colFamilyName)
+  }
+
+  /**
+   * Create RocksDB column family, if not created already
+   */
+  def createColFamilyIfAbsent(colFamilyName: String): Unit = {
+    if (colFamilyName == StateStore.DEFAULT_COL_FAMILY_NAME) {
+      throw new UnsupportedOperationException("Failed to create column family 
with reserved " +
+        s"name=$colFamilyName")
+    }
+
+    if (!checkColFamilyExists(colFamilyName)) {
+      assert(db != null)
+      val descriptor = new ColumnFamilyDescriptor(colFamilyName.getBytes, 
columnFamilyOptions)
+      val handle = db.createColumnFamily(descriptor)
+      colFamilyNameToHandleMap(handle.getName.map(_.toChar).mkString) = handle
+    }
+  }
+
   /**
    * Get the value for the given key if present, or null.
    * @note This will return the last written value even if it was uncommitted.
    */
-  def get(key: Array[Byte]): Array[Byte] = {
-    db.get(readOptions, key)
+  def get(
+      key: Array[Byte],
+      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Array[Byte] 
= {
+    if (useColumnFamilies) {
+      // if col family is not created, throw an exception
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw new RuntimeException(s"Column family with name=$colFamilyName 
does not exist")
+      }
+      db.get(colFamilyNameToHandleMap(colFamilyName), readOptions, key)
+    } else {
+      db.get(readOptions, key)
+    }
   }
 
   /**
    * Put the given value for the given key.
    * @note This update is not committed to disk until commit() is called.
    */
-  def put(key: Array[Byte], value: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val oldValue = db.get(readOptions, key)
-      if (oldValue == null) {
-        numKeysOnWritingVersion += 1
+  def put(
+      key: Array[Byte],
+      value: Array[Byte],
+      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    if (useColumnFamilies) {
+      // if col family is not created, throw an exception
+      if (!checkColFamilyExists(colFamilyName)) {
+        throw new RuntimeException(s"Column family with name=$colFamilyName 
does not exist")
+      }
+
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(colFamilyNameToHandleMap(colFamilyName), 
readOptions, key)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }
+      }
+      db.put(colFamilyNameToHandleMap(colFamilyName), writeOptions, key, value)
+      changelogWriter.foreach(_.put(key, value, colFamilyName))
+    } else {
+      if (conf.trackTotalNumberOfRows) {
+        val oldValue = db.get(readOptions, key)
+        if (oldValue == null) {
+          numKeysOnWritingVersion += 1
+        }
       }
+      db.put(writeOptions, key, value)
+      changelogWriter.foreach(_.put(key, value))
     }
-    db.put(writeOptions, key, value)
-    changelogWriter.foreach(_.put(key, value))
   }
 
   /**
    * Remove the key if present.
    * @note This update is not committed to disk until commit() is called.
    */
-  def remove(key: Array[Byte]): Unit = {
-    if (conf.trackTotalNumberOfRows) {
-      val value = db.get(readOptions, key)
-      if (value != null) {
-        numKeysOnWritingVersion -= 1
+  def remove(
+      key: Array[Byte],
+      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Unit = {
+    if (useColumnFamilies) {
+      // if col family is not created, throw an exception
+      if (!checkColFamilyExists(colFamilyName)) {

Review Comment:
   Maybe add a new helper method whose name starts with `assert` - many places
perform the same check and throw the same exception.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########


Review Comment:
   I'd rather define a new test method to do the matrix test. For this case,
something along the lines of `testWithColumnFamilyEnabled()` that takes
`func: Boolean => Any`.
   
   If you do that, your diff will be much smaller, since the indentation won't
change. The same goes for other test suites, if applicable.
   
   ```
     def executeFuncWithStateVersionSQLConf(
         stateVersion: Int,
         confPairs: Seq[(String, String)],
         func: => Any): Unit = {
       withSQLConf(confPairs ++
         Seq(SQLConf.STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key -> 
stateVersion.toString): _*) {
         func
       }
     }
   
     def testWithAllStateVersions(name: String, confPairs: (String, String)*)
                                 (func: => Any): Unit = {
       for (version <- StreamingAggregationStateManager.supportedVersions) {
         test(s"$name - state format version $version") {
           executeFuncWithStateVersionSQLConf(version, confPairs, func)
         }
       }
     }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -109,18 +119,93 @@ class StateStoreChangelogWriter(
   }
 }
 
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriterV1(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec)
+  extends StateStoreChangelogWriter(fm, file, compressionCodec) {
+
+  override def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  override def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+}
 
 /**
- * Read an iterator of change record from the changelog file.
- * A record is represented by ByteArrayPair(key: Array[Byte], value: 
Array[Byte])
- * A put record is returned as a ByteArrayPair(key, value)
- * A delete record is return as a ByteArrayPair(key, null)
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | record type | key length
+ *    | key content | value length | value content | col family name length | 
col family name | -1 |
+ * A delete record is written as: | record type | key length | key content | -1
+ *    | col family name length | col family name | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriterV2(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec)
+  extends StateStoreChangelogWriter(fm, file, compressionCodec) {
+
+  override def put(key: Array[Byte], value: Array[Byte], colFamilyName: 
String): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(RecordType.PUT_RECORD.toString.getBytes.size)
+    compressedStream.write(RecordType.PUT_RECORD.toString.getBytes)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    compressedStream.writeInt(colFamilyName.getBytes.size)
+    compressedStream.write(colFamilyName.getBytes)
+    size += 1
+  }
+
+  override def delete(key: Array[Byte], colFamilyName: String): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(RecordType.DELETE_RECORD.toString.getBytes.size)
+    compressedStream.write(RecordType.DELETE_RECORD.toString.getBytes)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    compressedStream.writeInt(colFamilyName.getBytes.size)
+    compressedStream.write(colFamilyName.getBytes)
+    size += 1
+  }
+}
+
+/**
+ * Base class for state store changelog reader
+ * @param fm - checkpoint file manager used to manage streaming query 
checkpoint
+ * @param fileToRead - name of file to use to read changelog
+ * @param compressionCodec - de-compression method using for reading changelog 
file
  */
 class StateStoreChangelogReader(

Review Comment:
   Same here: consider explicitly using an abstract class with abstract methods
rather than leaving room for the wrong class to be used.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -109,12 +109,12 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     "RocksDB: check changelog and snapshot version") {
     val remoteDir = Utils.createTempDir().toString
     val conf = dbConf.copy(minDeltasForSnapshot = 1)
-    new File(remoteDir).delete()  // to make sure that the directory gets 
created
+    new File(remoteDir).delete() // to make sure that the directory gets 
created
     for (version <- 0 to 49) {
       withDB(remoteDir, version = version, conf = conf) { db =>
-          db.put(version.toString, version.toString)
-          db.commit()
-          if ((version + 1) % 5 == 0) db.doMaintenance()
+        db.put(version.toString, version.toString)

Review Comment:
   nice catch on indentation!



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -377,119 +380,247 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
-  test("RocksDB: compression conf") {
-    val remoteDir = Utils.createTempDir().toString
-    new File(remoteDir).delete()  // to make sure that the directory gets 
created
+  Seq(true, false).foreach { useColumnFamilies =>
+    test(s"RocksDB: compression conf with 
useColumnFamilies=$useColumnFamilies") {
+      val remoteDir = Utils.createTempDir().toString
+      new File(remoteDir).delete() // to make sure that the directory gets 
created
 
-    val conf = RocksDBConf().copy(compression = "zstd")
-    withDB(remoteDir, conf = conf) { db =>
-      assert(db.columnFamilyOptions.compressionType() == 
CompressionType.ZSTD_COMPRESSION)
-    }
+      val conf = RocksDBConf().copy(compression = "zstd")
+      withDB(remoteDir, conf = conf, useColumnFamilies = useColumnFamilies) { 
db =>
+        assert(db.columnFamilyOptions.compressionType() == 
CompressionType.ZSTD_COMPRESSION)
+      }
 
-    // Test the default is LZ4
-    withDB(remoteDir, conf = RocksDBConf().copy()) { db =>
-      assert(db.columnFamilyOptions.compressionType() == 
CompressionType.LZ4_COMPRESSION)
+      // Test the default is LZ4
+      withDB(remoteDir, conf = RocksDBConf().copy(), useColumnFamilies = 
useColumnFamilies) { db =>
+        assert(db.columnFamilyOptions.compressionType() == 
CompressionType.LZ4_COMPRESSION)
+      }
     }
   }
 
-  test("RocksDB: get, put, iterator, commit, load") {
-    def testOps(compactOnCommit: Boolean): Unit = {
-      val remoteDir = Utils.createTempDir().toString
-      new File(remoteDir).delete()  // to make sure that the directory gets 
created
+  Seq(true, false).foreach { useColumnFamilies =>
+    test(s"RocksDB: get, put, iterator, commit, load with " +
+      s"useColumnFamilies=$useColumnFamilies") {
+      def testOps(compactOnCommit: Boolean): Unit = {
+        val remoteDir = Utils.createTempDir().toString
+        new File(remoteDir).delete() // to make sure that the directory gets 
created
 
-      val conf = RocksDBConf().copy(compactOnCommit = compactOnCommit)
-      withDB(remoteDir, conf = conf) { db =>
-        assert(db.get("a") === null)
-        assert(iterator(db).isEmpty)
+        val conf = RocksDBConf().copy(compactOnCommit = compactOnCommit)
+        withDB(remoteDir, conf = conf, useColumnFamilies = useColumnFamilies) 
{ db =>
+          assert(db.get("a") === null)
+          assert(iterator(db).isEmpty)
 
-        db.put("a", "1")
-        assert(toStr(db.get("a")) === "1")
-        db.commit()
-      }
+          db.put("a", "1")
+          assert(toStr(db.get("a")) === "1")
+          db.commit()
+        }
 
-      withDB(remoteDir, conf = conf, version = 0) { db =>
-        // version 0 can be loaded again
-        assert(toStr(db.get("a")) === null)
-        assert(iterator(db).isEmpty)
-      }
+        withDB(remoteDir, conf = conf, version = 0, useColumnFamilies = 
useColumnFamilies) { db =>
+          // version 0 can be loaded again
+          assert(toStr(db.get("a")) === null)
+          assert(iterator(db).isEmpty)
+        }
 
-      withDB(remoteDir, conf = conf, version = 1) { db =>
-        // version 1 data recovered correctly
-        assert(toStr(db.get("a")) === "1")
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1")))
+        withDB(remoteDir, conf = conf, version = 1, useColumnFamilies = 
useColumnFamilies) { db =>
+          // version 1 data recovered correctly
+          assert(toStr(db.get("a")) === "1")
+          assert(db.iterator().map(toStr).toSet === Set(("a", "1")))
 
-        // make changes but do not commit version 2
-        db.put("b", "2")
-        assert(toStr(db.get("b")) === "2")
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1"), ("b", "2")))
-      }
+          // make changes but do not commit version 2
+          db.put("b", "2")
+          assert(toStr(db.get("b")) === "2")
+          assert(db.iterator().map(toStr).toSet === Set(("a", "1"), ("b", 
"2")))
+        }
 
-      withDB(remoteDir, conf = conf, version = 1) { db =>
-        // version 1 data not changed
-        assert(toStr(db.get("a")) === "1")
-        assert(db.get("b") === null)
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1")))
+        withDB(remoteDir, conf = conf, version = 1, useColumnFamilies = 
useColumnFamilies) { db =>
+          // version 1 data not changed
+          assert(toStr(db.get("a")) === "1")
+          assert(db.get("b") === null)
+          assert(db.iterator().map(toStr).toSet === Set(("a", "1")))
 
-        // commit version 2
-        db.put("b", "2")
-        assert(toStr(db.get("b")) === "2")
-        db.commit()
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1"), ("b", "2")))
-      }
+          // commit version 2
+          db.put("b", "2")
+          assert(toStr(db.get("b")) === "2")
+          db.commit()
+          assert(db.iterator().map(toStr).toSet === Set(("a", "1"), ("b", 
"2")))
+        }
+
+        withDB(remoteDir, conf = conf, version = 1, useColumnFamilies = 
useColumnFamilies) { db =>
+          // version 1 data not changed
+          assert(toStr(db.get("a")) === "1")
+          assert(db.get("b") === null)
+        }
 
-      withDB(remoteDir, conf = conf, version = 1) { db =>
-        // version 1 data not changed
-        assert(toStr(db.get("a")) === "1")
-        assert(db.get("b") === null)
+        withDB(remoteDir, conf = conf, version = 2, useColumnFamilies = 
useColumnFamilies) { db =>
+          // version 2 can be loaded again
+          assert(toStr(db.get("b")) === "2")
+          assert(db.iterator().map(toStr).toSet === Set(("a", "1"), ("b", 
"2")))
+
+          db.load(1)
+          assert(toStr(db.get("b")) === null)
+          assert(db.iterator().map(toStr).toSet === Set(("a", "1")))
+        }
       }
 
-      withDB(remoteDir, conf = conf, version = 2) { db =>
-        // version 2 can be loaded again
-        assert(toStr(db.get("b")) === "2")
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1"), ("b", "2")))
+      for (compactOnCommit <- Seq(false, true)) {
+        withClue(s"compactOnCommit = $compactOnCommit") {
+          testOps(compactOnCommit)
+        }
+      }
+    }
+  }
 
-        db.load(1)
-        assert(toStr(db.get("b")) === null)
-        assert(db.iterator().map(toStr).toSet === Set(("a", "1")))
+  testWithChangelogCheckpointingEnabled(s"RocksDB: get, put, iterator, commit, 
laod " +

Review Comment:
   see comment above



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -503,11 +634,79 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
     val entries = changelogReader.toSeq
-    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, 
i.toString.getBytes)) ++
-      (2 to 4).map(j => (j.toString.getBytes, null))
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with default col family") 
{
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      StateStore.DEFAULT_COL_FAMILY_NAME)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, 
StateStore.DEFAULT_COL_FAMILY_NAME)
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with multiple col 
families") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val testColFamily1: String = "testColFamily1"
+    val testColFamily2: String = "testColFamily2"
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      testColFamily1)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, testColFamily1)
+
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      testColFamily2)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, testColFamily2)
+
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1, true)
+    val entries = changelogReader.toSeq
+    val expectedEntriesForColFamily1 = (1 to 5).map(i =>

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -377,119 +380,247 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
-  test("RocksDB: compression conf") {
-    val remoteDir = Utils.createTempDir().toString
-    new File(remoteDir).delete()  // to make sure that the directory gets 
created
+  Seq(true, false).foreach { useColumnFamilies =>
+    test(s"RocksDB: compression conf with 
useColumnFamilies=$useColumnFamilies") {
+      val remoteDir = Utils.createTempDir().toString
+      new File(remoteDir).delete() // to make sure that the directory gets 
created
 
-    val conf = RocksDBConf().copy(compression = "zstd")
-    withDB(remoteDir, conf = conf) { db =>
-      assert(db.columnFamilyOptions.compressionType() == 
CompressionType.ZSTD_COMPRESSION)
-    }
+      val conf = RocksDBConf().copy(compression = "zstd")
+      withDB(remoteDir, conf = conf, useColumnFamilies = useColumnFamilies) { 
db =>
+        assert(db.columnFamilyOptions.compressionType() == 
CompressionType.ZSTD_COMPRESSION)
+      }
 
-    // Test the default is LZ4
-    withDB(remoteDir, conf = RocksDBConf().copy()) { db =>
-      assert(db.columnFamilyOptions.compressionType() == 
CompressionType.LZ4_COMPRESSION)
+      // Test the default is LZ4
+      withDB(remoteDir, conf = RocksDBConf().copy(), useColumnFamilies = 
useColumnFamilies) { db =>
+        assert(db.columnFamilyOptions.compressionType() == 
CompressionType.LZ4_COMPRESSION)
+      }
     }
   }
 
-  test("RocksDB: get, put, iterator, commit, load") {
-    def testOps(compactOnCommit: Boolean): Unit = {
-      val remoteDir = Utils.createTempDir().toString
-      new File(remoteDir).delete()  // to make sure that the directory gets 
created
+  Seq(true, false).foreach { useColumnFamilies =>
+    test(s"RocksDB: get, put, iterator, commit, load with " +

Review Comment:
   AFAIK, this is tested with and without changelog checkpointing. See the 
definition of AlsoTestWithChangelogCheckpointingEnabled. 
   
   Or do you have something additional to check for changelog checkpointing
in the test case below?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -503,11 +634,79 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
     val entries = changelogReader.toSeq
-    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, 
i.toString.getBytes)) ++
-      (2 to 4).map(j => (j.toString.getBytes, null))
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with default col family") 
{
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,

Review Comment:
   nit: Shall we make this consistent? `(1 to 5).foreach { i =>`



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -503,11 +634,79 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     changelogWriter.commit()
     val changelogReader = fileManager.getChangelogReader(1)
     val entries = changelogReader.toSeq
-    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, 
i.toString.getBytes)) ++
-      (2 to 4).map(j => (j.toString.getBytes, null))
+    val expectedEntries = (1 to 5).map(i =>
+      (RecordType.PUT_RECORD, i.toString.getBytes,
+        i.toString.getBytes, StateStore.DEFAULT_COL_FAMILY_NAME)) ++
+      (2 to 4).map(j => (RecordType.DELETE_RECORD, j.toString.getBytes,
+        null, StateStore.DEFAULT_COL_FAMILY_NAME))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2 &&
+        e1._3 === e2._3 && e1._4 === e2._4)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDBFileManager: read and write v2 changelog with default col family") 
{
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + 
"/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1, true)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString,
+      StateStore.DEFAULT_COL_FAMILY_NAME)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString, 
StateStore.DEFAULT_COL_FAMILY_NAME)

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -836,14 +850,16 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       dir: String = newDir(),
       minDeltasForSnapshot: Int = 
SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
       numOfVersToRetainInMemory: Int = 
SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get,
-      hadoopConf: Configuration = new Configuration): 
HDFSBackedStateStoreProvider = {
+      hadoopConf: Configuration = new Configuration,
+      useColumnFamilies: Boolean = false): HDFSBackedStateStoreProvider = {

Review Comment:
   If this method does not override anything, then I wouldn't add a parameter
that has no effect.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.execution.streaming._
+import 
org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled,
 RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+
+class RunningCountStatefulProcessor extends StatefulProcessor[String, String, 
(String, String)]
+  with Logging {
+  @transient private var _countState: ValueState[Long] = _
+  @transient var _processorHandle: StatefulProcessorHandle = _
+
+  override def init(handle: StatefulProcessorHandle,

Review Comment:
   nit: let's follow multi-line style



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -785,6 +791,11 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     newStoreProvider(storeId.operatorId, storeId.partitionId, dir = 
storeId.checkpointRootLocation)
   }
 
+  override def newStoreProvider(storeId: StateStoreId,

Review Comment:
   nit: let's follow the multi-line style



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.execution.streaming.{ImplicitKeyTracker, 
StatefulProcessorHandleImpl}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests for single value ValueState types used in arbitrary 
stateful
+ * operators such as transformWithState
+ */
+class ValueStateSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def newStoreProviderWithValueState(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithValueState(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithValueState(
+    storeId: StateStoreId,
+    numColsPrefixKey: Int,
+    sqlConf: Option[SQLConf] = None,
+    conf: Configuration = new Configuration,
+    useColumnFamilies: Boolean = false): RocksDBStateStoreProvider = {
+    val provider = new RocksDBStateStoreProvider()
+    provider.init(
+      storeId, schemaForKeyRow, schemaForValueRow, numColsPrefixKey = 
numColsPrefixKey,
+      useColumnFamilies,
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), conf)
+    provider
+  }
+
+  private def tryWithProviderResource[T](
+    provider: StateStoreProvider)(f: StateStoreProvider => T): T = {

Review Comment:
   nit: 4 spaces



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.execution.streaming.{ImplicitKeyTracker, 
StatefulProcessorHandleImpl}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Class that adds tests for single value ValueState types used in arbitrary 
stateful
+ * operators such as transformWithState
+ */
+class ValueStateSuite extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private def newStoreProviderWithValueState(useColumnFamilies: Boolean):
+    RocksDBStateStoreProvider = {
+    newStoreProviderWithValueState(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  private def newStoreProviderWithValueState(
+    storeId: StateStoreId,
+    numColsPrefixKey: Int,
+    sqlConf: Option[SQLConf] = None,

Review Comment:
   Would having `SQLConf.get` as default value work?



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.execution.streaming._
+import 
org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled,
 RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+
+class RunningCountStatefulProcessor extends StatefulProcessor[String, String, 
(String, String)]
+  with Logging {
+  @transient private var _countState: ValueState[Long] = _
+  @transient var _processorHandle: StatefulProcessorHandle = _
+
+  override def init(handle: StatefulProcessorHandle,
+    outputMode: OutputMode) : Unit = {
+    _processorHandle = handle
+    assert(handle.getQueryInfo().getBatchId >= 0)
+    assert(handle.getQueryInfo().getOperatorId == 0)
+    assert(handle.getQueryInfo().getPartitionId >= 0 && 
handle.getQueryInfo().getPartitionId < 5)

Review Comment:
   nit: shall we explicitly get the number of shuffle partitions rather than 
using a magic number?



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, SaveMode}
+import org.apache.spark.sql.execution.streaming._
+import 
org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled,
 RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+
+class RunningCountStatefulProcessor extends StatefulProcessor[String, String, 
(String, String)]
+  with Logging {
+  @transient private var _countState: ValueState[Long] = _
+  @transient var _processorHandle: StatefulProcessorHandle = _
+
+  override def init(handle: StatefulProcessorHandle,
+    outputMode: OutputMode) : Unit = {
+    _processorHandle = handle
+    assert(handle.getQueryInfo().getBatchId >= 0)
+    assert(handle.getQueryInfo().getOperatorId == 0)
+    assert(handle.getQueryInfo().getPartitionId >= 0 && 
handle.getQueryInfo().getPartitionId < 5)
+    _countState = _processorHandle.getValueState[Long]("countState")
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[String],
+      timerValues: TimerValues): Iterator[(String, String)] = {
+    val count = _countState.getOption().getOrElse(0L) + inputRows.size
+    if (count == 3) {
+      _countState.remove()
+      Iterator.empty
+    } else {
+      _countState.update(count)
+      Iterator((key, count.toString))
+    }
+  }
+
+  override def close(): Unit = {}
+}
+
+class RunningCountStatefulProcessorWithError extends 
RunningCountStatefulProcessor {
+  @transient private var _tempState: ValueState[Long] = _
+
+  override def handleInputRows(key: String,
+    inputRows: Iterator[String],

Review Comment:
   nit: 4 spaces



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -204,12 +211,18 @@ class RocksDB(
     for (v <- loadedVersion + 1 to endVersion) {
       var changelogReader: StateStoreChangelogReader = null
       try {
-        changelogReader = fileManager.getChangelogReader(v)
-        changelogReader.foreach { case (key, value) =>
-          if (value != null) {
-            put(key, value)
-          } else {
-            remove(key)
+        changelogReader = fileManager.getChangelogReader(v, useColumnFamilies)
+        changelogReader.foreach { case (recordType, key, value, colFamilyName) 
=>

Review Comment:
   discussed offline: there could be several types in the future, but we cannot 
imagine the types exceeding 1 byte.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -896,63 +918,69 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
   protected val keySchema: StructType = StateStoreTestsHelper.keySchema
   protected val valueSchema: StructType = StateStoreTestsHelper.valueSchema
 
-  testWithAllCodec("get, put, remove, commit, and all data iterator") {
-    tryWithProviderResource(newStoreProvider()) { provider =>
-      // Verify state before starting a new set of updates
-      assert(getLatestData(provider).isEmpty)
-
-      val store = provider.getStore(0)
-      assert(!store.hasCommitted)
-      assert(get(store, "a", 0) === None)
-      assert(store.iterator().isEmpty)
-      assert(store.metrics.numKeys === 0)
-
-      // Verify state after updating
-      put(store, "a", 0, 1)
-      assert(get(store, "a", 0) === Some(1))
-
-      assert(store.iterator().nonEmpty)
-      assert(getLatestData(provider).isEmpty)
-
-      // Make updates, commit and then verify state
-      put(store, "b", 0, 2)
-      put(store, "aa", 0, 3)
-      remove(store, _._1.startsWith("a"))
-      assert(store.commit() === 1)
-
-      assert(store.hasCommitted)
-      assert(rowPairsToDataSet(store.iterator()) === Set(("b", 0) -> 2))
-      assert(getLatestData(provider) === Set(("b", 0) -> 2))
-
-      // Trying to get newer versions should fail
-      var e = intercept[SparkException] {
-        provider.getStore(2)
-      }
-      assert(e.getCause.isInstanceOf[SparkException])
-      assert(e.getCause.getMessage.contains("does not exist"))
-
-      e = intercept[SparkException] {
-        getData(provider, 2)
-      }
-      assert(e.getCause.isInstanceOf[SparkException])
-      assert(e.getCause.getMessage.contains("does not exist"))
+  Seq(true, false).foreach { useColumnFamilies =>

Review Comment:
   I take my word back. There are so many test cases falling into this case - 
shall we check whether simply modifying testWithAllCodec would work?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ValueStateImpl.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.Serializable
+
+import org.apache.commons.lang3.SerializationUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.types._
+
+/**
+ * Class that provides a concrete implementation for a single value state 
associated with state
+ * variables used in the streaming transformWithState operator.
+ * @param store - reference to the StateStore instance to be used for storing 
state
+ * @param stateName - name of logical state partition
+ * @tparam S - data type of object that will be stored
+ */
+class ValueStateImpl[S](
+    store: StateStore,
+    stateName: String) extends ValueState[S] with Logging{
+
+  private def encodeKey(): UnsafeRow = {
+    val keyOption = ImplicitKeyTracker.getImplicitKeyOption
+    if (!keyOption.isDefined) {
+      throw new UnsupportedOperationException("Implicit key not found for 
operation on" +
+        s"stateName=$stateName")
+    }
+
+    val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+    val keyByteArr = 
SerializationUtils.serialize(keyOption.get.asInstanceOf[Serializable])

Review Comment:
   The key is always the case, right? The type being returned from groupByKey() 
should be encodable with the Spark SQL expression encoder, so even if you have more 
types to handle, as long as you use the key as the prefix of the entire key, they 
can all be handled with a single source of truth for serialization: Spark SQL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to