This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c02c8be43c6 [SPARK-43421][SS] Implement Changelog based Checkpointing 
for RocksDB State Store Provider
c02c8be43c6 is described below

commit c02c8be43c64dbd6bffceb53f8b18cd19a0d2f2e
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Thu Jun 1 14:34:24 2023 +0900

    [SPARK-43421][SS] Implement Changelog based Checkpointing for RocksDB State 
Store Provider
    
    ### What changes were proposed in this pull request?
    In order to reduce the checkpoint duration and end-to-end latency, we propose Changelog Based Checkpointing for the RocksDB State Store Provider. The mechanism is as follows.
    1. Changelog checkpoint: Upon each put() or delete() call to the local RocksDB instance, log the operation to a changelog file. During the state change commit, sync the compressed changelog of the current batch to DFS as checkpointDir/{version}.delta. (See the record-layout sketch after this list.)
    2. Version reconstruction: For version j, find the latest snapshot i.zip such that i <= j, load snapshot i, and replay i+1.delta ~ j.delta. This is used both when loading the initial state and when creating the latest version snapshot. Note: If a query is shut down without exception, there won't be any changelog replay during query restart because a maintenance task is executed before the state store instance is unloaded.
    3. Background snapshot: A maintenance thread in the executors launches maintenance tasks periodically. Inside the maintenance task, sync the latest RocksDB local snapshot to DFS as checkpointDir/{version}.zip. Snapshots enable faster failure recovery and allow old versions to be purged.
    4. Garbage collection: Inside the maintenance task, delete snapshot and delta files from DFS for versions that are out of the retention range (by default, 100 versions are retained).
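
    A minimal sketch of the changelog record layout from (1). This is illustrative only: encodeChangelog is a hypothetical helper using plain in-memory streams and no compression, whereas the patch writes through CheckpointFileManager with a compression codec in StateStoreChangelogWriter.

        import java.io.{ByteArrayOutputStream, DataOutputStream}

        // Encode a batch of state changes in the changelog layout:
        //   put record:    | key length | key | value length | value |
        //   delete record: | key length | key | -1 |
        //   end of file:   -1 in the key length field
        def encodeChangelog(records: Seq[(Array[Byte], Option[Array[Byte]])]): Array[Byte] = {
          val buffer = new ByteArrayOutputStream()
          val out = new DataOutputStream(buffer)
          records.foreach {
            case (key, Some(value)) =>   // put record
              out.writeInt(key.length); out.write(key)
              out.writeInt(value.length); out.write(value)
            case (key, None) =>          // delete record
              out.writeInt(key.length); out.write(key)
              out.writeInt(-1)
          }
          out.writeInt(-1)               // end-of-file marker
          out.close()
          buffer.toByteArray
        }

    StateStoreChangelogReader in this patch reads the records back in the same order, treating a negative value length as a deletion and a -1 key length as end of file.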
    
    ### Why are the changes needed?
    We have identified state checkpointing latency as one of the major performance bottlenecks for stateful streaming queries. Currently, the RocksDB state store pauses the RocksDB instances to upload a snapshot to the cloud when committing a batch, which is heavyweight and has unpredictable performance.
    With changelog based checkpointing, the RocksDB instance can run uninterrupted, which improves RocksDB operation performance. This also dramatically reduces the commit time and batch duration because a much smaller amount of data is uploaded during the state commit. With this change, stateful queries with the RocksDB state store will have lower and more predictable latency.
    
    ### How was this patch tested?
    Add unit tests for the changelog checkpointing utility.
    Add unit and integration tests that check backward compatibility with existing checkpoints.
    Enable the RocksDB state store unit tests and stateful streaming query integration tests to run with changelog checkpointing enabled.
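
    For reference, enabling the new code path in a session looks like the following (a usage sketch based on the config introduced in this patch; the master and app name are placeholders):

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder()
          .master("local[2]")                        // placeholder
          .appName("changelog-checkpointing-demo")   // placeholder
          // Use the RocksDB state store provider with the new config added in this patch
          // (changelog checkpointing is disabled by default).
          .config("spark.sql.streaming.stateStore.providerClass",
            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
          .config("spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
          .getOrCreate()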
    
    Closes #41099 from chaoqin-li1123/changelog.
    
    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md     |  18 +
 .../sql/execution/streaming/state/RocksDB.scala    | 187 ++++++++--
 .../streaming/state/RocksDBFileManager.scala       | 119 ++++--
 .../state/RocksDBStateStoreProvider.scala          |   8 +-
 .../streaming/state/StateStoreChangelog.scala      | 167 +++++++++
 .../state/RocksDBStateStoreIntegrationSuite.scala  |  60 ++-
 .../streaming/state/RocksDBStateStoreSuite.scala   |  80 +++-
 .../execution/streaming/state/RocksDBSuite.scala   | 402 +++++++++++++++++----
 .../state/StateStoreCompatibilitySuite.scala       |   2 +-
 .../streaming/state/StateStoreSuite.scala          | 109 +++---
 .../streaming/FlatMapGroupsWithStateSuite.scala    |   3 +
 .../sql/streaming/RocksDBStateStoreTest.scala      |  52 +++
 .../sql/streaming/StreamingAggregationSuite.scala  |   3 +
 .../streaming/StreamingDeduplicationSuite.scala    |   3 +
 14 files changed, 1012 insertions(+), 201 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 267596a3899..53d5919d4dc 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2320,6 +2320,11 @@ Here are the configs regarding to RocksDB instance of 
the state store provider:
     <td>Whether we perform a range compaction of RocksDB instance for commit 
operation</td>
     <td>False</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled</td>
+    <td>Whether to upload changelog instead of snapshot during RocksDB StateStore commit</td>
+    <td>False</td>
+  </tr>
   <tr>
     <td>spark.sql.streaming.stateStore.rocksdb.blockSizeKB</td>
     <td>Approximate size in KB of user data packed per block for a RocksDB 
BlockBasedTable, which is a RocksDB's default SST file format.</td>
@@ -2389,6 +2394,19 @@ If you want to cap RocksDB memory usage in your Spark 
Structured Streaming deplo
 You can also determine the max allowed memory for RocksDB instances by setting 
the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static 
number or as a fraction of the physical memory available on the node.
 Limits for individual RocksDB instances can also be configured by setting 
`spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and 
`spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required 
values. By default, RocksDB internal defaults are used for these settings.
 
+##### RocksDB State Store Changelog Checkpointing
+In newer versions of Spark, changelog checkpointing is introduced for the RocksDB state store. The traditional checkpointing mechanism for the RocksDB State Store is incremental snapshot checkpointing, where the manifest files and newly generated RocksDB SST files of RocksDB instances are uploaded to durable storage.
+Instead of uploading data files of RocksDB instances, changelog checkpointing uploads the changes made to the state since the last checkpoint for durability.
+Snapshots are persisted periodically in the background for predictable failure recovery and changelog trimming.
+Changelog checkpointing avoids the cost of capturing and uploading snapshots of RocksDB instances and significantly reduces streaming query latency.
+
+Changelog checkpointing is disabled by default. You can enable RocksDB State Store changelog checkpointing by setting the `spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled` config to `true`.
+Changelog checkpointing is designed to be backward compatible with the traditional checkpointing mechanism.
+The RocksDB state store provider offers seamless support for transitioning between the two checkpointing mechanisms in both directions. This allows you to leverage the performance benefits of changelog checkpointing without discarding the old state checkpoint.
+In a version of Spark that supports changelog checkpointing, you can migrate streaming queries from older versions of Spark to changelog checkpointing by enabling changelog checkpointing in the Spark session.
+Vice versa, you can safely disable changelog checkpointing in a newer version of Spark; any query that already ran with changelog checkpointing will then switch back to traditional checkpointing.
+You need to restart your streaming queries for the change in checkpointing mechanism to take effect, but you won't observe any performance degradation in the process.
+
 ##### Performance-aspect considerations
 
 1. You may want to disable the track of total number of rows to aim the better 
performance on RocksDB state store.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 84c6963ab0d..a9c15cf7f7d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -56,6 +56,15 @@ class RocksDB(
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "") extends Logging {
 
+  case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: 
Long) {
+    def close(): Unit = {
+      silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of 
snapshot $version")
+    }
+  }
+
+  @volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
+  @volatile private var lastSnapshotVersion = 0L
+
   RocksDBLoader.loadLibrary()
 
   // Java wrapper objects linking to native RocksDB objects
@@ -109,13 +118,15 @@ class RocksDB(
   private val nativeStats = dbOptions.statistics()
 
   private val workingDir = createTempDir("workingDir")
-  private val fileManager = new RocksDBFileManager(
-    dfsRootDir, createTempDir("fileManager"), hadoopConf, loggingId = 
loggingId)
+  private val fileManager = new RocksDBFileManager(dfsRootDir, 
createTempDir("fileManager"),
+    hadoopConf, conf.compressionCodec, loggingId = loggingId)
   private val byteArrayPair = new ByteArrayPair()
   private val commitLatencyMs = new mutable.HashMap[String, Long]()
   private val acquireLock = new Object
 
   @volatile private var db: NativeRocksDB = _
+  @volatile private var changelogWriter: Option[StateStoreChangelogWriter] = 
None
+  private val enableChangelogCheckpointing: Boolean = 
conf.enableChangelogCheckpointing
   @volatile private var loadedVersion = -1L   // -1 = nothing valid is loaded
   @volatile private var numKeysOnLoadedVersion = 0L
   @volatile private var numKeysOnWritingVersion = 0L
@@ -129,17 +140,20 @@ class RocksDB(
    * Note that this will copy all the necessary file from DFS to local disk as 
needed,
    * and possibly restart the native RocksDB instance.
    */
-  def load(version: Long): RocksDB = {
+  def load(version: Long, readOnly: Boolean = false): RocksDB = {
     assert(version >= 0)
     acquire()
     logInfo(s"Loading $version")
     try {
       if (loadedVersion != version) {
         closeDB()
-        val metadata = fileManager.loadCheckpointFromDfs(version, workingDir)
+        val latestSnapshotVersion = 
fileManager.getLatestSnapshotVersion(version)
+        val metadata = 
fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
+        loadedVersion = latestSnapshotVersion
+
         openDB()
 
-        val numKeys = if (!conf.trackTotalNumberOfRows) {
+        numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
           // we don't track the total number of rows - discard the number 
being track
           -1L
         } else if (metadata.numKeys < 0) {
@@ -149,10 +163,10 @@ class RocksDB(
         } else {
           metadata.numKeys
         }
-        numKeysOnWritingVersion = numKeys
-        numKeysOnLoadedVersion = numKeys
-
-        loadedVersion = version
+        if (loadedVersion != version) replayChangelog(version)
+        // After changelog replay the numKeysOnWritingVersion will be updated 
to
+        // the correct number of keys in the loaded version.
+        numKeysOnLoadedVersion = numKeysOnWritingVersion
         fileManagerMetrics = fileManager.latestLoadCheckpointMetrics
       }
       if (conf.resetStatsOnLoad) {
@@ -164,9 +178,36 @@ class RocksDB(
         loadedVersion = -1  // invalidate loaded data
         throw t
     }
+    if (enableChangelogCheckpointing && !readOnly) {
+      // Make sure we don't leak resources.
+      changelogWriter.foreach(_.abort())
+      changelogWriter = Some(fileManager.getChangeLogWriter(version + 1))
+    }
     this
   }
 
+  /**
+   * Replay change log from the loaded version to the target version.
+   */
+  private def replayChangelog(endVersion: Long): Unit = {
+    for (v <- loadedVersion + 1 to endVersion) {
+      var changelogReader: StateStoreChangelogReader = null
+      try {
+        changelogReader = fileManager.getChangelogReader(v)
+        changelogReader.foreach { case (key, value) =>
+          if (value != null) {
+            put(key, value)
+          } else {
+            remove(key)
+          }
+        }
+      } finally {
+        if (changelogReader != null) changelogReader.close()
+      }
+    }
+    loadedVersion = endVersion
+  }
+
   /**
    * Get the value for the given key if present, or null.
    * @note This will return the last written value even if it was uncommitted.
@@ -187,6 +228,7 @@ class RocksDB(
       }
     }
     db.put(writeOptions, key, value)
+    changelogWriter.foreach(_.put(key, value))
   }
 
   /**
@@ -201,6 +243,7 @@ class RocksDB(
       }
     }
     db.delete(writeOptions, key)
+    changelogWriter.foreach(_.delete(key))
   }
 
   /**
@@ -286,44 +329,66 @@ class RocksDB(
    */
   def commit(): Long = {
     val newVersion = loadedVersion + 1
-    val checkpointDir = createTempDir("checkpoint")
-    var rocksDBBackgroundThreadPaused = false
     try {
-      // Make sure the directory does not exist. Native RocksDB fails if the 
directory to
-      // checkpoint exists.
-      Utils.deleteRecursively(checkpointDir)
 
       logInfo(s"Flushing updates for $newVersion")
-      val flushTimeMs = timeTakenMs { db.flush(flushOptions) }
 
-      val compactTimeMs = if (conf.compactOnCommit) {
-        logInfo("Compacting")
-        timeTakenMs { db.compactRange() }
-      } else 0
-
-      logInfo("Pausing background work")
-      val pauseTimeMs = timeTakenMs {
-        db.pauseBackgroundWork() // To avoid files being changed while 
committing
-        rocksDBBackgroundThreadPaused = true
-      }
-
-      logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
-      val checkpointTimeMs = timeTakenMs {
-        val cp = Checkpoint.create(db)
-        cp.createCheckpoint(checkpointDir.toString)
+      var compactTimeMs = 0L
+      var flushTimeMs = 0L
+      var checkpointTimeMs = 0L
+      if (shouldCreateSnapshot()) {
+        // Need to flush the change to disk before creating a checkpoint
+        // because rocksdb wal is disabled.
+        logInfo(s"Flushing updates for $newVersion")
+        flushTimeMs = timeTakenMs { db.flush(flushOptions) }
+        if (conf.compactOnCommit) {
+          logInfo("Compacting")
+          compactTimeMs = timeTakenMs { db.compactRange() }
+        }
+        checkpointTimeMs = timeTakenMs {
+          val checkpointDir = createTempDir("checkpoint")
+          logInfo(s"Creating checkpoint for $newVersion in $checkpointDir")
+          // Make sure the directory does not exist. Native RocksDB fails if 
the directory to
+          // checkpoint exists.
+          Utils.deleteRecursively(checkpointDir)
+          // We no longer pause background operation before creating a RocksDB 
checkpoint because
+          // it is unnecessary. The captured snapshot will still be consistent 
with ongoing
+          // background operations.
+          val cp = Checkpoint.create(db)
+          cp.createCheckpoint(checkpointDir.toString)
+          synchronized {
+            // if changelog checkpointing is disabled, the snapshot is 
uploaded synchronously
+            // inside the uploadSnapshot() called below.
+            // If changelog checkpointing is enabled, snapshot will be 
uploaded asynchronously
+            // during state store maintenance.
+            latestSnapshot.foreach(_.close())
+            latestSnapshot = Some(
+              RocksDBSnapshot(checkpointDir, newVersion, 
numKeysOnWritingVersion))
+            lastSnapshotVersion = newVersion
+          }
+        }
       }
 
       logInfo(s"Syncing checkpoint for $newVersion to DFS")
       val fileSyncTimeMs = timeTakenMs {
-        fileManager.saveCheckpointToDfs(checkpointDir, newVersion, 
numKeysOnWritingVersion)
+        if (enableChangelogCheckpointing) {
+          try {
+            assert(changelogWriter.isDefined)
+            changelogWriter.foreach(_.commit())
+          } finally {
+            changelogWriter = None
+          }
+        } else {
+          assert(changelogWriter.isEmpty)
+          uploadSnapshot()
+        }
       }
+
       numKeysOnLoadedVersion = numKeysOnWritingVersion
       loadedVersion = newVersion
-      fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
       commitLatencyMs ++= Map(
         "flush" -> flushTimeMs,
         "compact" -> compactTimeMs,
-        "pause" -> pauseTimeMs,
         "checkpoint" -> checkpointTimeMs,
         "fileSync" -> fileSyncTimeMs
       )
@@ -334,25 +399,60 @@ class RocksDB(
         loadedVersion = -1  // invalidate loaded version
         throw t
     } finally {
-      if (rocksDBBackgroundThreadPaused) db.continueBackgroundWork()
-      silentDeleteRecursively(checkpointDir, s"committing $newVersion")
       // reset resources as either 1) we already pushed the changes and it has 
been committed or
       // 2) commit has failed and the current version is "invalidated".
       release()
     }
   }
 
+  private def shouldCreateSnapshot(): Boolean = {
+    if (enableChangelogCheckpointing) {
+      assert(changelogWriter.isDefined)
+      val newVersion = loadedVersion + 1
+      newVersion - lastSnapshotVersion >= conf.minDeltasForSnapshot ||
+        changelogWriter.get.size > 10000
+    } else true
+  }
+
+  private def uploadSnapshot(): Unit = {
+    val localCheckpoint = synchronized {
+      val checkpoint = latestSnapshot
+      latestSnapshot = None
+      checkpoint
+    }
+    localCheckpoint match {
+      case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+        try {
+          val uploadTime = timeTakenMs {
+            fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+            fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+          }
+          logInfo(s"$loggingId: Upload snapshot of version $version," +
+            s" time taken: $uploadTime ms")
+        } finally {
+          localCheckpoint.foreach(_.close())
+        }
+      case _ =>
+    }
+  }
+
   /**
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     loadedVersion = -1L
+    changelogWriter.foreach(_.abort())
+    // Make sure changelogWriter gets recreated next time.
+    changelogWriter = None
     release()
     logInfo(s"Rolled back to $loadedVersion")
   }
 
-  def cleanup(): Unit = {
+  def doMaintenance(): Unit = {
+    if (enableChangelogCheckpointing) {
+      uploadSnapshot()
+    }
     val cleanupTime = timeTakenMs {
       fileManager.deleteOldVersions(conf.minVersionsToRetain)
     }
@@ -369,6 +469,9 @@ class RocksDB(
       flushOptions.close()
       dbOptions.close()
       dbLogger.close()
+      synchronized {
+        latestSnapshot.foreach(_.close())
+      }
       silentDeleteRecursively(localRootDir, "closing RocksDB")
     } catch {
       case e: Exception =>
@@ -550,7 +653,9 @@ class ByteArrayPair(var key: Array[Byte] = null, var value: 
Array[Byte] = null)
  */
 case class RocksDBConf(
     minVersionsToRetain: Int,
+    minDeltasForSnapshot: Int,
     compactOnCommit: Boolean,
+    enableChangelogCheckpointing: Boolean,
     blockSizeKB: Long,
     blockCacheSizeMB: Long,
     lockAcquireTimeoutMs: Long,
@@ -563,7 +668,8 @@ case class RocksDBConf(
     boundedMemoryUsage: Boolean,
     totalMemoryUsageMB: Long,
     writeBufferCacheRatio: Double,
-    highPriorityPoolRatio: Double)
+    highPriorityPoolRatio: Double,
+    compressionCodec: String)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -585,6 +691,8 @@ object RocksDBConf {
 
   // Configuration that specifies whether to compact the RocksDB data every 
time data is committed
   private val COMPACT_ON_COMMIT_CONF = SQLConfEntry("compactOnCommit", "false")
+  private val ENABLE_CHANGELOG_CHECKPOINTING_CONF = SQLConfEntry(
+    "changelogCheckpointing.enabled", "false")
   private val BLOCK_SIZE_KB_CONF = SQLConfEntry("blockSizeKB", "4")
   private val BLOCK_CACHE_SIZE_MB_CONF = SQLConfEntry("blockCacheSizeMB", "8")
   // See SPARK-42794 for details.
@@ -705,7 +813,9 @@ object RocksDBConf {
 
     RocksDBConf(
       storeConf.minVersionsToRetain,
+      storeConf.minDeltasForSnapshot,
       getBooleanConf(COMPACT_ON_COMMIT_CONF),
+      getBooleanConf(ENABLE_CHANGELOG_CHECKPOINTING_CONF),
       getPositiveLongConf(BLOCK_SIZE_KB_CONF),
       getPositiveLongConf(BLOCK_CACHE_SIZE_MB_CONF),
       getPositiveLongConf(LOCK_ACQUIRE_TIMEOUT_MS_CONF),
@@ -718,7 +828,8 @@ object RocksDBConf {
       getBooleanConf(BOUNDED_MEMORY_USAGE_CONF),
       getLongConf(MAX_MEMORY_USAGE_MB_CONF),
       getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
-      getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF))
+      getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
+      storeConf.compressionCodec)
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index d8f6c1b2abb..0891d773713 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -37,7 +37,9 @@ import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.execution.streaming.CheckpointFileManager
 import org.apache.spark.util.Utils
 
@@ -123,6 +125,7 @@ class RocksDBFileManager(
     dfsRootDir: String,
     localTempDir: File,
     hadoopConf: Configuration,
+    codecName: String = "zstd",
     loggingId: String = "")
   extends Logging {
 
@@ -134,6 +137,27 @@ class RocksDBFileManager(
   private val onlyZipFiles = new PathFilter {
     override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
   }
+  private val onlyChangelogFiles = new PathFilter {
+    override def accept(path: Path): Boolean = 
path.toString.endsWith(".changelog")
+  }
+
+  private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new 
SparkConf)
+
+  private def codec = CompressionCodec.createCodec(sparkConf, codecName)
+
+  def getChangeLogWriter(version: Long): StateStoreChangelogWriter = {
+    val rootDir = new Path(dfsRootDir)
+    val changelogFile = dfsChangelogFile(version)
+    if (!fm.exists(rootDir)) fm.mkdirs(rootDir)
+    val changelogWriter = new StateStoreChangelogWriter(fm, changelogFile, 
codec)
+    changelogWriter
+  }
+
+  // Get the changelog file at version
+  def getChangelogReader(version: Long): StateStoreChangelogReader = {
+    val changelogFile = dfsChangelogFile(version)
+    new StateStoreChangelogReader(fm, changelogFile, codec)
+  }
 
   /**
    * Metrics for loading checkpoint from DFS. Every loadCheckpointFromDFS call 
will update this
@@ -205,19 +229,45 @@ class RocksDBFileManager(
     metadata
   }
 
-  /** Get the latest version available in the DFS directory. If no data 
present, it returns 0. */
-  def getLatestVersion(): Long = {
+  // Get latest snapshot version <= version
+  def getLatestSnapshotVersion(version: Long): Long = {
     val path = new Path(dfsRootDir)
     if (fm.exists(path)) {
+      // If the latest version snapshot exists, we avoid listing.
+      if (fm.exists(dfsBatchZipFile(version))) {
+        return version
+      }
       fm.list(path, onlyZipFiles)
         .map(_.getPath.getName.stripSuffix(".zip"))
         .map(_.toLong)
+        .filter(_ <= version)
         .foldLeft(0L)(math.max)
     } else {
       0
     }
   }
 
+
+  /** Get the latest version available in the DFS directory. If no data 
present, it returns 0. */
+  def getLatestVersion(): Long = {
+    val path = new Path(dfsRootDir)
+    if (fm.exists(path)) {
+      val files = fm.list(path).map(_.getPath)
+      val changelogFileVersions = files
+        .filter(onlyChangelogFiles.accept(_))
+        .map(_.getName.stripSuffix(".changelog"))
+        .map(_.toLong)
+      val snapshotFileVersions = files
+        .filter(onlyZipFiles.accept(_))
+        .map(_.getName.stripSuffix(".zip"))
+        .map(_.toLong)
+      val versions = changelogFileVersions ++ snapshotFileVersions
+      versions.foldLeft(0L)(math.max)
+    } else {
+      0
+    }
+  }
+
   /**
    * Find orphan files which are not tracked by zip files.
    * Both sst files and log files can be orphan files.
@@ -250,6 +300,18 @@ class RocksDBFileManager(
     }
   }
 
+  private def deleteChangelogFiles(versionsToDelete: Array[Long]): Unit = {
+    versionsToDelete.foreach { version =>
+      try {
+        fm.delete(dfsChangelogFile(version))
+        logInfo(s"Deleted changelog file $version")
+      } catch {
+        case e: Exception =>
+          logWarning(s"Error deleting changelog file for version $version", e)
+      }
+    }
+  }
+
   /**
    * Delete old versions by deleting the associated version and SST files.
    * At a high-level, this method finds which versions to delete, and which 
SST files that were
@@ -268,7 +330,9 @@ class RocksDBFileManager(
    * - Find the orphan sst and log files whose zip files are not uploaded 
successfully
    *   or have been overwritten. To avoid deleting files of ongoing tasks, 
only delete orphan files
    *   that are older than all tracked files when there are at least 2 
versions.
-   * - Delete files in both to-be-deleted versions and orphan files.
+   * - Delete sst and log files in to-be-deleted versions.
+   * - Delete orphan files.
+   * - Delete changelog files of to-be-deleted versions.
    *
    * Note that it only deletes files that it knows are safe to delete.
    * It may not delete the following files.
@@ -278,36 +342,39 @@ class RocksDBFileManager(
    */
   def deleteOldVersions(numVersionsToRetain: Int): Unit = {
     val path = new Path(dfsRootDir)
-
+    val allFiles = fm.list(path).map(_.getPath)
+    val snapshotFiles = allFiles.filter(file => onlyZipFiles.accept(file))
+    val changelogFiles = allFiles.filter(file => 
onlyChangelogFiles.accept(file))
     // All versions present in DFS, sorted
-    val sortedVersions = fm.list(path, onlyZipFiles)
-      .map(_.getPath.getName.stripSuffix(".zip"))
+    val sortedSnapshotVersions = snapshotFiles
+      .map(_.getName.stripSuffix(".zip"))
       .map(_.toLong)
       .sorted
 
     // Return if no versions generated yet
-    if (sortedVersions.isEmpty) return
+    if (sortedSnapshotVersions.isEmpty) return
 
     // Find the versions to delete
-    val maxVersionPresent = sortedVersions.last
-    val minVersionPresent = sortedVersions.head
-    val minVersionToRetain =
-      math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
-    val versionsToDelete = sortedVersions.takeWhile(_ < 
minVersionToRetain).toSet[Long]
+    val maxSnapshotVersionPresent = sortedSnapshotVersions.last
 
-    // When versionToDelete is non-empty, there are at least 2 versions.
+    // In order to reconstruct numVersionsToRetain version, retain the latest 
snapshot
+    // that satisfies (version <= maxSnapshotVersionPresent - 
numVersionsToRetain + 1).
+    // If none of the snapshots satisfy the condition, minVersionToRetain will 
be 0 and
+    // no version gets deleted.
+    val minVersionToRetain = sortedSnapshotVersions
+      .filter(_ <= maxSnapshotVersionPresent - numVersionsToRetain + 1)
+      .foldLeft(0L)(math.max)
+
+    // When snapshotVersionToDelete is non-empty, there are at least 2 
snapshot versions.
     // We only delete orphan files when there are at least 2 versions,
     // which avoid deleting files for running tasks.
-    if (versionsToDelete.isEmpty) return
+    val snapshotVersionsToDelete = sortedSnapshotVersions.filter(_ < 
minVersionToRetain)
+    if (snapshotVersionsToDelete.isEmpty) return
 
-    logInfo(
-      s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " +
-        s"cleaning up all versions older than $minVersionToRetain to retain 
last " +
-        s"$numVersionsToRetain versions")
 
     // Resolve RocksDB files for all the versions and find the max version 
each file is used
     val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
-    sortedVersions.foreach { version =>
+    sortedSnapshotVersions.foreach { version =>
       val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
         val newResolvedFiles = getImmutableFilesFromVersionZip(version)
         versionToRocksDBFiles.put(version, newResolvedFiles)
@@ -318,7 +385,9 @@ class RocksDBFileManager(
     }
 
     // Best effort attempt to delete SST files that were last used in 
to-be-deleted versions
-    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => 
versionsToDelete.contains(v) }
+    val filesToDelete = fileToMaxUsedVersion.filter {
+      case (_, v) => snapshotVersionsToDelete.contains(v)
+    }
 
     val sstDir = new Path(dfsRootDir, 
RocksDBImmutableFile.SST_FILES_DFS_SUBDIR)
     val logDir = new Path(dfsRootDir, 
RocksDBImmutableFile.LOG_FILES_DFS_SUBDIR)
@@ -349,7 +418,7 @@ class RocksDBFileManager(
     }
 
     // Delete the version files and forget about them
-    versionsToDelete.foreach { version =>
+    snapshotVersionsToDelete.foreach { version =>
       val versionFile = dfsBatchZipFile(version)
       try {
         fm.delete(versionFile)
@@ -362,6 +431,11 @@ class RocksDBFileManager(
     }
     logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to 
delete" +
       s"$failedToDelete files) not used in versions >= $minVersionToRetain")
+
+    val changelogVersionsToDelete = changelogFiles
+      .map(_.getName.stripSuffix(".changelog")).map(_.toLong)
+      .filter(_ < minVersionToRetain)
+    deleteChangelogFiles(changelogVersionsToDelete)
   }
 
   /** Save immutable files to DFS directory */
@@ -534,6 +608,9 @@ class RocksDBFileManager(
   }
 
   private def dfsBatchZipFile(version: Long): Path = new 
Path(s"$dfsRootDir/$version.zip")
+  // We intentionally use the .changelog suffix so that these files can be distinguished from the
+  // changelog files of HDFSBackedStateStore, which are named version.delta.
+  private def dfsChangelogFile(version: Long): Path = new 
Path(s"$dfsRootDir/$version.changelog")
 
   private def localMetadataFile(parentDir: File): File = new File(parentDir, 
"metadata")
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index b4b648c3693..10f207c7ec1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -195,8 +195,14 @@ private[sql] class RocksDBStateStoreProvider
     new RocksDBStateStore(version)
   }
 
+  override def getReadStore(version: Long): StateStore = {
+    require(version >= 0, "Version cannot be less than 0")
+    rocksDB.load(version, true)
+    new RocksDBStateStore(version)
+  }
+
   override def doMaintenance(): Unit = {
-    rocksDB.cleanup()
+    rocksDB.doMaintenance()
   }
 
   override def close(): Unit = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
new file mode 100644
index 00000000000..372cbb6d986
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, 
IOException}
+
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FSError, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+import org.apache.spark.util.NextIterator
+
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriter(
+    fm: CheckpointFileManager,
+    file: Path,
+    compressionCodec: CompressionCodec) extends Logging {
+
+  private def compressStream(outputStream: DataOutputStream): DataOutputStream 
= {
+    val compressed = compressionCodec.compressedOutputStream(outputStream)
+    new DataOutputStream(compressed)
+  }
+
+  private var backingFileStream: CancellableFSDataOutputStream =
+    fm.createAtomic(file, overwriteIfPossible = true)
+  private var compressedStream: DataOutputStream = 
compressStream(backingFileStream)
+  var size = 0
+
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+
+  def abort(): Unit = {
+    try {
+      if (backingFileStream != null) backingFileStream.cancel()
+      if (compressedStream != null) IOUtils.closeQuietly(compressedStream)
+    } catch {
+      // Closing the compressedStream causes the stream to write/flush data into the
+      // backing file stream. Since the backing stream is already closed, there may be errors.
+      // Usually it's an IOException. However, Hadoop's RawLocalFileSystem wraps
+      // IOException into FSError.
+      case e: FSError if e.getCause.isInstanceOf[IOException] =>
+      case NonFatal(ex) =>
+        logInfo(s"Failed to cancel changelog file $file for state store 
provider " +
+          s"with exception=$ex")
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+
+  def commit(): Unit = {
+    try {
+      // -1 in the key length field means EOF.
+      compressedStream.writeInt(-1)
+      compressedStream.close()
+    } catch {
+      case e: Throwable =>
+        abort()
+        logError(s"Fail to commit changelog file $file because of exception 
$e")
+        throw e
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+}
+
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(key: Array[Byte], value: 
Array[Byte])
+ * A put record is returned as a ByteArrayPair(key, value)
+ * A delete record is return as a ByteArrayPair(key, null)
+ */
+class StateStoreChangelogReader(
+    fm: CheckpointFileManager,
+    fileToRead: Path,
+    compressionCodec: CompressionCodec)
+  extends NextIterator[(Array[Byte], Array[Byte])] with Logging {
+
+  private def decompressStream(inputStream: DataInputStream): DataInputStream 
= {
+    val compressed = compressionCodec.compressedInputStream(inputStream)
+    new DataInputStream(compressed)
+  }
+
+  private val sourceStream = try {
+    fm.open(fileToRead)
+  } catch {
+    case f: FileNotFoundException =>
+      throw new IllegalStateException(
+        s"Error reading streaming state file of $fileToRead does not exist. " +
+          "If the stream job is restarted with a new or updated state 
operation, please" +
+          " create a new checkpoint location or clear the existing checkpoint 
location.", f)
+  }
+  private val input: DataInputStream = decompressStream(sourceStream)
+
+  def close(): Unit = { if (input != null) input.close() }
+
+  override def getNext(): (Array[Byte], Array[Byte]) = {
+    val keySize = input.readInt()
+    // A -1 key size means end of file.
+    if (keySize == -1) {
+      finished = true
+      null
+    } else if (keySize < 0) {
+      throw new IOException(
+        s"Error reading streaming state file $fileToRead: key size cannot be 
$keySize")
+    } else {
+      // TODO: reuse the key buffer and value buffer across records.
+      val keyBuffer = new Array[Byte](keySize)
+      ByteStreams.readFully(input, keyBuffer, 0, keySize)
+      val valueSize = input.readInt()
+      if (valueSize < 0) {
+        // A deletion record
+        (keyBuffer, null)
+      } else {
+        val valueBuffer = new Array[Byte](valueSize)
+        ByteStreams.readFully(input, valueBuffer, 0, valueSize)
+        // A put record.
+        (keyBuffer, valueBuffer)
+      }
+    }
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 345526bb986..339d00058fc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -27,8 +27,11 @@ import 
org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWra
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.streaming.OutputMode.Update
+import org.apache.spark.util.Utils
 
-class RocksDBStateStoreIntegrationSuite extends StreamTest {
+class RocksDBStateStoreIntegrationSuite extends StreamTest
+  with AlsoTestWithChangelogCheckpointingEnabled {
   import testImplicits._
 
   test("RocksDBStateStore") {
@@ -45,7 +48,11 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest {
           // Verify that RocksDBStateStore by verify the state checkpoints are 
[version].zip
           val storeCheckpointDir = StateStoreId(
             dir.getAbsolutePath + "/state", 0, 0).storeCheckpointLocation()
-          val storeCheckpointFile = storeCheckpointDir + "/1.zip"
+          val storeCheckpointFile = if (isChangelogCheckpointingEnabled) {
+            storeCheckpointDir + "/1.changelog"
+          } else {
+            storeCheckpointDir + "/1.zip"
+          }
           new File(storeCheckpointFile).exists()
         }
       )
@@ -205,4 +212,53 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest 
{
       }
     }
   }
+
+  testWithChangelogCheckpointingEnabled(
+    "Streaming aggregation RocksDB State Store backward compatibility.") {
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    checkpointDir.delete()
+
+    val dirForPartition0 = new File(checkpointDir.getAbsolutePath, 
"/state/0/0")
+    val inputData = MemoryStream[Int]
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .as[(Int, Long)]
+
+    // Run the stream with changelog checkpointing disabled.
+    testStream(aggregated, Update)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+        additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> 
"false")),
+      AddData(inputData, 3),
+      CheckLastBatch((3, 1)),
+      AddData(inputData, 3, 2),
+      CheckLastBatch((3, 2), (2, 1)),
+      StopStream
+    )
+    assert(changelogVersionsPresent(dirForPartition0).isEmpty)
+    assert(snapshotVersionsPresent(dirForPartition0) == List(1L, 2L))
+
+    // Run the stream with changelog checkpointing enabled.
+    testStream(aggregated, Update)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+        additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "true")),
+      AddData(inputData, 3, 2, 1),
+      CheckLastBatch((3, 3), (2, 2), (1, 1)),
+      // By default we run in new tuple mode.
+      AddData(inputData, 4, 4, 4, 4),
+      CheckLastBatch((4, 4))
+    )
+    assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
+
+    // Run the stream with changelog checkpointing disabled.
+    testStream(aggregated, Update)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+        additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> 
"false")),
+      AddData(inputData, 4),
+      CheckLastBatch((4, 5))
+    )
+    assert(changelogVersionsPresent(dirForPartition0) == List(3L, 4L))
+    assert(snapshotVersionsPresent(dirForPartition0).contains(5L))
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 82167db6d7d..d113085fd1c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -25,10 +25,13 @@ import org.apache.hadoop.conf.Configuration
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkConf
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.LocalSparkSession.withSparkSession
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.ExtendedSQLTest
 import org.apache.spark.unsafe.Platform
@@ -36,6 +39,8 @@ import org.apache.spark.util.Utils
 
 @ExtendedSQLTest
 class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvider]
+  with AlsoTestWithChangelogCheckpointingEnabled
+  with SharedSparkSession
   with BeforeAndAfter {
 
   before {
@@ -76,6 +81,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
         ("spark.sql.streaming.stateStore.providerClass",
           classOf[RocksDBStateStoreProvider].getName),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".compactOnCommit", 
"true"),
+        (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + 
".changelogCheckpointing.enabled", "true"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".lockAcquireTimeoutMs", 
"10"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxOpenFiles", "1000"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", 
"3"),
@@ -103,6 +109,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
 
       // Verify the confs are same as those configured in the session conf
       assert(rocksDBConfInTask.compactOnCommit == true)
+      assert(rocksDBConfInTask.enableChangelogCheckpointing == true)
       assert(rocksDBConfInTask.lockAcquireTimeoutMs == 10L)
       assert(rocksDBConfInTask.formatVersion == 4)
       assert(rocksDBConfInTask.maxOpenFiles == 1000)
@@ -118,20 +125,22 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       assert(metricPair.isDefined)
       metricPair.get._2
     }
-
-    tryWithProviderResource(newStoreProvider()) { provider =>
-      val store = provider.getStore(0)
-      // Verify state after updating
-      put(store, "a", 0, 1)
-      assert(get(store, "a", 0) === Some(1))
-      assert(store.commit() === 1)
-      assert(store.hasCommitted)
-      val storeMetrics = store.metrics
-      assert(storeMetrics.numKeys === 1)
-      assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_COPIED) > 0L)
-      assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_REUSED) == 0L)
-      assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_BYTES_COPIED) > 0L)
-      assert(getCustomMetric(storeMetrics, 
CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED) > 0L)
+    withSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key -> "1") {
+      tryWithProviderResource(newStoreProvider()) { provider =>
+          val store = provider.getStore(0)
+          // Verify state after updating
+          put(store, "a", 0, 1)
+          assert(get(store, "a", 0) === Some(1))
+          assert(store.commit() === 1)
+          provider.doMaintenance()
+          assert(store.hasCommitted)
+          val storeMetrics = store.metrics
+          assert(storeMetrics.numKeys === 1)
+          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_COPIED) > 
0L)
+          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_FILES_REUSED) == 
0L)
+          assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_BYTES_COPIED) > 
0L)
+          assert(getCustomMetric(storeMetrics, 
CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED) > 0L)
+      }
     }
   }
 
@@ -149,11 +158,12 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
 
   def newStoreProvider(
       storeId: StateStoreId,
-      numColsPrefixKey: Int): RocksDBStateStoreProvider = {
+      numColsPrefixKey: Int,
+      sqlConf: Option[SQLConf] = None): RocksDBStateStoreProvider = {
     val provider = new RocksDBStateStoreProvider()
     provider.init(
       storeId, keySchema, valueSchema, numColsPrefixKey = numColsPrefixKey,
-      new StateStoreConf, new Configuration)
+      new StateStoreConf(sqlConf.getOrElse(SQLConf.get)), new Configuration)
     provider
   }
 
@@ -173,10 +183,44 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
 
   override def newStoreProvider(
     minDeltasForSnapshot: Int,
-    numOfVersToRetainInMemory: Int): RocksDBStateStoreProvider = 
newStoreProvider()
+    numOfVersToRetainInMemory: Int): RocksDBStateStoreProvider = {
+    newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0), 0,
+      Some(getDefaultSQLConf(minDeltasForSnapshot, numOfVersToRetainInMemory)))
+  }
 
   override def getDefaultSQLConf(
     minDeltasForSnapshot: Int,
-    numOfVersToRetainInMemory: Int): SQLConf = new SQLConf()
+    numOfVersToRetainInMemory: Int): SQLConf = {
+    val sqlConf = SQLConf.get.clone()
+    sqlConf.setConfString(
+      SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.key, 
minDeltasForSnapshot.toString)
+    sqlConf
+  }
+
+  override def testQuietly(name: String)(f: => Unit): Unit = {
+    test(name) {
+      quietly {
+        f
+      }
+    }
+  }
+
+  override def testWithAllCodec(name: String)(func: => Any): Unit = {
+    codecsInShortName.foreach { codecName =>
+      super.test(s"$name - with codec $codecName") {
+        withSQLConf(SQLConf.STATE_STORE_COMPRESSION_CODEC.key -> codecName) {
+          func
+        }
+      }
+    }
+
+    CompressionCodec.ALL_COMPRESSION_CODECS.foreach { codecName =>
+      super.test(s"$name - with codec $codecName") {
+        withSQLConf(SQLConf.STATE_STORE_COMPRESSION_CODEC.key -> codecName) {
+          func
+        }
+      }
+    }
+  }
 }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index f66ac0de8c6..3023a445930 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -24,14 +24,297 @@ import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
 
-import org.apache.spark._
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
-class RocksDBSuite extends SparkFunSuite {
+trait RocksDBStateStoreChangelogCheckpointingTestUtil {
+  val rocksdbChangelogCheckpointingConfKey: String = 
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+    ".changelogCheckpointing.enabled"
+
+  def isChangelogCheckpointingEnabled: Boolean =
+    SQLConf.get.getConfString(rocksdbChangelogCheckpointingConfKey) == "true"
+
+  def snapshotVersionsPresent(dir: File): Seq[Long] = {
+    dir.listFiles.filter(_.getName.endsWith(".zip"))
+      .map(_.getName.stripSuffix(".zip"))
+      .map(_.toLong)
+      .sorted
+  }
+
+  def changelogVersionsPresent(dir: File): Seq[Long] = {
+    dir.listFiles.filter(_.getName.endsWith(".changelog"))
+      .map(_.getName.stripSuffix(".changelog"))
+      .map(_.toLong)
+      .sorted
+  }
+}
+
+
+trait AlsoTestWithChangelogCheckpointingEnabled
+  extends SQLTestUtils with RocksDBStateStoreChangelogCheckpointingTestUtil {
+
+  override protected def test(testName: String, testTags: Tag*)(testBody: => 
Any)
+                             (implicit pos: Position): Unit = {
+    testWithChangelogCheckpointingEnabled(testName, testTags: _*)(testBody)
+    testWithChangelogCheckpointingDisabled(testName, testTags: _*)(testBody)
+  }
+
+  def testWithChangelogCheckpointingEnabled(testName: String, testTags: Tag*)
+                                        (testBody: => Any): Unit = {
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+  }
+
+  def testWithChangelogCheckpointingDisabled(testName: String, testTags: Tag*)
+                                           (testBody: => Any): Unit = {
+    super.test(testName + " (without changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+  }
+}
+
+class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with 
SharedSparkSession {
+
+  sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, 
classOf[RocksDBStateStoreProvider].getName)
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: check changelog and snapshot version") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = dbConf.copy(minDeltasForSnapshot = 1)
+    new File(remoteDir).delete()  // to make sure that the directory gets 
created
+    for (version <- 0 to 49) {
+      withDB(remoteDir, version = version, conf = conf) { db =>
+          db.put(version.toString, version.toString)
+          db.commit()
+          if ((version + 1) % 5 == 0) db.doMaintenance()
+      }
+    }
+
+    if (isChangelogCheckpointingEnabled) {
+      assert(changelogVersionsPresent(remoteDir) === (1 to 50))
+      assert(snapshotVersionsPresent(remoteDir) === Range.inclusive(5, 50, 5))
+    } else {
+      assert(changelogVersionsPresent(remoteDir) === Seq.empty)
+      assert(snapshotVersionsPresent(remoteDir) === (1 to 50))
+    }
+  }
+
+  test("RocksDB: load version that doesn't exist") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets 
created
+    withDB(remoteDir) { db =>
+      intercept[IllegalStateException] {
+        db.load(1)
+      }
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: purge changelog and snapshots") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets 
created
+    val conf = dbConf.copy(enableChangelogCheckpointing = true,
+      minVersionsToRetain = 3, minDeltasForSnapshot = 1)
+    withDB(remoteDir, conf = conf) { db =>
+      db.load(0)
+      db.commit()
+      for (version <- 1 to 2) {
+        db.load(version)
+        db.commit()
+        db.doMaintenance()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+      assert(changelogVersionsPresent(remoteDir) == Seq(1, 2, 3))
+
+      for (version <- 3 to 4) {
+        db.load(version)
+        db.commit()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === Seq(2, 3))
+      assert(changelogVersionsPresent(remoteDir) == (1 to 5))
+      db.doMaintenance()
+      // 3 is the latest snapshot <= maxSnapshotVersionPresent - 
minVersionsToRetain + 1
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+      assert(changelogVersionsPresent(remoteDir) == (3 to 5))
+
+      for (version <- 5 to 7) {
+        db.load(version)
+        db.commit()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 5))
+      assert(changelogVersionsPresent(remoteDir) == (3 to 8))
+      db.doMaintenance()
+      // 5 is the latest snapshot <= maxSnapshotVersionPresent - 
minVersionsToRetain + 1
+      assert(snapshotVersionsPresent(remoteDir) === Seq(5, 8))
+      assert(changelogVersionsPresent(remoteDir) == (5 to 8))
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: minDeltasForSnapshot") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets 
created
+    val conf = dbConf.copy(enableChangelogCheckpointing = true, 
minDeltasForSnapshot = 3)
+    withDB(remoteDir, conf = conf) { db =>
+      for (version <- 0 to 1) {
+        db.load(version)
+        db.commit()
+        db.doMaintenance()
+      }
+      // Snapshot should not be created because minDeltasForSnapshot = 3
+      assert(snapshotVersionsPresent(remoteDir) === Seq.empty)
+      assert(changelogVersionsPresent(remoteDir) == Seq(1, 2))
+      db.load(2)
+      db.commit()
+      db.doMaintenance()
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3))
+      db.load(3)
+      for (i <- 1 to 10001) {
+        db.put(i.toString, i.toString)
+      }
+      db.commit()
+      db.doMaintenance()
+      // Snapshot should be created this time because the changelog size > 10000
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4))
+      for (version <- 4 to 7) {
+        db.load(version)
+        db.commit()
+        db.doMaintenance()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7))
+      for (version <- 8 to 20) {
+        db.load(version)
+        db.commit()
+      }
+      db.doMaintenance()
+      assert(snapshotVersionsPresent(remoteDir) === Seq(3, 4, 7, 19))
+    }
+  }
+
+  // A rocksdb instance with changelog checkpointing enabled should be able to 
load
+  // an existing checkpoint without changelog.
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: changelog checkpointing backward compatibility") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    val disableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = false, minVersionsToRetain = 30)
+    withDB(remoteDir, conf = disableChangelogCheckpointingConf) { db =>
+      for (version <- 1 to 30) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
+    }
+
+    // Now enable changelog checkpointing in a checkpoint created by a state store
+    // that disabled changelog checkpointing.
+    val enableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 30,
+        minDeltasForSnapshot = 1)
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf) { db =>
+      for (version <- 1 to 30) {
+        db.load(version)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+      for (version <- 30 to 60) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
+      assert(snapshotVersionsPresent(remoteDir) === (1 to 30))
+      assert(changelogVersionsPresent(remoteDir) === (30 to 60))
+      for (version <- 1 to 60) {
+        db.load(version, readOnly = true)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+      // Check that snapshots and changelogs get purged correctly.
+      db.doMaintenance()
+      assert(snapshotVersionsPresent(remoteDir) === Seq(30, 60))
+      assert(changelogVersionsPresent(remoteDir) === (30 to 60))
+      // Verify the content of retained versions.
+      for (version <- 30 to 60) {
+        db.load(version, readOnly = true)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+    }
+  }
+
+  // A rocksdb instance with changelog checkpointing disabled should be able to load
+  // an existing checkpoint with changelog.
+  testWithChangelogCheckpointingEnabled(
+    "RocksDB: changelog checkpointing forward compatibility") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets created
+    val enableChangelogCheckpointingConf =
+      dbConf.copy(enableChangelogCheckpointing = true, minVersionsToRetain = 20,
+        minDeltasForSnapshot = 3)
+    withDB(remoteDir, conf = enableChangelogCheckpointingConf) { db =>
+      for (version <- 1 to 30) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
+    }
+
+    // Now disable changelog checkpointing in a checkpoint created by a state store
+    // that enabled changelog checkpointing.
+    val disableChangelogCheckpointingConf =
+    dbConf.copy(enableChangelogCheckpointing = false, minVersionsToRetain = 20,
+      minDeltasForSnapshot = 1)
+    withDB(remoteDir, conf = disableChangelogCheckpointingConf) { db =>
+      for (version <- 1 to 30) {
+        db.load(version)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+      for (version <- 31 to 60) {
+        db.load(version - 1)
+        db.put(version.toString, version.toString)
+        db.remove((version - 1).toString)
+        db.commit()
+      }
+      assert(changelogVersionsPresent(remoteDir) === (1 to 30))
+      assert(snapshotVersionsPresent(remoteDir) === (31 to 60))
+      for (version <- 1 to 60) {
+        db.load(version, readOnly = true)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, version.toString)))
+      }
+      // Check that snapshots and changelogs get purged correctly.
+      db.doMaintenance()
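+      // With changelog checkpointing disabled, each commit above (versions 31-60) uploaded a
+      // snapshot, so maintenance keeps the newest minVersionsToRetain (20) snapshots, 41-60,
+      // and removes all older snapshots and changelog files.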
+      assert(snapshotVersionsPresent(remoteDir) === (41 to 60))
+      assert(changelogVersionsPresent(remoteDir) === Seq.empty)
+      // Verify the content of retained versions.
+      for (version <- 41 to 60) {
+        db.load(version, readOnly = true)
+        assert(db.iterator().map(toStr).toSet === Set((version.toString, 
version.toString)))
+      }
+    }
+  }
 
   test("RocksDB: get, put, iterator, commit, load") {
     def testOps(compactOnCommit: Boolean): Unit = {
@@ -102,56 +385,17 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
-  test("RocksDB: cleanup old files") {
-    val remoteDir = Utils.createTempDir().toString
-    val conf = RocksDBConf().copy(compactOnCommit = true, minVersionsToRetain = 10)
-
-    def versionsPresent: Seq[Long] = {
-      remoteDir.listFiles.filter(_.getName.endsWith(".zip"))
-        .map(_.getName.stripSuffix(".zip"))
-        .map(_.toLong)
-        .sorted
-    }
-
-    withDB(remoteDir, conf = conf) { db =>
-      // Generate versions without cleaning up
-      for (version <- 1 to 50) {
-        if (version > 1) {
-          // remove keys we wrote in previous iteration to ensure compaction happens
-          db.remove((version - 1).toString)
-        }
-        db.put(version.toString, version.toString)
-        db.commit()
-      }
-
-      // Clean up and verify version files and SST files were deleted
-      require(versionsPresent === (1L to 50L))
-      val sstDir = new File(remoteDir, "SSTs")
-      val numSstFiles = listFiles(sstDir).length
-      db.cleanup()
-      assert(versionsPresent === (41L to 50L))
-      assert(listFiles(sstDir).length < numSstFiles)
-
-      // Verify data in retained vesions.
-      versionsPresent.foreach { version =>
-        db.load(version)
-        val data = db.iterator().map(toStr).toSet
-        assert(data === Set((version.toString, version.toString)))
-      }
-    }
-  }
-
   test("RocksDB: handle commit failures and aborts") {
     val hadoopConf = new Configuration()
     hadoopConf.set(
       SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
       classOf[CreateAtomicTestManager].getName)
     val remoteDir = Utils.createTempDir().getAbsolutePath
-    val conf = RocksDBConf().copy(compactOnCommit = true)
-    withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+    withDB(remoteDir, hadoopConf = hadoopConf) { db =>
       // Disable failure of output stream and generate versions
       CreateAtomicTestManager.shouldFailInCreateAtomic = false
       for (version <- 1 to 10) {
+        db.load(version - 1)
         db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
         db.commit()
       }
@@ -159,16 +403,35 @@ class RocksDBSuite extends SparkFunSuite {
 
       // Fail commit for next version and verify that reloading resets the files
       CreateAtomicTestManager.shouldFailInCreateAtomic = true
+      db.load(10)
       db.put("11", "11")
       intercept[IOException] { quietly { db.commit() } }
-      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+      assert(db.load(10, readOnly = true).iterator().map(toStr).toSet === version10Data)
       CreateAtomicTestManager.shouldFailInCreateAtomic = false
 
       // Abort commit for next version and verify that reloading resets the files
       db.load(10)
       db.put("11", "11")
       db.rollback()
-      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+      assert(db.load(10, readOnly = true).iterator().map(toStr).toSet === version10Data)
+    }
+  }
+
+  testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") {
+    val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+    val changelogWriter = fileManager.getChangeLogWriter(1)
+    for (i <- 1 to 5) changelogWriter.put(i.toString, i.toString)
+    for (j <- 2 to 4) changelogWriter.delete(j.toString)
+    changelogWriter.commit()
+    val changelogReader = fileManager.getChangelogReader(1)
+    val entries = changelogReader.toSeq
+    val expectedEntries = (1 to 5).map(i => (i.toString.getBytes, i.toString.getBytes)) ++
+      (2 to 4).map(j => (j.toString.getBytes, null))
+    assert(entries.size == expectedEntries.size)
+    entries.zip(expectedEntries).map{
+      case (e1, e2) => assert(e1._1 === e2._1 && e1._2 === e2._2)
     }
   }
 
@@ -433,7 +696,7 @@ class RocksDBSuite extends SparkFunSuite {
     quietly {
       withDB(
         Utils.createTempDir().toString,
-        conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
+        conf = dbConf.copy(lockAcquireTimeoutMs = 20)) { db =>
        // DB has been loaded so current thread has alread acquired the lock on the RocksDB instance
 
         db.load(0)  // Current thread should be able to load again
@@ -470,26 +733,19 @@ class RocksDBSuite extends SparkFunSuite {
   }
 
   test("ensure concurrent access lock is released after Spark task completes") 
{
-    val conf = new SparkConf().setAppName("test").setMaster("local")
-    val sc = new SparkContext(conf)
-
-    try {
-      RocksDBSuite.withSingletonDB {
-        // Load a RocksDB instance, that is, get a lock inside a task and then fail
-        quietly {
-          intercept[Exception] {
-            sc.makeRDD[Int](1 to 1, 1).map { i =>
-              RocksDBSuite.singleton.load(0)
-              throw new Exception("fail this task to test lock release")
-            }.count()
-          }
+    RocksDBSuite.withSingletonDB {
+      // Load a RocksDB instance, that is, get a lock inside a task and then fail
+      quietly {
+        intercept[Exception] {
+          sparkContext.makeRDD[Int](1 to 1, 1).map { i =>
+            RocksDBSuite.singleton.load(0)
+            throw new Exception("fail this task to test lock release")
+          }.count()
         }
-
-        // Test whether you can load again, that is, will it successfully lock again
-        RocksDBSuite.singleton.load(0)
       }
-    } finally {
-      sc.stop()
+
+      // Test whether you can load again, that is, will it successfully lock again
+      RocksDBSuite.singleton.load(0)
     }
   }
 
@@ -572,7 +828,7 @@ class RocksDBSuite extends SparkFunSuite {
     // disable resetting stats
     withTempDir { dir =>
       val remoteDir = dir.getCanonicalPath
-      withDB(remoteDir, conf = RocksDBConf().copy(resetStatsOnLoad = false)) { db =>
+      withDB(remoteDir, conf = dbConf.copy(resetStatsOnLoad = false)) { db =>
         verifyMetrics(putCount = 0, getCount = 0, metrics = db.metrics)
         db.load(0)
         db.put("a", "1") // put also triggers a db get
@@ -618,7 +874,7 @@ class RocksDBSuite extends SparkFunSuite {
     test(s"SPARK-39781: adding valid max_open_files=$maxOpenFiles config 
property " +
       "for RocksDB state store instance should succeed") {
       withTempDir { dir =>
-        val sqlConf = SQLConf.get
+        val sqlConf = SQLConf.get.clone()
         sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles", maxOpenFiles)
         val dbConf = RocksDBConf(StateStoreConf(sqlConf))
         assert(dbConf.maxOpenFiles === maxOpenFiles.toInt)
@@ -640,7 +896,7 @@ class RocksDBSuite extends SparkFunSuite {
       "for RocksDB state store instance should fail") {
       withTempDir { dir =>
         val ex = intercept[IllegalArgumentException] {
-          val sqlConf = SQLConf.get
+          val sqlConf = SQLConf.get.clone()
           sqlConf.setConfString("spark.sql.streaming.stateStore.rocksdb.maxOpenFiles",
             maxOpenFiles)
           val dbConf = RocksDBConf(StateStoreConf(sqlConf))
@@ -812,7 +1068,7 @@ class RocksDBSuite extends SparkFunSuite {
       var curVersion: Long = 0
       // starting with the config "trackTotalNumberOfRows = true"
       // this should track the number of rows correctly
-      withDB(remoteDir, conf = RocksDBConf().copy(trackTotalNumberOfRows = true)) { db =>
+      withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = true)) { db =>
         db.load(curVersion)
         db.put("a", "5")
         db.put("b", "5")
@@ -828,7 +1084,7 @@ class RocksDBSuite extends SparkFunSuite {
 
       // restart with config "trackTotalNumberOfRows = false"
       // this should reset the number of keys as -1, and keep the number as -1
-      withDB(remoteDir, conf = RocksDBConf().copy(trackTotalNumberOfRows = false)) { db =>
+      withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = false)) { db =>
         db.load(curVersion)
 
         assert(db.metrics.numUncommittedKeys === -1)
@@ -845,7 +1101,7 @@ class RocksDBSuite extends SparkFunSuite {
 
       // restart with config "trackTotalNumberOfRows = true" again
       // this should count the number of keys at the load phase, and continue tracking the number
-      withDB(remoteDir, conf = RocksDBConf().copy(trackTotalNumberOfRows = true)) { db =>
+      withDB(remoteDir, conf = dbConf.copy(trackTotalNumberOfRows = true)) { db =>
         db.load(curVersion)
 
         assert(db.metrics.numUncommittedKeys === 3)
@@ -865,10 +1121,14 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  private def sqlConf = SQLConf.get.clone()
+
+  private def dbConf = RocksDBConf(StateStoreConf(sqlConf))
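+  // dbConf above is the default RocksDBConf for withDB, built from a cloned copy of the active
+  // SQLConf so per-test SQL conf settings are picked up without mutating the shared conf.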
+
   def withDB[T](
       remoteDir: String,
       version: Int = 0,
-      conf: RocksDBConf = RocksDBConf().copy(compactOnCommit = false, minVersionsToRetain = 100),
+      conf: RocksDBConf = dbConf,
       hadoopConf: Configuration = new Configuration())(
       func: RocksDB => T): T = {
     var db: RocksDB = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCompatibilitySuite.scala
index b189de8d2a2..b535d7e48d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCompatibilitySuite.scala
@@ -61,7 +61,7 @@ class StateStoreCompatibilitySuite extends StreamTest with StateStoreCodecsTest
 }
 
 trait StateStoreCodecsTest extends SparkFunSuite with PlanTestBase {
-  private val codecsInShortName =
+  protected val codecsInShortName =
     CompressionCodec.ALL_COMPRESSION_CODECS.map { c => CompressionCodec.getShortName(c) }
 
   protected def testWithAllCodec(name: String)(func: => Any): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 387e96142c5..9f8a588cc32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorSuite.withCoordinatorRef
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -279,7 +280,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val timeoutDuration = 1.minute
 
     quietly {
-      withSpark(new SparkContext(conf)) { sc =>
+      withSpark(SparkContext.getOrCreate(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
           require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")
 
@@ -389,7 +390,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val timeoutDuration = 1.minute
 
     quietly {
-      withSpark(new SparkContext(conf)) { sc =>
+      withSpark(SparkContext.getOrCreate(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
           require(!StateStore.isMaintenanceRunning, "StateStore is 
unexpectedly running")
 
@@ -1159,59 +1160,69 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
   }
 
   test("StateStore.get") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("test")
     quietly {
-      val dir = newDir()
-      val storeId = StateStoreProviderId(StateStoreId(dir, 0, 0), UUID.randomUUID)
-      val storeConf = getDefaultStoreConf
-      val hadoopConf = new Configuration()
-
-      // Verify that trying to get incorrect versions throw errors
-      intercept[IllegalArgumentException] {
-        StateStore.get(
-          storeId, keySchema, valueSchema, 0, -1, storeConf, hadoopConf)
-      }
-      assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
-
-      intercept[IllegalStateException] {
-        StateStore.get(
-          storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
-      }
+      withSpark(SparkContext.getOrCreate(conf)) { sc =>
+        withCoordinatorRef(sc) { coordinatorRef =>
+          val dir = newDir()
+          val storeId = StateStoreProviderId(StateStoreId(dir, 0, 0), UUID.randomUUID)
+          val storeConf = getDefaultStoreConf
+          val hadoopConf = new Configuration()
+
+          // Verify that trying to get incorrect versions throw errors
+          intercept[IllegalArgumentException] {
+            StateStore.get(
+              storeId, keySchema, valueSchema, 0, -1, storeConf, hadoopConf)
+          }
+          assert(!StateStore.isLoaded(storeId)) // version -1 should not attempt to load the store
 
-      // Increase version of the store and try to get again
-      val store0 = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 0, storeConf, hadoopConf)
-      assert(store0.version === 0)
-      put(store0, "a", 0, 1)
-      store0.commit()
-
-      val store1 = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
-      assert(StateStore.isLoaded(storeId))
-      assert(store1.version === 1)
-      assert(rowPairsToDataSet(store1.iterator()) === Set(("a", 0) -> 1))
+          intercept[IllegalStateException] {
+            StateStore.get(
+              storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
+          }
 
-      // Verify that you can also load older version
-      val store0reloaded = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 0, storeConf, hadoopConf)
-      assert(store0reloaded.version === 0)
-      assert(rowPairsToDataSet(store0reloaded.iterator()) === Set.empty)
-
-      // Verify that you can remove the store and still reload and use it
-      StateStore.unload(storeId)
-      assert(!StateStore.isLoaded(storeId))
-
-      val store1reloaded = StateStore.get(
-        storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
-      assert(StateStore.isLoaded(storeId))
-      assert(store1reloaded.version === 1)
-      put(store1reloaded, "a", 0, 2)
-      assert(store1reloaded.commit() === 2)
-      assert(rowPairsToDataSet(store1reloaded.iterator()) === Set(("a", 0) -> 2))
+          // Increase version of the store and try to get again
+          val store0 = StateStore.get(
+            storeId, keySchema, valueSchema, 0, 0, storeConf, hadoopConf)
+          assert(store0.version === 0)
+          put(store0, "a", 0, 1)
+          store0.commit()
+
+          val store1 = StateStore.get(
+            storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
+          assert(StateStore.isLoaded(storeId))
+          assert(store1.version === 1)
+          assert(rowPairsToDataSet(store1.iterator()) === Set(("a", 0) -> 1))
+
+          // Verify that you can also load older version
+          val store0reloaded = StateStore.get(
+            storeId, keySchema, valueSchema, 0, 0, storeConf, hadoopConf)
+          assert(store0reloaded.version === 0)
+          assert(rowPairsToDataSet(store0reloaded.iterator()) === Set.empty)
+
+          // Verify that you can remove the store and still reload and use it
+          StateStore.unload(storeId)
+          assert(!StateStore.isLoaded(storeId))
+
+          val store1reloaded = StateStore.get(
+            storeId, keySchema, valueSchema, 0, 1, storeConf, hadoopConf)
+          assert(StateStore.isLoaded(storeId))
+          assert(store1reloaded.version === 1)
+          put(store1reloaded, "a", 0, 2)
+          assert(store1reloaded.commit() === 2)
+          assert(rowPairsToDataSet(store1reloaded.iterator()) === Set(("a", 0) -> 2))
+        }
+      }
     }
   }
 
   test("reports memory usage") {
-    tryWithProviderResource(newStoreProvider()) { provider =>
+    // RocksDB metrics are only guaranteed to update when a snapshot is created, so we set
+    // minDeltasForSnapshot = 1 to enable snapshot generation here.
+    tryWithProviderResource(newStoreProvider(minDeltasForSnapshot = 1,
+      numOfVersToRetainInMemory = 1)) { provider =>
       val store = provider.getStore(0)
       val noDataMemoryUsed = store.metrics.memoryUsedBytes
       put(store, "a", 0, 1)
@@ -1262,7 +1273,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     assert(metricNew.name === "m1", "incorrect name in copied instance")
 
     val conf = new SparkConf().setMaster("local").setAppName("SPARK-35763")
-    withSpark(new SparkContext(conf)) { sc =>
+    withSpark(SparkContext.getOrCreate(conf)) { sc =>
       val sqlMetric = metric.createSQLMetric(sc)
       assert(sqlMetric != null)
       assert(sqlMetric.name === Some("desc1"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 49f4214ac1a..9be699a17d2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -1101,3 +1101,6 @@ object FlatMapGroupsWithStateSuite {
     throw new TestFailedException("Could get watermark when not expected", 20)
   }
 }
+
+class RocksDBStateStoreFlatMapGroupsWithStateSuite
+  extends FlatMapGroupsWithStateSuite with RocksDBStateStoreTest
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala
new file mode 100644
index 00000000000..4c73dd328b8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.execution.streaming.state.{RocksDBConf, RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+trait RocksDBStateStoreTest extends SQLTestUtils {
+
+  val rocksdbChangelogCheckpointingConfKey: String = RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +
+    ".changelogCheckpointing.enabled"
+
+  override protected def test(testName: String, testTags: Tag*)(testBody: => Any)
+                             (implicit pos: Position): Unit = {
+    super.test(testName + " (RocksDBStateStore)", testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (RocksDBStateStore with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 09a0d969459..4ea59fe7405 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -956,3 +956,6 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
     }
   }
 }
+
+class RocksDBStateStoreStreamingAggregationSuite
+  extends StreamingAggregationSuite with RocksDBStateStoreTest
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index 8607de38942..4c2a889c68d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -483,3 +483,6 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
     )
   }
 }
+
+class RocksDBStateStoreStreamingDeduplicationSuite
+  extends StreamingDeduplicationSuite with RocksDBStateStoreTest


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
