xuanyuanking commented on a change in pull request #32582:
URL: https://github.com/apache/spark/pull/32582#discussion_r644675514



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -17,18 +17,265 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.File
+import java.io.{File, FileInputStream, InputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
+import scala.collection.JavaConverters._
 import scala.collection.Seq
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.apache.commons.io.{FilenameUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.util.Utils
+
+/**
+ * Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
+ * For each version, the checkpoint is saved in a specific directory structure that allows
+ * successive versions to reuse SST data files and archived log files. This allows each commit
+ * to be incremental; only new SST files and archived log files generated by RocksDB will be
+ * uploaded. The directory structures on local disk and in DFS are as follows.
+ *
+ * Local checkpoint dir structure
+ * ------------------------------
+ * RocksDB generates a bunch of files in the local checkpoint directory. The most important among
+ * them are the SST files; they are the actual log-structured data files. The rest of the files
+ * contain the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
+ * Note that the SST files are hard links to files in RocksDB's working directory, and therefore
+ * successive checkpoints can share some of the SST files. So these SST files have to be copied
+ * to DFS in a shared directory such that different committed versions can share them.
+ *
+ * We consider both SST files and archived log files as immutable files which can be shared
+ * between different checkpoints.
+ *
+ *    localCheckpointDir
+ *                  |
+ *                  +-- OPTIONS-000005
+ *                  +-- MANIFEST-000008
+ *                  +-- CURRENT
+ *                  +-- 00007.sst
+ *                  +-- 00011.sst
+ *                  +-- archive
+ *                  |     +-- 00008.log
+ *                  |     +-- 00013.log
+ *                  ...
+ *
+ *
+ * DFS directory structure after saving to DFS as version 10
+ * -----------------------------------------------------------
+ * The SST and archived log files are given unique file names and copied to the shared
+ * subdirectory. Every version maintains a mapping of local immutable file name to the unique
+ * file name in DFS. This mapping is saved in a JSON file (named `metadata`), which is zipped
+ * along with other checkpoint files into a single file `[version].zip`.
+ *
+ *    dfsRootDir
+ *           |
+ *           +-- SSTs
+ *           |      +-- 00007-[uuid1].sst
+ *           |      +-- 00011-[uuid2].sst
+ *           +-- logs
+ *           |      +-- 00008-[uuid3].log
+ *           |      +-- 00013-[uuid4].log
+ *           +-- 10.zip
+ *           |      +-- metadata         <--- contains mapping between 00007.sst and [uuid1].sst,
+ *                                            and the mapping between 00008.log and [uuid3].log
+ *           |      +-- OPTIONS-000005
+ *           |      +-- MANIFEST-000008
+ *           |      +-- CURRENT
+ *           |      ...
+ *           |
+ *           +-- 9.zip
+ *           +-- 8.zip
+ *           ...
+ *
+ * Note the following.
+ * - Each [version].zip is a complete description of all the data and metadata needed to recover
+ *   a RocksDB instance at the corresponding version. The SST and log files are not included in
+ *   the zip files, as they can be shared across different versions. This is unlike the
+ *   [version].delta files of HDFSBackedStateStore, where previous delta files need to be read
+ *   to recover a version.
+ * - This is safe w.r.t. speculatively executed tasks running concurrently in different executors,
+ *   as each task would upload a different copy of the generated immutable files and
+ *   atomically update the [version].zip.
+ * - Immutable files are identified uniquely based on their file name and file size.
+ * - Immutable files can be reused only across adjacent checkpoints/versions.
+ * - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
+ *   different thread than the task thread saving files.

Review comment:
       Yes. It refers to deleting the files contained in the old versions. Here's
the description of the deleteOldVersions method of RocksDBFileManager, which
will be called in RocksDBStateStoreProvider.doMaintenance. As we did before,
I'll also reference this comment when the PR for the delete path is submitted.
   ```
      * Delete old versions by deleting the associated version and SST files.
      * At a high level, this method finds which versions to delete, and which SST files were
      * last used in those versions. It's safe to delete these SST files because an SST file can
      * be reused only in successive versions. Therefore, if an SST file F was last used in
      * version V, then it won't be used in version V+1 or later, and if version V can be
      * deleted, then F can safely be deleted as well.
   ```
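   To make the high-level description concrete, here's a rough sketch of that selection logic.
   It is illustrative only: `RocksDBImmutableFile` and its `dfsFileName` field follow this PR,
   while the method name and parameters are hypothetical.
   ```scala
   def filesSafeToDelete(
       versionToFiles: Map[Long, Seq[RocksDBImmutableFile]],
       versionsToDelete: Set[Long]): Seq[RocksDBImmutableFile] = {
     // For every immutable file, find the last version that referenced it.
     val lastUsedVersion: Map[String, Long] = versionToFiles.toSeq
       .flatMap { case (version, files) => files.map(f => f.dfsFileName -> version) }
       .groupBy { case (name, _) => name }
       .map { case (name, pairs) => name -> pairs.map(_._2).max }
     // A file can only be reused by successive versions, so once its last user is deleted,
     // no retained version can still reference it.
     versionToFiles.values.flatten.toSeq.distinct
       .filter(f => versionsToDelete.contains(lastUsedVersion(f.dfsFileName)))
   }
   ```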

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -1129,6 +1129,15 @@ private[spark] object Utils extends Logging {
     s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms"
   }
 
+  /**
+   * Lists files recursively.
+   */
+  def recursiveList(f: File): Array[File] = {
+    require(f.isDirectory)
+    val current = f.listFiles
+    current ++ current.filter(_.isDirectory).flatMap(recursiveList)

Review comment:
       Sure, done in f5a6104
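   For reference, a minimal usage sketch of the helper (the path and the filter are made up
   for illustration):
   ```scala
   import java.io.File
   import org.apache.spark.util.Utils

   // Collect every entry under the checkpoint dir, at any depth, then keep only SST files.
   val all: Array[File] = Utils.recursiveList(new File("/tmp/checkpoint"))
   val sstFiles = all.filter(f => f.isFile && f.getName.endsWith(".sst"))
   ```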

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -17,18 +17,265 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.File
+import java.io.{File, FileInputStream, InputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
+import scala.collection.JavaConverters._
 import scala.collection.Seq
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.apache.commons.io.{FilenameUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.util.Utils
+
+/**
+ * Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
+ * For each version, the checkpoint is saved in a specific directory structure that allows
+ * successive versions to reuse SST data files and archived log files. This allows each commit
+ * to be incremental; only new SST files and archived log files generated by RocksDB will be
+ * uploaded. The directory structures on local disk and in DFS are as follows.
+ *
+ * Local checkpoint dir structure
+ * ------------------------------
+ * RocksDB generates a bunch of files in the local checkpoint directory. The most important among
+ * them are the SST files; they are the actual log-structured data files. The rest of the files
+ * contain the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
+ * Note that the SST files are hard links to files in RocksDB's working directory, and therefore
+ * successive checkpoints can share some of the SST files. So these SST files have to be copied
+ * to DFS in a shared directory such that different committed versions can share them.
+ *
+ * We consider both SST files and archived log files as immutable files which can be shared
+ * between different checkpoints.
+ *
+ *    localCheckpointDir
+ *                  |
+ *                  +-- OPTIONS-000005
+ *                  +-- MANIFEST-000008
+ *                  +-- CURRENT
+ *                  +-- 00007.sst
+ *                  +-- 00011.sst
+ *                  +-- archive
+ *                  |     +-- 00008.log
+ *                  |     +-- 00013.log
+ *                  ...
+ *
+ *
+ * DFS directory structure after saving to DFS as version 10
+ * -----------------------------------------------------------
+ * The SST and archived log files are given unique file names and copied to the shared
+ * subdirectory. Every version maintains a mapping of local immutable file name to the unique
+ * file name in DFS. This mapping is saved in a JSON file (named `metadata`), which is zipped
+ * along with other checkpoint files into a single file `[version].zip`.
+ *
+ *    dfsRootDir
+ *           |
+ *           +-- SSTs
+ *           |      +-- 00007-[uuid1].sst
+ *           |      +-- 00011-[uuid2].sst
+ *           +-- logs
+ *           |      +-- 00008-[uuid3].log
+ *           |      +-- 00013-[uuid4].log
+ *           +-- 10.zip
+ *           |      +-- metadata         <--- contains mapping between 00007.sst and [uuid1].sst,
+ *                                            and the mapping between 00008.log and [uuid3].log
+ *           |      +-- OPTIONS-000005
+ *           |      +-- MANIFEST-000008
+ *           |      +-- CURRENT
+ *           |      ...
+ *           |
+ *           +-- 9.zip
+ *           +-- 8.zip
+ *           ...
+ *
+ * Note the following.
+ * - Each [version].zip is a complete description of all the data and metadata needed to recover
+ *   a RocksDB instance at the corresponding version. The SST and log files are not included in
+ *   the zip files, as they can be shared across different versions. This is unlike the
+ *   [version].delta files of HDFSBackedStateStore, where previous delta files need to be read
+ *   to recover a version.
+ * - This is safe w.r.t. speculatively executed tasks running concurrently in different executors,
+ *   as each task would upload a different copy of the generated immutable files and
+ *   atomically update the [version].zip.
+ * - Immutable files are identified uniquely based on their file name and file size.
+ * - Immutable files can be reused only across adjacent checkpoints/versions.
+ * - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
+ *   different thread than the task thread saving files.
+ *
+ * @param dfsRootDir  Directory where the [version].zip files will be stored
+ * @param localTempDir Local directory for temporary work
+ * @param hadoopConf   Hadoop configuration for talking to DFS
+ * @param loggingId    Id that will be prepended in logs for isolating concurrent RocksDBs
+ */
+class RocksDBFileManager(
+    dfsRootDir: String,
+    localTempDir: File,
+    hadoopConf: Configuration,
+    loggingId: String = "")
+  extends Logging {
+
+  import RocksDBImmutableFile._
+
+  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+  private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
+  private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
+
+  /** Save all the files in given local checkpoint directory as a committed version in DFS */
+  def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+    logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
+    val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
+    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+    val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
+    val metadataFile = localMetadataFile(checkpointDir)
+    metadata.writeToFile(metadataFile)
+    logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
+
+    if (version <= 0 && numKeys == 0) {
+      // If we're writing the initial version and there's no data, we have to explicitly initialize
+      // the root directory. Normally saveImmutableFilesToDfs will do this initialization, but
+      // when there's no data that method won't write any files, and zipToDfsFile uses the
+      // CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories.
+      fm.mkdirs(new Path(dfsRootDir))

Review comment:
       Makes sense, done in f5a6104
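   For context, a rough sketch of the corner case this guard handles (paths are illustrative,
   and the empty checkpoint dir is hypothetical): committing an empty version 0 uploads no
   immutable files, so without the explicit mkdirs the later createAtomic on `0.zip` could
   fail because dfsRootDir was never created.
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.spark.util.Utils

   val fileManager = new RocksDBFileManager(
     "/tmp/dfs-root-that-does-not-exist-yet", Utils.createTempDir(), new Configuration)
   // Version 0 with numKeys == 0: saveImmutableFilesToDfs copies nothing, so only the
   // explicit mkdirs guarantees the root dir exists before 0.zip is atomically written.
   fileManager.saveCheckpointToDfs(Utils.createTempDir(), version = 0, numKeys = 0)
   ```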

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -17,18 +17,265 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.File
+import java.io.{File, FileInputStream, InputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
+import scala.collection.JavaConverters._
 import scala.collection.Seq
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.apache.commons.io.{FilenameUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.util.Utils
+
+/**
+ * Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
+ * For each version, the checkpoint is saved in a specific directory structure that allows
+ * successive versions to reuse SST data files and archived log files. This allows each commit
+ * to be incremental; only new SST files and archived log files generated by RocksDB will be
+ * uploaded. The directory structures on local disk and in DFS are as follows.
+ *
+ * Local checkpoint dir structure
+ * ------------------------------
+ * RocksDB generates a bunch of files in the local checkpoint directory. The most important among
+ * them are the SST files; they are the actual log-structured data files. The rest of the files
+ * contain the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
+ * Note that the SST files are hard links to files in RocksDB's working directory, and therefore
+ * successive checkpoints can share some of the SST files. So these SST files have to be copied
+ * to DFS in a shared directory such that different committed versions can share them.
+ *
+ * We consider both SST files and archived log files as immutable files which can be shared
+ * between different checkpoints.
+ *
+ *    localCheckpointDir
+ *                  |
+ *                  +-- OPTIONS-000005
+ *                  +-- MANIFEST-000008
+ *                  +-- CURRENT
+ *                  +-- 00007.sst
+ *                  +-- 00011.sst
+ *                  +-- archive
+ *                  |     +-- 00008.log
+ *                  |     +-- 00013.log
+ *                  ...
+ *
+ *
+ * DFS directory structure after saving to DFS as version 10
+ * -----------------------------------------------------------
+ * The SST and archived log files are given unique file names and copied to the shared
+ * subdirectory. Every version maintains a mapping of local immutable file name to the unique
+ * file name in DFS. This mapping is saved in a JSON file (named `metadata`), which is zipped
+ * along with other checkpoint files into a single file `[version].zip`.
+ *
+ *    dfsRootDir
+ *           |
+ *           +-- SSTs
+ *           |      +-- 00007-[uuid1].sst
+ *           |      +-- 00011-[uuid2].sst
+ *           +-- logs
+ *           |      +-- 00008-[uuid3].log
+ *           |      +-- 00013-[uuid4].log
+ *           +-- 10.zip
+ *           |      +-- metadata         <--- contains mapping between 00007.sst and [uuid1].sst,
+ *                                            and the mapping between 00008.log and [uuid3].log
+ *           |      +-- OPTIONS-000005
+ *           |      +-- MANIFEST-000008
+ *           |      +-- CURRENT
+ *           |      ...
+ *           |
+ *           +-- 9.zip
+ *           +-- 8.zip
+ *           ...
+ *
+ * Note the following.
+ * - Each [version].zip is a complete description of all the data and metadata needed to recover
+ *   a RocksDB instance at the corresponding version. The SST and log files are not included in
+ *   the zip files, as they can be shared across different versions. This is unlike the
+ *   [version].delta files of HDFSBackedStateStore, where previous delta files need to be read
+ *   to recover a version.
+ * - This is safe w.r.t. speculatively executed tasks running concurrently in different executors,
+ *   as each task would upload a different copy of the generated immutable files and
+ *   atomically update the [version].zip.
+ * - Immutable files are identified uniquely based on their file name and file size.
+ * - Immutable files can be reused only across adjacent checkpoints/versions.
+ * - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
+ *   different thread than the task thread saving files.
+ *
+ * @param dfsRootDir  Directory where the [version].zip files will be stored
+ * @param localTempDir Local directory for temporary work
+ * @param hadoopConf   Hadoop configuration for talking to DFS
+ * @param loggingId    Id that will be prepended in logs for isolating concurrent RocksDBs
+ */
+class RocksDBFileManager(
+    dfsRootDir: String,
+    localTempDir: File,
+    hadoopConf: Configuration,
+    loggingId: String = "")
+  extends Logging {
+
+  import RocksDBImmutableFile._
+
+  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+  private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
+  private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
+
+  /** Save all the files in given local checkpoint directory as a committed version in DFS */
+  def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+    logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
+    val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
+    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+    val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
+    val metadataFile = localMetadataFile(checkpointDir)
+    metadata.writeToFile(metadataFile)
+    logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
+
+    if (version <= 0 && numKeys == 0) {
+      // If we're writing the initial version and there's no data, we have to explicitly initialize
+      // the root directory. Normally saveImmutableFilesToDfs will do this initialization, but
+      // when there's no data that method won't write any files, and zipToDfsFile uses the
+      // CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories.
+      fm.mkdirs(new Path(dfsRootDir))
+    }
+    zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
+    logInfo(s"Saved checkpoint file for version $version")
+  }
+
+  /** Save immutable files to DFS directory */
+  private def saveImmutableFilesToDfs(
+      version: Long,
+      localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
+    // Get the immutable files used in previous versions, as some of those uploaded files can
+    // be reused for this version
+    logInfo(s"Saving RocksDB files to DFS for $version")
+    val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
+      f.localFileName -> f
+    }.toMap
+
+    var bytesCopied = 0L
+    var filesCopied = 0L
+    var filesReused = 0L
+
+    val immutableFiles = localFiles.map { localFile =>
+      prevFilesToSizes
+        .get(localFile.getName)
+        .filter(_.isSameFile(localFile))
+        .map { reusable =>
+          filesReused += 1
+          reusable
+        }.getOrElse {
+          val localFileName = localFile.getName
+          val dfsFileName = newDFSFileName(localFileName)
+          val dfsFile = dfsFilePath(dfsFileName)
+          // Note: The implementation of copyFromLocalFile() closes the output stream when there is
+          // any exception while copying. So this may generate partial files on DFS. But that is
+          // okay because until the main [version].zip file is written, those partial files are
+          // not going to be used at all. Eventually these files should get cleared.
+          fs.copyFromLocalFile(
+            new Path(localFile.getAbsoluteFile.toURI), dfsFile)
+          val localFileSize = localFile.length()
+          logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
+          filesCopied += 1
+          bytesCopied += localFileSize
+
+          RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
+        }
+    }
+    logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
+      s" DFS for version $version. $filesReused files reused without copying.")
+    versionToRocksDBFiles.put(version, immutableFiles)
+
+    immutableFiles
+  }
+
+  /**
+   * Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
+   * If any error occurs while writing, the zip file will not be created.
+   */
+  private def zipToDfsFile(files: Seq[File], dfsZipFile: Path): Unit = {
+    lazy val filesStr = s"$dfsZipFile\n\t${files.mkString("\n\t")}"
+    var in: InputStream = null
+    val out = fm.createAtomic(dfsZipFile, overwriteIfPossible = true)
+    var totalBytes = 0L
+    val zout = new ZipOutputStream(out)
+    try {
+      files.foreach { file =>
+        zout.putNextEntry(new ZipEntry(file.getName))
+        in = new FileInputStream(file)
+        val bytes = IOUtils.copy(in, zout)
+        in.close()
+        zout.closeEntry()
+        totalBytes += bytes
+      }
+      zout.close()  // so that any error in closing also cancels the output stream
+      logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")

Review comment:
       The `filesStr` contains the `dfsZipFile` name. The log format here is
`${dfsZipFile}\n${listing of all the file names}`.
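   Concretely, the message renders roughly like this (zip path, size, and file names are made
   up for illustration):
   ```
   Zipped 1024 bytes (before compression) to /checkpoint/state/0/0/10.zip
       OPTIONS-000005
       MANIFEST-000008
       CURRENT
       metadata
   ```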

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -20,12 +20,86 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.nio.charset.Charset
 
+import scala.language.implicitConversions
+
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class RocksDBSuite extends SparkFunSuite {
 
+  test("RocksDBFileManager: upload only new immutable files") {
+    val dfsRootDir = Utils.createTempDir().getAbsolutePath

Review comment:
       Sure, done in f5a6104

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -20,12 +20,86 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.nio.charset.Charset
 
+import scala.language.implicitConversions
+
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class RocksDBSuite extends SparkFunSuite {
 
+  test("RocksDBFileManager: upload only new immutable files") {
+    val dfsRootDir = Utils.createTempDir().getAbsolutePath
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir, Utils.createTempDir(), new Configuration)
+    val sstDir = s"$dfsRootDir/SSTs"
+    def numRemoteSSTFiles: Int = listFiles(sstDir).length
+    val logDir = s"$dfsRootDir/logs"
+    def numRemoteLogFiles: Int = listFiles(logDir).length

Review comment:
       Referenced this comment in
https://github.com/apache/spark/pull/32767/files#diff-dc6f9dfe11e76f890ff2986f866853bcac263027c82562f9a52f4672a5460826R97

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -20,12 +20,86 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.nio.charset.Charset
 
+import scala.language.implicitConversions
+
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class RocksDBSuite extends SparkFunSuite {
 
+  test("RocksDBFileManager: upload only new immutable files") {
+    val dfsRootDir = Utils.createTempDir().getAbsolutePath
+    val fileManager = new RocksDBFileManager(
+      dfsRootDir, Utils.createTempDir(), new Configuration)
+    val sstDir = s"$dfsRootDir/SSTs"
+    def numRemoteSSTFiles: Int = listFiles(sstDir).length
+    val logDir = s"$dfsRootDir/logs"
+    def numRemoteLogFiles: Int = listFiles(logDir).length

Review comment:
       Referenced this comment in
https://github.com/apache/spark/pull/32767/files#diff-dc6f9dfe11e76f890ff2986f866853bcac263027c82562f9a52f4672a5460826R79




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
