xuanyuanking commented on a change in pull request #32582:
URL: https://github.com/apache/spark/pull/32582#discussion_r643853022



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -17,18 +17,270 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.File
+import java.io.{File, FileInputStream, InputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
+import scala.collection.JavaConverters._
 import scala.collection.Seq
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.{DefaultScalaModule, 
ScalaObjectMapper}
+import org.apache.commons.io.{FilenameUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.util.Utils
+
+/**
+ * Class responsible for syncing RocksDB checkpoint files from local disk to 
DFS.
 + * Every version checkpoint is saved in a specific directory structure that 
allows successive
 + * versions to reuse the SST data files and archived log files. This allows 
each commit to be
 + * incremental; only new SST files and archived log files generated by RocksDB 
will be uploaded.
+ * The directory structures on local disk and in DFS are as follows.
+ *
+ * Local checkpoint dir structure
+ * ------------------------------
+ * RocksDB generates a bunch of files in the local checkpoint directory. The 
most important among
+ * them are the SST files; they are the actual log structured data files. Rest 
of the files contain
+ * the metadata necessary for RocksDB to read the SST files and start from the 
checkpoint.
+ * Note that the SST files are hard links to files in the RocksDB's working 
directory, and therefore
+ * successive checkpoints can share some of the SST files. So these SST files 
have to be copied to
 + * DFS in a shared directory such that different committed versions can save 
them.
+ *
+ * We consider both SST files and archived log files as immutable files which 
can be shared between
+ * different checkpoints.
+ *
+ *    localCheckpointDir
+ *                  |
+ *                  +-- OPTIONS-000005
+ *                  +-- MANIFEST-000008
+ *                  +-- CURRENT
+ *                  +-- 00007.sst
+ *                  +-- 00011.sst
+ *                  +-- archive
+ *                  |     +-- 00008.log
+ *                  |     +-- 00013.log
+ *                  ...
+ *
+ *
+ * DFS directory structure after saving to DFS as version 10
+ * -----------------------------------------------------------
+ * The SST and archived log files are given unique file names and copied to 
the shared subdirectory.
+ * Every version maintains a mapping of local immutable file name to the 
unique file name in DFS.
+ * This mapping is saved in a JSON file (named `metadata`), which is zipped 
along with other
+ * checkpoint files into a single file `[version].zip`.
+ *
+ *    dfsRootDir
+ *           |
+ *           +-- SSTs
+ *           |      +-- 00007-[uuid1].sst
+ *           |      +-- 00011-[uuid2].sst
+ *           +-- logs
+ *           |      +-- 00008-[uuid3].log
+ *           |      +-- 00013-[uuid4].log
+ *           +-- 10.zip
+ *           |      +-- metadata         <--- contains mapping between 
00007.sst and [uuid1].sst,
+ *                                            and the mapping between 
00008.log and [uuid3].log
+ *           |      +-- OPTIONS-000005
+ *           |      +-- MANIFEST-000008
+ *           |      +-- CURRENT
+ *           |      ...
+ *           |
+ *           +-- 9.zip
+ *           +-- 8.zip
+ *           ...
+ *
+ * Note the following.
+ * - Each [version].zip is a complete description of all the data and metadata 
needed to recover

Review comment:
       Makes sense, added the comment in 3800c51

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -54,4 +128,31 @@ class RocksDBSuite extends SparkFunSuite {
       
"""{"sstFiles":[{"localFileName":"00001.sst","dfsSstFileName":"00001-uuid.sst","sizeBytes":12345678901234}],"logFiles":[{"localFileName":"00001.log","dfsLogFileName":"00001-uuid.log","sizeBytes":12345678901234}],"numKeys":12345678901234}""")
     // scalastyle:on line.size.limit
   }
+

Review comment:
       Thanks, done in 3800c51




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to