siying commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1876691995
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -431,16 +593,18 @@ class RocksDB(
/**
* Replay change log from the loaded version to the target version.
*/
- private def replayChangelog(endVersion: Long): Unit = {
+ private def replayChangelog(versionsAndUniqueIds: Array[(Long,
Option[String])]): Unit = {
logInfo(log"Replaying changelog from version " +
log"${MDC(LogKeys.LOADED_VERSION, loadedVersion)} -> " +
- log"${MDC(LogKeys.END_VERSION, endVersion)}")
- for (v <- loadedVersion + 1 to endVersion) {
- logInfo(log"Replaying changelog on version " +
- log"${MDC(LogKeys.VERSION_NUM, v)}")
+ log"${MDC(LogKeys.END_VERSION,
versionsAndUniqueIds.lastOption.map(_._1))}")
Review Comment:
Should assert here that the first item of the lineage has version
`loadedVersion`.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -359,6 +473,49 @@ class RocksDB(
this
}
+ /**
+ * Initialize key metrics based on the metadata loaded from DFS and open
local RocksDB.
+ */
+ private def init(metadata: RocksDBCheckpointMetadata): Unit = {
Review Comment:
It would probably be better to call it `openLocalRocksDB()` or something like that.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,27 +280,161 @@ class RocksDB(
// We send snapshots that needs to be uploaded by the maintenance thread to
this queue
private val snapshotsToUploadQueue = new
ConcurrentLinkedQueue[RocksDBSnapshot]()
+ /**
+ * Read the lineage from the changelog files. It first get the changelog
reader
+ * of the correct changelog version and then read the lineage information
from the file.
+ * The changelog file is named as version_stateStoreCkptId.changelog
+ * @param version version of the changelog file, used to load changelog file.
+ * @param stateStoreCkptId uniqueId of the changelog file, used to load
changelog file.
+ * @return
+ */
+ private def getLineageFromChangelogFile(
+ version: Long,
+ stateStoreCkptId: Option[String]): Array[LineageItem] = {
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Array[LineageItem] = Array.empty
+ try {
+ changelogReader = fileManager.getChangelogReader(version,
stateStoreCkptId)
+ currLineage = changelogReader.lineage
+ logInfo(log"Loading lineage: " +
+ log"${MDC(LogKeys.LINEAGE, lineageManager)} from " +
+ log"changelog version: ${MDC(LogKeys.VERSION_NUM, version)} " +
+ log"uniqueId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}.")
+ } finally {
+ if (changelogReader != null) {
+ changelogReader.closeIfNeeded()
+ }
+ }
+ currLineage
+ }
+
+
/**
* Load the given version of data in a native RocksDB instance.
* Note that this will copy all the necessary file from DFS to local disk as
needed,
* and possibly restart the native RocksDB instance.
*/
- def load(
+ private def loadV2(
+ version: Long,
+ stateStoreCkptId: Option[String],
+ readOnly: Boolean = false): RocksDB = {
+ // An array contains lineage information from [snapShotVersion, version]
(inclusive in both ends)
+ var currVersionLineage: Array[LineageItem] =
lineageManager.getLineageForCurrVersion()
+ try {
+ if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
+ stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+ closeDB(ignoreException = false)
+
+ val (latestSnapshotVersion, latestSnapshotUniqueId) = {
+ // Special handling when version is 0.
+ // When loading the very first version (0), stateStoreCkptId does not
need to be defined
+ // because there won't be 0.changelog / 0.zip file created in RocksDB
under v2.
+ if (version == 0) {
+ assert(stateStoreCkptId.isEmpty, "stateStoreCkptId should be empty
when version is zero")
+ (0L, None)
+ // When there is a snapshot file, it is the ground truth, we can skip
+ // reconstructing the lineage from changelog file.
+ } else if (fileManager.existsSnapshotFile(version, stateStoreCkptId)) {
+ currVersionLineage = Array(LineageItem(version,
stateStoreCkptId.get))
+ (version, stateStoreCkptId)
+ } else {
+ currVersionLineage = getLineageFromChangelogFile(version,
stateStoreCkptId) :+
+ LineageItem(version, stateStoreCkptId.get)
+ currVersionLineage = currVersionLineage.sortBy(_.version)
+
+ val latestSnapshotVersionsAndUniqueId =
+
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
+ latestSnapshotVersionsAndUniqueId match {
+ case Some(pair) => (pair._1, Option(pair._2))
+ case None if currVersionLineage.head.version == 1L =>
+ logWarning(log"Cannot find latest snapshot based on lineage but
first version " +
+ log"is 1, use 0 as default. Lineage ${MDC(LogKeys.LINEAGE,
lineageManager)}")
+ (0L, None)
+ case _ =>
+ throw QueryExecutionErrors.cannotFindBaseSnapshotCheckpoint(
+ printLineageItems(currVersionLineage))
+ }
+ }
+ }
+
+ logInfo(log"Loaded latestSnapshotVersion: ${
+ MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)},
latestSnapshotUniqueId: ${
+ MDC(LogKeys.UUID, latestSnapshotUniqueId)}")
+
+ val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion,
+ workingDir, rocksDBFileMapping, latestSnapshotUniqueId)
+
+ loadedVersion = latestSnapshotVersion
Review Comment:
I think we should also set the lineage here, perhaps with a single entry.
It needs to be set anyway in the else case below.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -359,6 +473,49 @@ class RocksDB(
this
}
+ /**
+ * Initialize key metrics based on the metadata loaded from DFS and open
local RocksDB.
+ */
+ private def init(metadata: RocksDBCheckpointMetadata): Unit = {
+
+ setInitialCFInfo()
+ metadata.columnFamilyMapping.foreach { mapping =>
+ colFamilyNameToIdMap.putAll(mapping.asJava)
+ }
+
+ metadata.maxColumnFamilyId.foreach { maxId =>
+ maxColumnFamilyId.set(maxId)
+ }
+ openDB()
+ numKeysOnWritingVersion = if (!conf.trackTotalNumberOfRows) {
+ // we don't track the total number of rows - discard the number being
track
+ -1L
+ } else if (metadata.numKeys < 0) {
+ // we track the total number of rows, but the snapshot doesn't have
tracking number
+ // need to count keys now
+ countKeys()
+ } else {
+ metadata.numKeys
+ }
+ }
+
+ def load(
+ version: Long,
+ stateStoreCkptId: Option[String] = None,
+ readOnly: Boolean = false): RocksDB = {
+ assert(version >= 0)
+ acquire(LoadStore)
+ recordedMetrics = None
+ logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with
stateStoreCkptId: ${
+ MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
+ if (stateStoreCkptId.isDefined || enableStateStoreCheckpointIds && version
== 0) {
+ loadV2(version, stateStoreCkptId, readOnly)
+ } else {
+ loadV1(version, readOnly)
Review Comment:
It would probably be better to use more descriptive names, such as
`loadWithCheckpointId()` and `loadWithoutCheckpointId()`.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala:
##########
@@ -313,6 +323,11 @@ class RocksDBFileManager(
metadata
}
+ def existsSnapshotFile(version: Long, checkpointUniqueId: Option[String] =
None): Boolean = {
+ val path = new Path(dfsRootDir)
+ fm.exists(path) && fm.exists(dfsBatchZipFile(version, checkpointUniqueId))
Review Comment:
We should check `rootDirChecked` before checking dfsRootDir existence again.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1533,3 +1717,40 @@ case class AcquiredThreadInfo(
}
}
+/**
+ * A helper class to manage the lineage information when checkpoint unique id
is enabled.
+ * "lineage" is an array of LineageItem (version, uniqueId) pair.
+ *
+ * The first item of "lineage" should normally be the version of a snapshot,
except
+ * for the first few versions. Because they are solely loaded from changelog
file.
+ * (i.e. with default minDeltasForSnapshot, there is only 1_uuid1.changelog,
no 1_uuid1.zip)
+ *
+ * The last item of "lineage" corresponds to one version before the
to-be-committed version.
Review Comment:
Why do we need to have a non-committed version in the lineage? Can you
remind me in which scenario we need to deal with an uncommitted lineage item?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]