HeartSaVioR commented on code in PR #41099:
URL: https://github.com/apache/spark/pull/41099#discussion_r1205088664


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, 
IOException}
+
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FSError, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path,

Review Comment:
   nit: the style when the signature spans just two lines is not very clear, but
taking more lines to better fit the style guide is OK:
   
   ```
   class StateStoreChangelogWriter(
       fm: CheckpointFileManager,
       file: Path,
       compressionCodec: CompressionCodec) extends Logging {
   ```



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -2389,6 +2394,12 @@ If you want to cap RocksDB memory usage in your Spark 
Structured Streaming deplo
 You can also determine the max allowed memory for RocksDB instances by setting 
the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static 
number or as a fraction of the physical memory available on the node.
 Limits for individual RocksDB instances can also be configured by setting 
`spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and 
`spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required 
values. By default, RocksDB internal defaults are used for these settings.
 
+##### RocksDB State Store Changelog Checkpointing

Review Comment:
   Can we provide a higher-level description of how this works? We don't even
explain what "changelog" means.
   
   I understand we have no explanation of changelog checkpointing for the
HDFS-backed state store provider, which is unfortunate, but for the RocksDB state
store provider users have to decide between the old mechanism (incremental
checkpointing) and the new one, which requires understanding the characteristics
of both options before choosing one.
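   
   Purely for illustration (not wording or code from this PR), the kind of
user-facing example I have in mind could look like the sketch below. The
changelog config key is the one documented in this change and the provider class
comes from the tests; the `spark` session is assumed to exist.
   
   ```
   // Hedged sketch: enabling RocksDB changelog checkpointing for a session,
   // assuming an existing SparkSession named `spark`.
   import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
   import org.apache.spark.sql.internal.SQLConf
   
   spark.conf.set(
     SQLConf.STATE_STORE_PROVIDER_CLASS.key,
     classOf[RocksDBStateStoreProvider].getName)
   spark.conf.set(
     "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled", "true")
   ```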



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala:
##########
@@ -243,6 +243,7 @@ class FileSystemBasedCheckpointFileManager(path: Path, 
hadoopConf: Configuration
   }
 
   override def createTempFile(path: Path): FSDataOutputStream = {
+    fs.create(path, true, 4096)

Review Comment:
   Looks like this was added accidentally?



##########
docs/structured-streaming-programming-guide.md:
##########
@@ -2389,6 +2394,12 @@ If you want to cap RocksDB memory usage in your Spark 
Structured Streaming deplo
 You can also determine the max allowed memory for RocksDB instances by setting 
the `spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB` value to a static 
number or as a fraction of the physical memory available on the node.
 Limits for individual RocksDB instances can also be configured by setting 
`spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB` and 
`spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber` to the required 
values. By default, RocksDB internal defaults are used for these settings.
 
+##### RocksDB State Store Changelog Checkpointing
+Changelog checkpointing reduces latency of the stateful streaming query. This 
checkpointing mechanism avoids cost of capturing and uploading snapshots of 
RocksDB instances in the commit phase of RocksDB state store.
+You can enable RocksDB State Store changelog checkpointing by setting 
`spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled` config 
to `true`.
+Changelog checkpointing is backward compatible. In a version of spark that 
supports changelog checkpointing, you can turn on changelog checkpointing for a 
streaming query without discarding the existing checkpoint. 

Review Comment:
   Shall we describe this from the RocksDB state store provider's point of view?
Something along the lines of:
   
   > The RocksDB state store provider supports a smooth transition between
checkpoint formats. You can switch from traditional incremental checkpointing to
changelog checkpointing, and vice versa. In particular, you can enable changelog
checkpointing for an existing streaming query created with older Spark versions.
   
   I'm not a native English speaker, so someone may want to help refine the
sentence.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, 
IOException}
+
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FSError, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path,
+                                compressionCodec: CompressionCodec) extends 
Logging {
+
+  private def compressStream(outputStream: DataOutputStream): DataOutputStream 
= {
+    val compressed = compressionCodec.compressedOutputStream(outputStream)
+    new DataOutputStream(compressed)
+  }
+
+  private var backingFileStream: CancellableFSDataOutputStream =
+    fm.createAtomic(file, overwriteIfPossible = true)
+  private var compressedStream: DataOutputStream = 
compressStream(backingFileStream)
+  var size = 0
+
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+
+  def abort(): Unit = {
+    try {
+      if (backingFileStream != null) backingFileStream.cancel()
+      if (compressedStream != null) IOUtils.closeQuietly(compressedStream)
+    } catch {
+      // Closing the compressedStream causes the stream to write/flush flush 
data into the
+      // rawStream. Since the rawStream is already closed, there may be errors.
+      // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps
+      // IOException into FSError.
+      case e: FSError if e.getCause.isInstanceOf[IOException] =>
+      case NonFatal(ex) =>
+        logInfo(s"Failed to cancel changelog file $file for state store 
provider " +
+          s"with exception=$ex")
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+
+  def commit(): Unit = {
+    try {
+      // -1 in the key length field mean EOF.
+      compressedStream.writeInt(-1)
+      compressedStream.close()
+    } catch {
+      case e: Throwable =>
+        abort()
+        logError(s"Fail to commit changelog file $file because of exception 
$e")
+        throw e
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+}
+
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(key: Array[Byte], value: 
Array[Byte])
+ * A put record is returned as a ByteArrayPair(key, value)
+ * A delete record is return as a ByteArrayPair(key, null)
+ */
+class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path,
+                      compressionCodec: CompressionCodec)
+  extends Iterator[ByteArrayPair] with Logging {
+
+  private def decompressStream(inputStream: DataInputStream): DataInputStream 
= {
+    val compressed = compressionCodec.compressedInputStream(inputStream)
+    new DataInputStream(compressed)
+  }
+
+  private val sourceStream = try {
+    fm.open(fileToRead)
+  } catch {
+    case f: FileNotFoundException =>
+      throw new IllegalStateException(
+        s"Error reading streaming state file of $this: $fileToRead does not 
exist. " +

Review Comment:
   This class is not a case class, so what information do you expect `$this` to
print here?
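   
   Purely as an illustrative sketch (not this PR's wording), naming the reader
and the file explicitly would be clearer than interpolating `$this`, which for a
non-case class only prints something like `StateStoreChangelogReader@1a2b3c`. The
object and helper names below are made up:
   
   ```
   import java.io.FileNotFoundException
   
   import org.apache.hadoop.fs.Path
   
   // Hedged sketch: spell out the reader and file instead of relying on the
   // default toString of a non-case class. Wording and names are illustrative.
   object ChangelogErrors {
     def changelogNotFoundError(fileToRead: Path, cause: FileNotFoundException): Throwable =
       new IllegalStateException(
         s"Error reading state changelog file $fileToRead for StateStoreChangelogReader: " +
           "the file does not exist. If the stream job is restarted with a new or updated " +
           "state operation, please create a new checkpoint location or clear the " +
           "existing checkpoint location.", cause)
   }
   ```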



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, 
IOException}
+
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FSError, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path,
+                                compressionCodec: CompressionCodec) extends 
Logging {
+
+  private def compressStream(outputStream: DataOutputStream): DataOutputStream 
= {
+    val compressed = compressionCodec.compressedOutputStream(outputStream)
+    new DataOutputStream(compressed)
+  }
+
+  private var backingFileStream: CancellableFSDataOutputStream =
+    fm.createAtomic(file, overwriteIfPossible = true)
+  private var compressedStream: DataOutputStream = 
compressStream(backingFileStream)
+  var size = 0
+
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+
+  def abort(): Unit = {
+    try {
+      if (backingFileStream != null) backingFileStream.cancel()
+      if (compressedStream != null) IOUtils.closeQuietly(compressedStream)
+    } catch {
+      // Closing the compressedStream causes the stream to write/flush flush 
data into the
+      // rawStream. Since the rawStream is already closed, there may be errors.
+      // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps
+      // IOException into FSError.
+      case e: FSError if e.getCause.isInstanceOf[IOException] =>
+      case NonFatal(ex) =>
+        logInfo(s"Failed to cancel changelog file $file for state store 
provider " +
+          s"with exception=$ex")
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+
+  def commit(): Unit = {
+    try {
+      // -1 in the key length field mean EOF.
+      compressedStream.writeInt(-1)
+      compressedStream.close()
+    } catch {
+      case e: Throwable =>
+        abort()
+        logError(s"Fail to commit changelog file $file because of exception 
$e")
+        throw e
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+}
+
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(key: Array[Byte], value: 
Array[Byte])
+ * A put record is returned as a ByteArrayPair(key, value)
+ * A delete record is return as a ByteArrayPair(key, null)
+ */
+class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path,
+                      compressionCodec: CompressionCodec)
+  extends Iterator[ByteArrayPair] with Logging {
+
+  private def decompressStream(inputStream: DataInputStream): DataInputStream 
= {
+    val compressed = compressionCodec.compressedInputStream(inputStream)
+    new DataInputStream(compressed)
+  }
+
+  private val sourceStream = try {
+    fm.open(fileToRead)
+  } catch {
+    case f: FileNotFoundException =>
+      throw new IllegalStateException(
+        s"Error reading streaming state file of $this: $fileToRead does not 
exist. " +
+          s"If the stream job is restarted with a new or updated state 
operation, please" +
+          s" create a new checkpoint location or clear the existing checkpoint 
location.", f)
+  }
+  private val input: DataInputStream = decompressStream(sourceStream)
+  // A buffer that hold the next record to return.
+  private var byteArrayPair: ByteArrayPair = null
+  private var eof = false
+
+  override def hasNext: Boolean = {
+    maybeReadNext()
+    byteArrayPair != null
+  }
+
+  override def next(): ByteArrayPair = {
+    maybeReadNext()
+    val nextByteArrayPair = byteArrayPair
+    byteArrayPair = null
+    nextByteArrayPair
+  }
+
+  def close(): Unit = { if (input != null) input.close() }
+
+  private def maybeReadNext(): Unit = {
+    if (!eof && byteArrayPair == null) {
+      val keySize = input.readInt()
+      // A -1 key size mean end of file.
+      if (keySize == -1) {
+        eof = true
+      } else if (keySize < 0) {
+        throw new IOException(
+          s"Error reading streaming state file $fileToRead of $this: key size 
cannot be $keySize")

Review Comment:
   Same here: what would `$this` actually print?



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.execution.streaming.state.{RocksDBConf, 
RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+trait RocksDBStateStoreTest extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => 
Any)
+                             (implicit pos: Position): Unit = {
+    super.test(testName + " (RocksDBStateStore)", testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (RocksDBStateStore with changelog checkpointing)", 
testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+

Review Comment:
   nit: extra empty line



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala:
##########
@@ -1101,3 +1101,7 @@ object FlatMapGroupsWithStateSuite {
     throw new TestFailedException("Could get watermark when not expected", 20)
   }
 }
+/*

Review Comment:
   I guess this is temporary and we'll uncomment it before merging.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala:
##########
@@ -956,3 +956,57 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest with Assertions {
     }
   }
 }
+
+class RocksDBStateStoreStreamingAggregationSuite
+  extends StreamingAggregationSuite with RocksDBStateStoreTest  {
+  import testImplicits._
+  def snapshotVersionsPresent(dir: File): Seq[Long] = {
+    dir.listFiles.filter(_.getName.endsWith(".zip"))
+      .map(_.getName.stripSuffix(".zip"))
+      .map(_.toLong)
+      .sorted
+  }
+  def changelogVersionsPresent(dir: File): Seq[Long] = {

Review Comment:
   nit: add an empty line between the two methods. Please apply this throughout
the code changes.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala:
##########
@@ -956,3 +956,57 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest with Assertions {
     }
   }
 }
+
+class RocksDBStateStoreStreamingAggregationSuite
+  extends StreamingAggregationSuite with RocksDBStateStoreTest  {
+  import testImplicits._
+  def snapshotVersionsPresent(dir: File): Seq[Long] = {
+    dir.listFiles.filter(_.getName.endsWith(".zip"))
+      .map(_.getName.stripSuffix(".zip"))
+      .map(_.toLong)
+      .sorted
+  }
+  def changelogVersionsPresent(dir: File): Seq[Long] = {
+    dir.listFiles.filter(_.getName.endsWith(".changelog"))
+      .map(_.getName.stripSuffix(".changelog"))
+      .map(_.toLong)
+      .sorted
+  }
+  test("Streaming aggregation RocksDB State Store backward compatibility.") {
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    checkpointDir.delete()
+
+    val rocksDBStateDir = new File(checkpointDir.getAbsolutePath, "/state/0/0")

Review Comment:
   nit: `dirForPartition0`, to be super clear



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -279,7 +280,7 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val timeoutDuration = 1.minute
 
     quietly {
-      withSpark(new SparkContext(conf)) { sc =>
+      withSpark(SparkContext.getOrCreate(conf)) { sc =>

Review Comment:
   1. SparkContext.getOrCreate() does not let you set the config if there is
already an active SparkContext; the config only takes effect when no SparkContext
is active. We don't seem to set any configs here, so hopefully nothing breaks,
but it's worth noting.
   2. If you stop the active SparkContext (withSpark), the SparkSession holding
that SparkContext also stops functioning until a SparkContext is set again.
   
   That said, if a test intends to spin up a new SparkContext, the test suite
should not extend the shared SparkSession. There is another trait named
LocalSparkContext which spins up and tears down a new SparkContext per test, but
that may be overkill for this suite because only a few tests require a
SparkContext instance.
   
   I'd say let's roll this back unless your change requires a SparkSession to be
set.
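   
   For reference, a minimal sketch of the LocalSparkContext pattern I mentioned
(the suite and test names are made up; this is not code from this PR):
   
   ```
   package org.apache.spark.sql.execution.streaming.state
   
   import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
   
   // Hedged sketch: LocalSparkContext stops `sc` after each test, so every test
   // gets a fresh SparkContext without touching the shared SparkSession.
   class PerTestSparkContextSuite extends SparkFunSuite with LocalSparkContext {
     test("runs against a per-test SparkContext") {
       sc = new SparkContext(
         new SparkConf().setMaster("local[2]").setAppName("per-test-context"))
       assert(sc.parallelize(1 to 10).count() === 10)
     }
   }
   ```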



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -865,10 +1076,14 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  private def sqlConf = SQLConf.get

Review Comment:
   I don't think this is a good idea in terms of isolation. The reason we use
withSQLConf is simple: we don't want a modified config to leak into the remaining
tests. Despite the intent behind using a def, this returns the same config
instance on every call, because the SparkSession is shared among the tests in
this suite, so any modification affects all remaining tests.
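   
   To illustrate the concern (a sketch only, and not necessarily the fix to take
here): `SQLConf.get` hands back the session-shared instance, while a per-call
copy keeps mutations local. The object and method names are made up:
   
   ```
   import org.apache.spark.sql.internal.SQLConf
   
   // Hedged sketch: mutating SQLConf.get leaks into later tests because the
   // SparkSession is shared; cloning gives each caller an isolated copy.
   object SqlConfIsolation {
     def isolatedSqlConf: SQLConf = SQLConf.get.clone()
   }
   ```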



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -280,34 +342,34 @@ class RocksDBFileManager(
     val path = new Path(dfsRootDir)
 
     // All versions present in DFS, sorted
-    val sortedVersions = fm.list(path, onlyZipFiles)
+    val sortedSnapshotVersions = fm.list(path, onlyZipFiles)
       .map(_.getPath.getName.stripSuffix(".zip"))
       .map(_.toLong)
       .sorted
 
     // Return if no versions generated yet
-    if (sortedVersions.isEmpty) return
+    if (sortedSnapshotVersions.isEmpty) return
 
     // Find the versions to delete
-    val maxVersionPresent = sortedVersions.last
-    val minVersionPresent = sortedVersions.head
-    val minVersionToRetain =
-      math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
-    val versionsToDelete = sortedVersions.takeWhile(_ < 
minVersionToRetain).toSet[Long]
+    val maxSnapshotVersionPresent = sortedSnapshotVersions.last
+    val minSnapshotVersionPresent = sortedSnapshotVersions.head
+
+    // In order to reconstruct numVersionsToRetain version, retain the latest 
snapshot
+    // that satisfies (version <= maxSnapshotVersionPresent - 
numVersionsToRetain + 1)
+    val minVersionToRetain = sortedSnapshotVersions

Review Comment:
   Sounds good!



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########
@@ -125,6 +132,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       put(store, "a", 0, 1)
       assert(get(store, "a", 0) === Some(1))
       assert(store.commit() === 1)
+      provider.doMaintenance()

Review Comment:
   Just to double check: this only affects the custom metrics, and other metrics
like the number of keys are available without explicit maintenance, right?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, 
IOException}
+
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FSError, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path,
+                                compressionCodec: CompressionCodec) extends 
Logging {
+
+  private def compressStream(outputStream: DataOutputStream): DataOutputStream 
= {
+    val compressed = compressionCodec.compressedOutputStream(outputStream)
+    new DataOutputStream(compressed)
+  }
+
+  private var backingFileStream: CancellableFSDataOutputStream =
+    fm.createAtomic(file, overwriteIfPossible = true)
+  private var compressedStream: DataOutputStream = 
compressStream(backingFileStream)
+  var size = 0
+
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+
+  def abort(): Unit = {
+    try {
+      if (backingFileStream != null) backingFileStream.cancel()
+      if (compressedStream != null) IOUtils.closeQuietly(compressedStream)
+    } catch {
+      // Closing the compressedStream causes the stream to write/flush flush 
data into the
+      // rawStream. Since the rawStream is already closed, there may be errors.
+      // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps
+      // IOException into FSError.
+      case e: FSError if e.getCause.isInstanceOf[IOException] =>
+      case NonFatal(ex) =>
+        logInfo(s"Failed to cancel changelog file $file for state store 
provider " +
+          s"with exception=$ex")
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+
+  def commit(): Unit = {
+    try {
+      // -1 in the key length field mean EOF.
+      compressedStream.writeInt(-1)
+      compressedStream.close()
+    } catch {
+      case e: Throwable =>
+        abort()
+        logError(s"Fail to commit changelog file $file because of exception 
$e")
+        throw e
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+}
+
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(key: Array[Byte], value: 
Array[Byte])
+ * A put record is returned as a ByteArrayPair(key, value)
+ * A delete record is return as a ByteArrayPair(key, null)
+ */
+class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path,

Review Comment:
   nit: same here for style



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, 
IOException}
+
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FSError, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path,
+                                compressionCodec: CompressionCodec) extends 
Logging {
+
+  private def compressStream(outputStream: DataOutputStream): DataOutputStream 
= {
+    val compressed = compressionCodec.compressedOutputStream(outputStream)
+    new DataOutputStream(compressed)
+  }
+
+  private var backingFileStream: CancellableFSDataOutputStream =
+    fm.createAtomic(file, overwriteIfPossible = true)
+  private var compressedStream: DataOutputStream = 
compressStream(backingFileStream)
+  var size = 0
+
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+
+  def abort(): Unit = {
+    try {
+      if (backingFileStream != null) backingFileStream.cancel()
+      if (compressedStream != null) IOUtils.closeQuietly(compressedStream)
+    } catch {
+      // Closing the compressedStream causes the stream to write/flush flush 
data into the
+      // rawStream. Since the rawStream is already closed, there may be errors.
+      // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps
+      // IOException into FSError.
+      case e: FSError if e.getCause.isInstanceOf[IOException] =>
+      case NonFatal(ex) =>
+        logInfo(s"Failed to cancel changelog file $file for state store 
provider " +
+          s"with exception=$ex")
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+
+  def commit(): Unit = {
+    try {
+      // -1 in the key length field mean EOF.
+      compressedStream.writeInt(-1)
+      compressedStream.close()
+    } catch {
+      case e: Throwable =>
+        abort()
+        logError(s"Fail to commit changelog file $file because of exception 
$e")
+        throw e
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+}
+
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(key: Array[Byte], value: 
Array[Byte])
+ * A put record is returned as a ByteArrayPair(key, value)
+ * A delete record is return as a ByteArrayPair(key, null)
+ */
+class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path,
+                      compressionCodec: CompressionCodec)
+  extends Iterator[ByteArrayPair] with Logging {

Review Comment:
   Just to check: are you aware there is a helper class NextIterator in Spark
which does what you are implementing here? Please ignore this if you already
looked into it and found a blocker.
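   
   For illustration, a rough sketch of how the read loop could sit on top of
NextIterator (the class name and the tuple-based record type are assumptions,
not this PR's code):
   
   ```
   package org.apache.spark.sql.execution.streaming.state
   
   import java.io.DataInputStream
   
   import org.apache.spark.util.NextIterator
   
   // Hedged sketch: NextIterator takes care of the hasNext/buffering
   // bookkeeping, so the reader only implements getNext() and close().
   class ChangelogRecordIterator(input: DataInputStream)
     extends NextIterator[(Array[Byte], Array[Byte])] {
   
     override protected def getNext(): (Array[Byte], Array[Byte]) = {
       val keySize = input.readInt()
       if (keySize == -1) {
         finished = true                  // -1 key size marks end of file
         null
       } else {
         val key = new Array[Byte](keySize)
         input.readFully(key)
         val valueSize = input.readInt()
         if (valueSize == -1) {
           (key, null)                    // delete record
         } else {
           val value = new Array[Byte](valueSize)
           input.readFully(value)
           (key, value)                   // put record
         }
       }
     }
   
     override protected def close(): Unit = input.close()
   }
   ```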



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala:
##########
@@ -1101,3 +1101,7 @@ object FlatMapGroupsWithStateSuite {
     throw new TestFailedException("Could get watermark when not expected", 20)
   }
 }
+/*
+class RocksDBStateStoreFlatMapGroupsWithStateSuite
+  extends FlatMapGroupsWithStateSuite with RocksDBStateStoreTest {}

Review Comment:
   nit: `{}` is not necessary for a class with no body.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, 
IOException}
+
+import scala.util.control.NonFatal
+
+import com.google.common.io.ByteStreams
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FSError, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import 
org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+
+/**
+ * Write changes to the key value state store instance to a changelog file.
+ * There are 2 types of records, put and delete.
+ * A put record is written as: | key length | key content | value length | 
value content |
+ * A delete record is written as: | key length | key content | -1 |
+ * Write an Int -1 to signal the end of file.
+ * The overall changelog format is: | put record | delete record | ... | put 
record | -1 |
+ */
+class StateStoreChangelogWriter(fm: CheckpointFileManager, file: Path,
+                                compressionCodec: CompressionCodec) extends 
Logging {
+
+  private def compressStream(outputStream: DataOutputStream): DataOutputStream 
= {
+    val compressed = compressionCodec.compressedOutputStream(outputStream)
+    new DataOutputStream(compressed)
+  }
+
+  private var backingFileStream: CancellableFSDataOutputStream =
+    fm.createAtomic(file, overwriteIfPossible = true)
+  private var compressedStream: DataOutputStream = 
compressStream(backingFileStream)
+  var size = 0
+
+  def put(key: Array[Byte], value: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    compressedStream.writeInt(value.size)
+    compressedStream.write(value)
+    size += 1
+  }
+
+  def delete(key: Array[Byte]): Unit = {
+    assert(compressedStream != null)
+    compressedStream.writeInt(key.size)
+    compressedStream.write(key)
+    // -1 in the value field means record deletion.
+    compressedStream.writeInt(-1)
+    size += 1
+  }
+
+  def abort(): Unit = {
+    try {
+      if (backingFileStream != null) backingFileStream.cancel()
+      if (compressedStream != null) IOUtils.closeQuietly(compressedStream)
+    } catch {
+      // Closing the compressedStream causes the stream to write/flush flush 
data into the
+      // rawStream. Since the rawStream is already closed, there may be errors.
+      // Usually its an IOException. However, Hadoop's RawLocalFileSystem wraps
+      // IOException into FSError.
+      case e: FSError if e.getCause.isInstanceOf[IOException] =>
+      case NonFatal(ex) =>
+        logInfo(s"Failed to cancel changelog file $file for state store 
provider " +
+          s"with exception=$ex")
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+
+  def commit(): Unit = {
+    try {
+      // -1 in the key length field mean EOF.
+      compressedStream.writeInt(-1)
+      compressedStream.close()
+    } catch {
+      case e: Throwable =>
+        abort()
+        logError(s"Fail to commit changelog file $file because of exception 
$e")
+        throw e
+    } finally {
+      backingFileStream = null
+      compressedStream = null
+    }
+  }
+}
+
+
+/**
+ * Read an iterator of change record from the changelog file.
+ * A record is represented by ByteArrayPair(key: Array[Byte], value: 
Array[Byte])
+ * A put record is returned as a ByteArrayPair(key, value)
+ * A delete record is return as a ByteArrayPair(key, null)
+ */
+class StateStoreChangelogReader(fm: CheckpointFileManager, fileToRead: Path,
+                      compressionCodec: CompressionCodec)
+  extends Iterator[ByteArrayPair] with Logging {
+
+  private def decompressStream(inputStream: DataInputStream): DataInputStream 
= {
+    val compressed = compressionCodec.compressedInputStream(inputStream)
+    new DataInputStream(compressed)
+  }
+
+  private val sourceStream = try {
+    fm.open(fileToRead)
+  } catch {
+    case f: FileNotFoundException =>
+      throw new IllegalStateException(
+        s"Error reading streaming state file of $this: $fileToRead does not 
exist. " +
+          s"If the stream job is restarted with a new or updated state 
operation, please" +

Review Comment:
   nit: the `s` interpolator isn't needed here; same below.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/RocksDBStateStoreTest.scala:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.execution.streaming.state.{RocksDBConf, 
RocksDBStateStoreProvider}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+trait RocksDBStateStoreTest extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => 
Any)
+                             (implicit pos: Position): Unit = {
+    super.test(testName + " (RocksDBStateStore)", testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (RocksDBStateStore with changelog checkpointing)", 
testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+
+
+  def rocksdbChangelogCheckpointingConfKey: String = 
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +

Review Comment:
   nit: val



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -24,14 +24,275 @@ import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
 
 import org.apache.spark._
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
-class RocksDBSuite extends SparkFunSuite {
+
+trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => 
Any)
+                             (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+
+  def rocksdbChangelogCheckpointingConfKey: String = 
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +

Review Comment:
   nit: val, as it never changes.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -483,3 +483,6 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
     )
   }
 }
+
+class RocksDBStateStoreStreamingDeduplicationSuite
+  extends StreamingDeduplicationSuite with RocksDBStateStoreTest  {}

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala:
##########
@@ -956,3 +956,57 @@ class StreamingAggregationSuite extends 
StateStoreMetricsTest with Assertions {
     }
   }
 }
+
+class RocksDBStateStoreStreamingAggregationSuite
+  extends StreamingAggregationSuite with RocksDBStateStoreTest  {
+  import testImplicits._
+  def snapshotVersionsPresent(dir: File): Seq[Long] = {
+    dir.listFiles.filter(_.getName.endsWith(".zip"))
+      .map(_.getName.stripSuffix(".zip"))
+      .map(_.toLong)
+      .sorted
+  }
+  def changelogVersionsPresent(dir: File): Seq[Long] = {
+    dir.listFiles.filter(_.getName.endsWith(".changelog"))
+      .map(_.getName.stripSuffix(".changelog"))
+      .map(_.toLong)
+      .sorted
+  }
+  test("Streaming aggregation RocksDB State Store backward compatibility.") {
+    val checkpointDir = Utils.createTempDir().getCanonicalFile
+    checkpointDir.delete()
+
+    val rocksDBStateDir = new File(checkpointDir.getAbsolutePath, "/state/0/0")
+    val inputData = MemoryStream[Int]
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .as[(Int, Long)]
+
+    // Run the stream with changelog checkpointing disabled.
+    testStream(aggregated, Update)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+        additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> 
"false")),
+      AddData(inputData, 3),
+      CheckLastBatch((3, 1)),
+      AddData(inputData, 3, 2),
+      CheckLastBatch((3, 2), (2, 1)),
+      StopStream
+    )
+    assert(changelogVersionsPresent(rocksDBStateDir).isEmpty)
+    assert(snapshotVersionsPresent(rocksDBStateDir) == List(1L, 2L))
+
+    // Run the stream with changelog checkpointing enabled.
+    testStream(aggregated, Update)(
+      StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
+        additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "true")),
+      AddData(inputData, 3, 2, 1),
+      CheckLastBatch((3, 3), (2, 2), (1, 1)),
+      // By default we run in new tuple mode.
+      AddData(inputData, 4, 4, 4, 4),
+      CheckLastBatch((4, 4))
+    )
+    assert(changelogVersionsPresent(rocksDBStateDir) == List(3L, 4L))

Review Comment:
   Let's flip the config once again to check bi-directional migration.
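   
   For example, a rough sketch continuing the test above (the added data and
assertions are illustrative, not prescriptive):
   
   ```
   // Hedged sketch: flip back to incremental checkpointing and make sure the
   // query restores state that was written under changelog checkpointing.
   testStream(aggregated, Update)(
     StartStream(checkpointLocation = checkpointDir.getAbsolutePath,
       additionalConfs = Map(rocksdbChangelogCheckpointingConfKey -> "false")),
     AddData(inputData, 4),
     CheckLastBatch((4, 5)),
     StopStream
   )
   // With changelog checkpointing off again, the new version should show up as
   // a snapshot rather than a changelog file.
   assert(changelogVersionsPresent(rocksDBStateDir) == List(3L, 4L))
   assert(snapshotVersionsPresent(rocksDBStateDir).contains(5L))
   ```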



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -865,10 +1076,14 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  private def sqlConf = SQLConf.get
+
+  private def dbConf = RocksDBConf(StateStoreConf(sqlConf))

Review Comment:
   Same here; this doesn't seem good in terms of isolation.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -134,6 +137,27 @@ class RocksDBFileManager(
   private val onlyZipFiles = new PathFilter {
     override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
   }
+  private val onlyDeltaFiles = new PathFilter {
+    override def accept(path: Path): Boolean = 
path.toString.endsWith(".changelog")
+  }
+
+  private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new 
SparkConf)
+
+  private def codec = CompressionCodec.createCodec(sparkConf, codecName)

Review Comment:
   Thanks for the explanation. OK as it is; not a big deal.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -24,14 +24,275 @@ import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
 
 import org.apache.spark._
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
-class RocksDBSuite extends SparkFunSuite {
+
+trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => 
Any)
+                             (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+
+  def rocksdbChangelogCheckpointingConfKey: String = 
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +

Review Comment:
   While we are here, let's move this to the top so that every reference appears
after the definition. The compiler won't complain about the forward reference,
but it would be helpful for readers.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -279,7 +280,7 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     val timeoutDuration = 1.minute
 
     quietly {
-      withSpark(new SparkContext(conf)) { sc =>
+      withSpark(SparkContext.getOrCreate(conf)) { sc =>

Review Comment:
   I feel like you had to do this because withSQLConf may require a SparkSession.
But I suspect this test won't use the SQLConf initialized in the shared
SparkSession, which makes withSQLConf a no-op.
   
   Have you checked that all tests run as intended, exercising both incremental
checkpointing and changelog checkpointing?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##########
@@ -24,14 +24,275 @@ import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.scalactic.source.Position
+import org.scalatest.Tag
 
 import org.apache.spark._
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
-class RocksDBSuite extends SparkFunSuite {
+
+trait AlsoTestWithChangelogCheckpointingEnabled extends SQLTestUtils {
+  override protected def test(testName: String, testTags: Tag*)(testBody: => 
Any)
+                             (implicit pos: Position): Unit = {
+    super.test(testName, testTags: _*) {
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "false",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+      // in case tests have any code that needs to execute after every test
+      super.afterEach()
+    }
+
+    super.test(testName + " (with changelog checkpointing)", testTags: _*) {
+      // in case tests have any code that needs to execute before every test
+      super.beforeEach()
+      withSQLConf(rocksdbChangelogCheckpointingConfKey -> "true",
+        SQLConf.STATE_STORE_PROVIDER_CLASS.key -> 
classOf[RocksDBStateStoreProvider].getName) {
+        testBody
+      }
+    }
+  }
+
+  def rocksdbChangelogCheckpointingConfKey: String = 
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX +

Review Comment:
   If you want to use this as a constant, let's follow the standard: define it as
a val named in ALL_UPPERCASE with underscores as separators, and preferably put
it in a companion object. The latter probably doesn't apply here since this is a
trait.
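   
   A minimal sketch of the convention (the constant name and the config-key
suffix are assumptions for illustration; the actual suffix is truncated in the
diff above):
   
   ```
   package org.apache.spark.sql.execution.streaming.state
   
   // Hedged sketch of the naming convention only; the suffix is assumed from the
   // config key documented elsewhere in this PR, not copied from the diff.
   object ChangelogCheckpointingTestConstants {
     val ROCKSDB_CHANGELOG_CHECKPOINTING_CONF_KEY: String =
       RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".changelogCheckpointing.enabled"
   }
   ```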



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
