xuanyuanking commented on a change in pull request #32582:
URL: https://github.com/apache/spark/pull/32582#discussion_r643850985
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -17,18 +17,270 @@
package org.apache.spark.sql.execution.streaming.state
-import java.io.File
+import java.io.{File, FileInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
+import scala.collection.JavaConverters._
import scala.collection.Seq
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.apache.commons.io.{FilenameUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.util.Utils
+
+/**
+ * Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
+ * Every version checkpoint is saved in a specific directory structure that allows successive
+ * versions to reuse the SST data files and archived log files. This allows each commit to be
+ * incremental; only the new SST files and archived log files generated by RocksDB will be uploaded.
+ * The directory structures on local disk and in DFS are as follows.
+ *
+ * Local checkpoint dir structure
+ * ------------------------------
+ * RocksDB generates a bunch of files in the local checkpoint directory. The most important among
+ * them are the SST files; they are the actual log structured data files. The rest of the files
+ * contain the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
+ * Note that the SST files are hard links to files in RocksDB's working directory, and therefore
+ * successive checkpoints can share some of the SST files. So these SST files have to be copied to
+ * DFS in a shared directory such that different committed versions can share them.
+ *
+ * We consider both SST files and archived log files as immutable files which can be shared between
+ * different checkpoints.
+ *
+ * localCheckpointDir
+ * |
+ * +-- OPTIONS-000005
+ * +-- MANIFEST-000008
+ * +-- CURRENT
+ * +-- 00007.sst
+ * +-- 00011.sst
+ * +-- archive
+ * | +-- 00008.log
+ * | +-- 00013.log
+ * ...
+ *
+ *
+ * DFS directory structure after saving to DFS as version 10
+ * -----------------------------------------------------------
+ * The SST and archived log files are given unique file names and copied to the shared subdirectory.
+ * Every version maintains a mapping of local immutable file name to the unique file name in DFS.
+ * This mapping is saved in a JSON file (named `metadata`), which is zipped along with other
+ * checkpoint files into a single file `[version].zip`.
+ *
+ * dfsRootDir
+ * |
+ * +-- SSTs
+ * | +-- 00007-[uuid1].sst
+ * | +-- 00011-[uuid2].sst
+ * +-- logs
+ * | +-- 00008-[uuid3].log
+ * | +-- 00013-[uuid4].log
+ * +-- 10.zip
+ * | +-- metadata  <--- contains mapping between 00007.sst and [uuid1].sst,
+ *                      and the mapping between 00008.log and [uuid3].log
+ * | +-- OPTIONS-000005
+ * | +-- MANIFEST-000008
+ * | +-- CURRENT
+ * | ...
+ * |
+ * +-- 9.zip
+ * +-- 8.zip
+ * ...
+ *
+ * Note the following.
+ * - Each [version].zip is a complete description of all the data and metadata needed to recover
+ *   a RocksDB instance at the corresponding version. This is unlike the [version].delta files of
+ *   HDFSBackedStateStore, where previous delta files need to be read to be recovered.
+ * - This is safe wrt speculatively executed tasks running concurrently in different executors,
+ *   as each task would upload a different copy of the generated immutable files and
+ *   atomically update the [version].zip.
+ * - Immutable files are identified uniquely based on their file name and file size.
+ * - Immutable files can be reused only across adjacent checkpoints/versions.
+ * - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
+ *   different thread than the task thread saving files.
+ *
+ * @param dfsRootDir Directory where the [version].zip files will be stored
+ * @param localTempDir Local directory for temporary work
+ * @param hadoopConf Hadoop configuration for talking to DFS
+ * @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
+ */
+class RocksDBFileManager(
+ dfsRootDir: String,
+ localTempDir: File,
+ hadoopConf: Configuration,
+ loggingId: String = "")
+ extends Logging {
+
+ import RocksDBImmutableFile._
+
+ private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
Review comment:
That's right. `versionToRocksDBFiles` was touched in the following 3
places:
- saveCheckpointToDfs (this PR)
- deleteOldVersions
- loadCheckpointFromDfs
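For reference, a minimal sketch (not the code from this PR) of the pattern those three call sites share, showing why a `ConcurrentHashMap` is enough to keep the save/load path and the deletion thread safe without extra locking; `ImmutableFile` and the method names below are stand-ins, not the PR's exact APIs:

```scala
import java.util.concurrent.ConcurrentHashMap

object VersionMapSketch {
  // stand-in for the PR's RocksDBImmutableFile
  case class ImmutableFile(dfsFileName: String, sizeBytes: Long)

  private val versionToFiles = new ConcurrentHashMap[Long, Seq[ImmutableFile]]

  // saveCheckpointToDfs / loadCheckpointFromDfs record which immutable files a version uses
  def remember(version: Long, files: Seq[ImmutableFile]): Unit = {
    versionToFiles.put(version, files)
  }

  // deleteOldVersions can drop entries from a different (maintenance) thread;
  // ConcurrentHashMap makes the concurrent put/remove safe
  def forget(version: Long): Unit = {
    versionToFiles.remove(version)
  }
}
```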
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
##########
@@ -17,18 +17,270 @@
package org.apache.spark.sql.execution.streaming.state
-import java.io.File
+import java.io.{File, FileInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
+import scala.collection.JavaConverters._
import scala.collection.Seq
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.apache.commons.io.{FilenameUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.util.Utils
+
+/**
+ * Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
+ * Every version checkpoint is saved in a specific directory structure that allows successive
+ * versions to reuse the SST data files and archived log files. This allows each commit to be
+ * incremental; only the new SST files and archived log files generated by RocksDB will be uploaded.
+ * The directory structures on local disk and in DFS are as follows.
+ *
+ * Local checkpoint dir structure
+ * ------------------------------
+ * RocksDB generates a bunch of files in the local checkpoint directory. The most important among
+ * them are the SST files; they are the actual log structured data files. The rest of the files
+ * contain the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
+ * Note that the SST files are hard links to files in RocksDB's working directory, and therefore
+ * successive checkpoints can share some of the SST files. So these SST files have to be copied to
+ * DFS in a shared directory such that different committed versions can share them.
+ *
+ * We consider both SST files and archived log files as immutable files which can be shared between
+ * different checkpoints.
+ *
+ * localCheckpointDir
+ * |
+ * +-- OPTIONS-000005
+ * +-- MANIFEST-000008
+ * +-- CURRENT
+ * +-- 00007.sst
+ * +-- 00011.sst
+ * +-- archive
+ * | +-- 00008.log
+ * | +-- 00013.log
+ * ...
+ *
+ *
+ * DFS directory structure after saving to DFS as version 10
+ * -----------------------------------------------------------
+ * The SST and archived log files are given unique file names and copied to the shared subdirectory.
+ * Every version maintains a mapping of local immutable file name to the unique file name in DFS.
+ * This mapping is saved in a JSON file (named `metadata`), which is zipped along with other
+ * checkpoint files into a single file `[version].zip`.
+ *
+ * dfsRootDir
+ * |
+ * +-- SSTs
+ * | +-- 00007-[uuid1].sst
+ * | +-- 00011-[uuid2].sst
+ * +-- logs
+ * | +-- 00008-[uuid3].log
+ * | +-- 00013-[uuid4].log
+ * +-- 10.zip
+ * | +-- metadata  <--- contains mapping between 00007.sst and [uuid1].sst,
+ *                      and the mapping between 00008.log and [uuid3].log
+ * | +-- OPTIONS-000005
+ * | +-- MANIFEST-000008
+ * | +-- CURRENT
+ * | ...
+ * |
+ * +-- 9.zip
+ * +-- 8.zip
+ * ...
+ *
+ * Note the following.
+ * - Each [version].zip is a complete description of all the data and metadata needed to recover
+ *   a RocksDB instance at the corresponding version. This is unlike the [version].delta files of
+ *   HDFSBackedStateStore, where previous delta files need to be read to be recovered.
+ * - This is safe wrt speculatively executed tasks running concurrently in different executors,
+ *   as each task would upload a different copy of the generated immutable files and
+ *   atomically update the [version].zip.
+ * - Immutable files are identified uniquely based on their file name and file size.
+ * - Immutable files can be reused only across adjacent checkpoints/versions.
+ * - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
+ *   different thread than the task thread saving files.
+ *
+ * @param dfsRootDir Directory where the [version].zip files will be stored
+ * @param localTempDir Local directory for temporary work
+ * @param hadoopConf Hadoop configuration for talking to DFS
+ * @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
+ */
+class RocksDBFileManager(
+ dfsRootDir: String,
+ localTempDir: File,
+ hadoopConf: Configuration,
+ loggingId: String = "")
+ extends Logging {
+
+ import RocksDBImmutableFile._
+
+ private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+ private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
+ private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
+
+ /** Save all the files in the given local checkpoint directory as a committed version in DFS */
+ def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+ logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
+ val allCheckpointFiles = listRocksDBFiles(checkpointDir)
+ val (localImmutableFiles, localOtherFiles) = allCheckpointFiles.partition(isImmutableFile)
+ val rocksDBFiles = saveImmutableFilesToDfs(version, checkpointDir, localImmutableFiles)
+ val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
+ rememberImmutableFiles(version, rocksDBFiles)
+ val metadataFile = localMetadataFile(checkpointDir)
+ metadata.writeToFile(metadataFile)
+ logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
+
+ if (version <= 0 && numKeys == 0) {
Review comment:
We also use batch ID + 1 as the version for the RocksDB state store.
Since the batch ID starts from -1, for the corner case of the first empty batch (batch ID 0)
we add the safeguard here to make sure the working dir is created.
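As a hedged illustration of that numbering (variable names below are hypothetical, not from the PR):

```scala
// Batch IDs start from -1 and the store version is batch ID + 1, so the very first
// (possibly empty) batch can commit a version of 0 with numKeys == 0; that is the
// corner case guarded by `version <= 0 && numKeys == 0`.
object VersionNumberingSketch extends App {
  val initialBatchId = -1L
  val firstVersion = initialBatchId + 1 // == 0
  println(s"first possible version: $firstVersion")
}
```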
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -54,4 +128,31 @@ class RocksDBSuite extends SparkFunSuite {
"""{"sstFiles":[{"localFileName":"00001.sst","dfsSstFileName":"00001-uuid.sst","sizeBytes":12345678901234}],"logFiles":[{"localFileName":"00001.log","dfsLogFileName":"00001-uuid.log","sizeBytes":12345678901234}],"numKeys":12345678901234}""")
// scalastyle:on line.size.limit
}
+
+
+ def generateFiles(dir: String, fileToLengths: Seq[(String, Int)]): Unit = {
+ fileToLengths.foreach { case (fileName, length) =>
+ val file = new File(dir, fileName)
+ FileUtils.write(file, "a" * length)
+ }
+ }
+
+ def saveCheckpointFiles(
+ fileManager: RocksDBFileManager,
+ fileToLengths: Seq[(String, Int)],
+ version: Int,
+ numKeys: Int): Unit = {
+ val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to create checkpoints
+ generateFiles(checkpointDir, fileToLengths)
+ fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
+ }
+
+ implicit def toFile(path: String): File = new File(path)
Review comment:
That's right in this PR. In further PRs, we will have more callers of
`listFiles(file: String)`. Maybe let's keep it for now.
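For context, a small sketch of how the implicit keeps those callers terse; the `listFiles` helper below is a guess at its shape, not the exact one in `RocksDBSuite`:

```scala
import java.io.File
import scala.language.implicitConversions

object ListFilesSketch {
  // the implicit under discussion: lets callers pass plain path strings
  implicit def toFile(path: String): File = new File(path)

  // hypothetical shape of the helper
  def listFiles(file: File): Seq[File] = {
    if (!file.exists() || !file.isDirectory) Seq.empty
    else file.listFiles().toSeq
  }

  // thanks to the implicit, a String path built with s"$dfsRootDir/SSTs" works directly
  def numFilesIn(dir: String): Int = listFiles(dir).length
}
```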
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
##########
@@ -20,12 +20,86 @@ package org.apache.spark.sql.execution.streaming.state
import java.io._
import java.nio.charset.Charset
+import scala.language.implicitConversions
+
import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
class RocksDBSuite extends SparkFunSuite {
+ test("RocksDBFileManager: upload only new immutable files") {
+ val dfsRootDir = Utils.createTempDir().getAbsolutePath
+ val fileManager = new RocksDBFileManager(
+ dfsRootDir, Utils.createTempDir(), new Configuration)
+ val sstDir = s"$dfsRootDir/SSTs"
+ def numRemoteSSTFiles: Int = listFiles(sstDir).length
+ val logDir = s"$dfsRootDir/logs"
+ def numRemoteLogFiles: Int = listFiles(logDir).length
Review comment:
We'll enhance this test in the next PR, `RocksDBFileManager - load checkpoint from DFS`,
by loading back the metadata and checking the other files. Let me keep this conversation
open so it can be referenced in the next PR.
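A rough sketch of the kind of round-trip check that follow-up could add; the `loadCheckpointFromDfs` call and its return value here are assumptions based on this thread, not the final API:

```scala
// Hypothetical follow-up assertion: restore the saved version from DFS into a
// fresh local dir and verify the checkpoint files come back.
val loadDir = Utils.createTempDir()
val loadedMetadata = fileManager.loadCheckpointFromDfs(1, loadDir) // signature is an assumption
assert(listFiles(loadDir).nonEmpty) // checkpoint files were restored locally
// fields such as loadedMetadata.numKeys could then be compared with what was saved
```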
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]