siying commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1872195109
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -359,6 +471,50 @@ class RocksDB(
this
}
+ /**
+ * Initialize in memory values based on the metadata loaded from DFS.
Review Comment:
This is too vague. We should be more concrete and explicit about what it does. Is it to open the local RocksDB and initialize the number-of-keys metrics?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -331,20 +346,51 @@ class RocksDBFileManager(
}
}
+ /**
+ * Based on the ground truth lineage loaded from changelog file (lineage), this function
+ * does file listing to find all snapshot (version, uniqueId) pairs, and finds
+ * the ground truth latest snapshot (version, uniqueId) the db instance needs to load.
+ *
+ * @param lineage the ground truth lineage loaded from changelog file
+ * @return the ground truth latest snapshot (version, uniqueId) the db instance needs to load
Review Comment:
Mention what order it is sorted by and what it means if the return value is None.
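To make that contract concrete, here is a hypothetical, self-contained sketch (not the PR's code; `LineageItem` and the helper name are illustrative) of the behavior the comment asks to document: candidates are sorted by version in ascending order, the last (highest-version) one is picked, and a None result means no snapshot covered by the lineage exists yet.

```scala
// Illustrative sketch of the selection contract to be documented.
case class LineageItem(version: Long, checkpointUniqueId: String)

def latestSnapshotFromLineage(
    snapshotsOnDfs: Seq[(Long, String)],  // (version, uniqueId) parsed from *.zip names
    lineage: Seq[LineageItem]): Option[(Long, String)] = {
  val valid = lineage.map(i => (i.version, i.checkpointUniqueId)).toSet
  snapshotsOnDfs
    .filter(valid.contains)   // keep only snapshots the lineage vouches for
    .sortBy(_._1)             // ascending by version ...
    .lastOption               // ... so lastOption is the latest; None if no match
}
```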
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,27 +280,159 @@ class RocksDB(
// We send snapshots that needs to be uploaded by the maintenance thread to this queue
private val snapshotsToUploadQueue = new ConcurrentLinkedQueue[RocksDBSnapshot]()
+ /**
+ * Read the lineage from the changelog files. It first get the changelog reader
+ * of the correct changelog version and then read the lineage information from the file.
+ * The changelog file is named as version_stateStoreCkptId.changelog
+ * @param version version of the changelog file, used to load changelog file.
+ * @param stateStoreCkptId uniqueId of the changelog file, used to load changelog file.
+ * @return
+ */
+ private def getLineageFromChangelogFile(
+ version: Long,
+ stateStoreCkptId: Option[String]): Array[LineageItem] = {
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Array[LineageItem] = Array.empty
+ try {
+ changelogReader = fileManager.getChangelogReader(version, stateStoreCkptId)
+ currLineage = changelogReader.lineage
+ logInfo(log"Loading lineage: " +
+ log"${MDC(LogKeys.LINEAGE, lineageManager)} from " +
+ log"changelog version: ${MDC(LogKeys.VERSION_NUM, version)} " +
+ log"uniqueId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}.")
+ } finally {
+ if (changelogReader != null) {
+ changelogReader.closeIfNeeded()
+ }
+ }
+ currLineage
+ }
+
+
/**
* Load the given version of data in a native RocksDB instance.
* Note that this will copy all the necessary file from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*/
- def load(
+ private def loadV2(
+ version: Long,
+ stateStoreCkptId: Option[String],
+ readOnly: Boolean = false): RocksDB = {
+ // An array contains lineage information from [snapShotVersion, version] (inclusive both end)
+ var currVersionLineage: Array[LineageItem] = lineageManager.getLineage()
Review Comment:
Can it be moved inside the if below?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -965,7 +966,8 @@ class MicroBatchExecution(
updateStateStoreCkptId(execCtx, latestExecPlan)
}
execCtx.reportTimeTaken("commitOffsets") {
- if (!commitLog.add(execCtx.batchId, CommitMetadata(watermarkTracker.currentWatermark))) {
+ if (!commitLog.add(execCtx.batchId,
+ CommitMetadata(watermarkTracker.currentWatermark, currentStateStoreCkptId.toMap))) {
Review Comment:
@WweiL there must be a misunderstanding here. My previous comment
> Same as commented above. It is better to fill in a None if it is V1.
> Did you validate that for V1, the commit log generated can be read by an older version?
is two different comments. Even if validation for V1 is done, I still hope None is used for V1, and I've explained why above. See my explanation at https://github.com/apache/spark/pull/48355/files#r1870082653
I didn't see it addressed yet.
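A minimal sketch of the suggestion (the types here are illustrative, not Spark's actual `CommitMetadata`): for a V1 checkpoint, write None so the field is absent from the commit log and older readers stay compatible, instead of writing an empty map.

```scala
// Illustrative minimal types; not the real Spark commit log classes.
case class CommitMetadata(
    watermarkMillis: Long,
    stateUniqueIds: Option[Map[Long, Seq[String]]])

def buildCommitMetadata(
    watermarkMillis: Long,
    ckptIds: Map[Long, Seq[String]],
    checkpointV2Enabled: Boolean): CommitMetadata =
  CommitMetadata(
    watermarkMillis,
    if (checkpointV2Enabled) Some(ckptIds) else None)  // None marks V1
```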
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1036,6 +1208,40 @@ class RocksDB(
Option(acquiredThreadInfo).map(_.copy())
}
+ /** Upload the snapshot to DFS and remove it from snapshots pending */
+ private def uploadSnapshot(
+ snapshot: RocksDB#RocksDBSnapshot,
+ fileManager: RocksDBFileManager,
+ snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+ loggingId: String): RocksDBFileManagerMetrics = {
+ var fileManagerMetrics: RocksDBFileManagerMetrics = null
+ try {
+ val uploadTime = timeTakenMs {
+ fileManager.saveCheckpointToDfs(snapshot.checkpointDir,
+ snapshot.version, snapshot.numKeys, snapshot.fileMapping,
+ Some(snapshot.columnFamilyMapping), Some(snapshot.maxColumnFamilyId), snapshot.uniqueId)
+ fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
+
+ val snapshotInfo = RocksDBVersionSnapshotInfo(snapshot.version, snapshot.dfsFileSuffix)
+ // We are only removing the uploaded snapshot info from the pending set,
+ // to let the file mapping (i.e. query threads) know that the snapshot (i.e. and its files)
+ // have been uploaded to DFS. We don't touch the file mapping here to avoid corrupting it.
+ snapshotsPendingUpload.remove(snapshotInfo)
+ }
+ lineageManager.resetLineage(lineageManager.getLineage()
+ .filter(i => i.version >= snapshot.version))
Review Comment:
This is a relatively aggressive cleanup strategy, because even if the upload succeeds, it is not necessarily the snapshot written to the commit log. Lineage is relatively cheap, so I would keep it longer if possible, perhaps even until versions are cleaned up by the cleanup maintenance task. But this is not wrong, as we can always re-read from changelog files. Maybe add a comment explaining the rationale here.
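A sketch contrasting the PR's truncation with the longer retention the comment suggests; `LineageItem` and both helpers are illustrative, not the PR's API.

```scala
case class LineageItem(version: Long, checkpointUniqueId: String)

// PR behavior: after a snapshot upload at `snapshotVersion`, drop all older
// entries. Safe (they can be re-read from changelog files) but aggressive,
// since the uploaded snapshot may not be the one the commit log picks.
def truncateAtSnapshot(lineage: Seq[LineageItem], snapshotVersion: Long): Seq[LineageItem] =
  lineage.filter(_.version >= snapshotVersion)

// Suggested alternative: only drop entries below the oldest version the
// cleanup maintenance task still retains, keeping cheap history longer.
def truncateAtRetention(lineage: Seq[LineageItem], oldestRetainedVersion: Long): Seq[LineageItem] =
  lineage.filter(_.version >= oldestRetainedVersion)
```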
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -331,20 +346,51 @@ class RocksDBFileManager(
}
}
+ /**
+ * Based on the ground truth lineage loaded from changelog file (lineage), this function
+ * does file listing to find all snapshot (version, uniqueId) pairs, and finds
+ * the ground truth latest snapshot (version, uniqueId) the db instance needs to load.
+ *
+ * @param lineage the ground truth lineage loaded from changelog file
+ * @return the ground truth latest snapshot (version, uniqueId) the db instance needs to load
+ */
+ def getLatestSnapshotVersionAndUniqueIdFromLineage(
+ lineage: Array[LineageItem]): Option[(Long, Option[String])] = {
+ val path = new Path(dfsRootDir)
+ if (fm.exists(path)) {
+ fm.list(path, onlyZipFiles)
+ .map(_.getPath.getName.stripSuffix(".zip").split("_"))
+ .filter {
+ case Array(ver, id) => lineage.contains(LineageItem(ver.toLong, id))
+ }
+ .map {
+ case Array(version, uniqueId) => (version.toLong, Option(uniqueId))
+ }
+ .sortBy(_._1)
+ .reverse
+ .headOption
+ } else {
+ Some(0, None)
Review Comment:
What does (0, None) mean? If it means that the lineage starts from 1 and we don't have any snapshot uploaded yet, we should use None to indicate that, and the return value should be `Option[(Long, String)]`. (0, None) is not consistent with how we deal with checkpointID. A checkpointID of None usually means it is V1, so checkpointID is not available.
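A hypothetical sketch of the convention the comment asks for: return None to mean "no snapshot uploaded yet, replay changelogs from version 1", instead of the ambiguous Some((0, None)). Names and the file-name parsing are illustrative.

```scala
case class LineageItem(version: Long, checkpointUniqueId: String)

def latestSnapshot(
    zipNames: Seq[String],  // e.g. "5_uuid.zip" listed under the DFS root
    lineage: Seq[LineageItem]): Option[(Long, String)] = {
  val valid = lineage.map(i => (i.version, i.checkpointUniqueId)).toSet
  zipNames
    .map(_.stripSuffix(".zip").split("_", 2))
    .collect { case Array(v, id) if valid.contains((v.toLong, id)) => (v.toLong, id) }
    .sortBy(_._1)
    .lastOption  // None: nothing to load, caller starts from changelog replay
}
```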
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,27 +280,159 @@ class RocksDB(
// We send snapshots that needs to be uploaded by the maintenance thread to this queue
private val snapshotsToUploadQueue = new ConcurrentLinkedQueue[RocksDBSnapshot]()
+ /**
+ * Read the lineage from the changelog files. It first get the changelog reader
+ * of the correct changelog version and then read the lineage information from the file.
+ * The changelog file is named as version_stateStoreCkptId.changelog
+ * @param version version of the changelog file, used to load changelog file.
+ * @param stateStoreCkptId uniqueId of the changelog file, used to load changelog file.
+ * @return
+ */
+ private def getLineageFromChangelogFile(
+ version: Long,
+ stateStoreCkptId: Option[String]): Array[LineageItem] = {
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Array[LineageItem] = Array.empty
+ try {
+ changelogReader = fileManager.getChangelogReader(version, stateStoreCkptId)
+ currLineage = changelogReader.lineage
+ logInfo(log"Loading lineage: " +
+ log"${MDC(LogKeys.LINEAGE, lineageManager)} from " +
+ log"changelog version: ${MDC(LogKeys.VERSION_NUM, version)} " +
+ log"uniqueId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}.")
+ } finally {
+ if (changelogReader != null) {
+ changelogReader.closeIfNeeded()
+ }
+ }
+ currLineage
+ }
+
+
/**
* Load the given version of data in a native RocksDB instance.
* Note that this will copy all the necessary file from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*/
- def load(
+ private def loadV2(
+ version: Long,
+ stateStoreCkptId: Option[String],
+ readOnly: Boolean = false): RocksDB = {
+ // An array contains lineage information from [snapShotVersion, version] (inclusive both end)
+ var currVersionLineage: Array[LineageItem] = lineageManager.getLineage()
+ try {
+ if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
+ stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+ closeDB(ignoreException = false)
+
+ val (latestSnapshotVersion, latestSnapshotUniqueId) = {
+ // Special handling when version is 0.
+ // When loading the very first version (0), stateStoreCkptId does not need to be defined
+ // because there won't be 0.changelog / 0.zip file created in RocksDB under v2.
+ if (version == 0) {
+ assert(stateStoreCkptId.isEmpty, "stateStoreCkptId should be empty when version is zero")
+ (0L, None)
+ // When there is a snapshot file, it is the ground truth, we can skip
+ // reconstructing the lineage from changelog file.
+ } else if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
+ currVersionLineage = Array(LineageItem(version, stateStoreCkptId.get))
+ lineageManager.resetLineage(currVersionLineage)
+ (version, stateStoreCkptId)
+ } else {
+ currVersionLineage = getLineageFromChangelogFile(version, stateStoreCkptId) :+
+ LineageItem(version, stateStoreCkptId.get)
+ currVersionLineage = currVersionLineage.sortBy(_.version)
+ lineageManager.resetLineage(currVersionLineage)
+
+ val latestSnapshotVersionsAndUniqueId =
+ fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
+ latestSnapshotVersionsAndUniqueId match {
+ case Some(pair) => pair
+ case None =>
+ logWarning("Cannot find latest snapshot based on lineage: "
+ + printLineageItems(currVersionLineage))
+ (0L, None)
Review Comment:
The handling looks OK but can you explain why `cannot_load_base_snapshot` is
removed? We still need to throw it if none of 1-4 is applicable?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -359,6 +471,50 @@ class RocksDB(
this
}
+ /**
+ * Initialize in memory values based on the metadata loaded from DFS.
+ * @param metadata: metadata loaded from DFS
+ */
+ private def init(metadata: RocksDBCheckpointMetadata): Unit = {
+
+ setInitialCFInfo()
+ metadata.columnFamilyMapping.foreach { mapping =>
+ colFamilyNameToIdMap.putAll(mapping.asJava)
+ }
+
+ metadata.maxColumnFamilyId.foreach { maxId =>
+ maxColumnFamilyId.set(maxId)
+ }
+ openDB()
+ numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
+ // we don't track the total number of rows - discard the number being track
+ -1L
+ } else if (metadata.numKeys < 0) {
+ // we track the total number of rows, but the snapshot doesn't have tracking number
+ // need to count keys now
+ countKeys()
+ } else {
+ metadata.numKeys
+ }
+ }
+
+ def load(
+ version: Long,
+ stateStoreCkptId: Option[String] = None,
+ readOnly: Boolean = false): RocksDB = {
+ assert(version >= 0)
+ acquire(LoadStore)
+ recordedMetrics = None
+ logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with stateStoreCkptId: ${
+ MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
+ if (enableStateStoreCheckpointIds) {
Review Comment:
Should be when `stateStoreCkptId` is non-empty, rather than `enableStateStoreCheckpointIds`. No matter whether we support V1->V2 compatibility or not, the source of truth for what we can load is the `stateStoreCkptId` loaded from the commit log, not the configuration, which is more about what version to write, not what to read. Even if we don't support V1->V2, we should still use `stateStoreCkptId`, but add a check and explicitly fail the query, saying it is not supported. This is perhaps already done in the driver?
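A sketch of the dispatch rule this comment proposes, with hypothetical types (not Spark's): the branch is driven by whether the commit log supplied a checkpoint id, while the conf only decides the format of what gets written next; the explicit failure is likewise illustrative.

```scala
sealed trait LoadPath
case object LoadV1 extends LoadPath
case object LoadV2 extends LoadPath

def chooseLoadPath(
    stateStoreCkptId: Option[String],
    checkpointV2Enabled: Boolean): LoadPath =
  stateStoreCkptId match {
    case Some(_) if !checkpointV2Enabled =>
      // Illustrative guard: fail explicitly instead of misreading the data.
      throw new IllegalStateException(
        "Commit log has a checkpoint id but checkpoint v2 is disabled")
    case Some(_) => LoadV2
    case None => LoadV1  // commit log, not conf, is the source of truth
  }
```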
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -359,6 +471,50 @@ class RocksDB(
this
}
+ /**
+ * Initialize in memory values based on the metadata loaded from DFS.
+ * @param metadata: metadata loaded from DFS
Review Comment:
Personally, I don't like comments that provide little information. Either make it more useful or delete it.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,27 +280,159 @@ class RocksDB(
// We send snapshots that needs to be uploaded by the maintenance thread to this queue
private val snapshotsToUploadQueue = new ConcurrentLinkedQueue[RocksDBSnapshot]()
+ /**
+ * Read the lineage from the changelog files. It first get the changelog reader
+ * of the correct changelog version and then read the lineage information from the file.
+ * The changelog file is named as version_stateStoreCkptId.changelog
+ * @param version version of the changelog file, used to load changelog file.
+ * @param stateStoreCkptId uniqueId of the changelog file, used to load changelog file.
+ * @return
+ */
+ private def getLineageFromChangelogFile(
+ version: Long,
+ stateStoreCkptId: Option[String]): Array[LineageItem] = {
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Array[LineageItem] = Array.empty
+ try {
+ changelogReader = fileManager.getChangelogReader(version, stateStoreCkptId)
+ currLineage = changelogReader.lineage
+ logInfo(log"Loading lineage: " +
+ log"${MDC(LogKeys.LINEAGE, lineageManager)} from " +
+ log"changelog version: ${MDC(LogKeys.VERSION_NUM, version)} " +
+ log"uniqueId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}.")
+ } finally {
+ if (changelogReader != null) {
+ changelogReader.closeIfNeeded()
+ }
+ }
+ currLineage
+ }
+
+
/**
* Load the given version of data in a native RocksDB instance.
* Note that this will copy all the necessary file from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*/
- def load(
+ private def loadV2(
+ version: Long,
+ stateStoreCkptId: Option[String],
+ readOnly: Boolean = false): RocksDB = {
+ // An array contains lineage information from [snapShotVersion, version] (inclusive both end)
+ var currVersionLineage: Array[LineageItem] = lineageManager.getLineage()
+ try {
+ if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
+ stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+ closeDB(ignoreException = false)
+
+ val (latestSnapshotVersion, latestSnapshotUniqueId) = {
+ // Special handling when version is 0.
+ // When loading the very first version (0), stateStoreCkptId does not need to be defined
+ // because there won't be 0.changelog / 0.zip file created in RocksDB under v2.
+ if (version == 0) {
+ assert(stateStoreCkptId.isEmpty, "stateStoreCkptId should be empty when version is zero")
+ (0L, None)
+ // When there is a snapshot file, it is the ground truth, we can skip
+ // reconstructing the lineage from changelog file.
+ } else if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
+ currVersionLineage = Array(LineageItem(version, stateStoreCkptId.get))
+ lineageManager.resetLineage(currVersionLineage)
+ (version, stateStoreCkptId)
+ } else {
+ currVersionLineage = getLineageFromChangelogFile(version, stateStoreCkptId) :+
+ LineageItem(version, stateStoreCkptId.get)
+ currVersionLineage = currVersionLineage.sortBy(_.version)
+ lineageManager.resetLineage(currVersionLineage)
+
+ val latestSnapshotVersionsAndUniqueId =
+ fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
+ latestSnapshotVersionsAndUniqueId match {
+ case Some(pair) => pair
+ case None =>
+ logWarning("Cannot find latest snapshot based on lineage: "
+ + printLineageItems(currVersionLineage))
+ (0L, None)
Review Comment:
version 0 should only be used if changelog lineage covers version 1. If it
doesn't, we should fail the load.
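A sketch of the validation this comment asks for (names are illustrative): falling back to snapshot version 0 is only sound when the changelog lineage reaches back to version 1; any gap means replay cannot rebuild the state, so the load should fail fast.

```scala
case class LineageItem(version: Long, checkpointUniqueId: String)

def baseSnapshotForReplay(lineage: Seq[LineageItem]): Long =
  if (lineage.nonEmpty && lineage.map(_.version).min == 1L) {
    0L  // no base snapshot needed; changelogs 1..N rebuild the state
  } else {
    // Illustrative failure; the PR's error class would be used here.
    throw new IllegalStateException(
      s"No base snapshot and lineage does not cover version 1: ${lineage.mkString(",")}")
  }
```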
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1036,6 +1184,39 @@ class RocksDB(
Option(acquiredThreadInfo).map(_.copy())
}
+ /** Upload the snapshot to DFS and remove it from snapshots pending */
+ private def uploadSnapshot(
+ snapshot: RocksDB#RocksDBSnapshot,
+ fileManager: RocksDBFileManager,
+ snapshotsPendingUpload: Set[RocksDBVersionSnapshotInfo],
+ loggingId: String): RocksDBFileManagerMetrics = {
Review Comment:
@WweiL Oh I got it. It now needs to access `lineageManager`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]