siying commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1837418169
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,6 +285,82 @@ class RocksDB(
// We send snapshots that needs to be uploaded by the maintenance thread to this queue
private val snapshotsToUploadQueue = new ConcurrentLinkedQueue[RocksDBSnapshot]()
+
+ /**
+ * In the case of checkpointFormatVersion 2, if we find multiple latest snapshot files of
+ * the same version but they have different uniqueIds, we need to find the correct one based on
+ * the lineage we have.
+ *
+ * When this happens, the stored file on the DFS must be a changelog file, because if not, the
+ * latest snapshot is uniquely identified as the (version, stateStoreCkptId) pair.
+ *
+ * This method achieves traversing back the lineage and find the correct latest snapshot file
+ * by creating a changelog reader and compare with the lineages stored there.
+ */
+ private def getLatestSnapshotVersionAndUniqueIdFromLineage(
+ currLineage: Array[(Long, Option[String])],
+ latestSnapshotVersionsAndUniqueIds: Array[(Long, Option[String])]):
+ (Long, Option[String]) = {
+ currLineage.foreach {
+ case (version, uniqueId) =>
+ if (latestSnapshotVersionsAndUniqueIds.contains((version, uniqueId))) {
+ return (version, uniqueId)
+ }
+ }
+ throw QueryExecutionErrors.cannotGetLatestSnapshotVersionAndUniqueIdFromLineage(
+ printLineage(currLineage), printLineage(latestSnapshotVersionsAndUniqueIds)
+ )
+ }
+
+ private def getLineageFromChangelogFile(
+ version: Long,
+ useColumnFamilies: Boolean,
+ stateStoreCkptId: Option[String]): Option[Array[(Long, Option[String])]] = {
+
+ // It is possible that change log checkpointing is first enabled and then disabled.
+ // In this case, loading changelog reader will fail because there are only zip files.
+ // It is also possible that state store was previously ran under format version 1
+ // In that case, loading changelog reader with file format version_uniqueId.changelog
+ // will also fail.
+ // But either way, there is no lineage in either case so we can swallow the failure
+ // CANNOT_READ_STREAMING_STATE_FILE.
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Option[Array[(Long, Option[String])]] = None
+ try {
+ changelogReader = fileManager.getChangelogReader(
+ version, useColumnFamilies, stateStoreCkptId)
+ // currLineage contains the version -> uniqueId mapping from the previous snapshot file
+ // to current version's changelog file
+ versionToUniqueIdLineage = changelogReader.lineage.map {
+ case (version, uniqueId) => (version, Option(uniqueId))
+ }
+ logInfo(log"Loading versionToUniqueIdLineage: ${MDC(LogKeys.LINEAGE,
+ printLineage(versionToUniqueIdLineage))} from changelog version: ${MDC(
+ LogKeys.VERSION_NUM, version)} uniqueId: ${MDC(LogKeys.UUID,
+ stateStoreCkptId.getOrElse(""))}. " +
+ log"This would be an noop if changelog is not enabled, or the query was previously" +
+ log"ran under checkpoint format v1")
Review Comment:
I think the indentation rule is to start the arguments on the next line if they cannot fit on two lines?
Also, splitting ${MDC(...)} across two lines feels hard to read. Can you try to keep each one on a single line? If you can't or don't want to, it should follow the indentation convention and have every parameter occupy its own line.
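Something like this is what I had in mind (just an untested sketch, reusing the same log keys): each ${MDC(...)} stays on one line, and the message is split at string boundaries instead of inside an MDC call:

    logInfo(
      log"Loading versionToUniqueIdLineage: " +
        log"${MDC(LogKeys.LINEAGE, printLineage(versionToUniqueIdLineage))} " +
        log"from changelog version: ${MDC(LogKeys.VERSION_NUM, version)} " +
        log"uniqueId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}. " +
        log"This would be a noop if changelog is not enabled, or the query was " +
        log"previously run under checkpoint format v1")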
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,6 +285,82 @@ class RocksDB(
// We send snapshots that needs to be uploaded by the maintenance thread to this queue
private val snapshotsToUploadQueue = new ConcurrentLinkedQueue[RocksDBSnapshot]()
+
+ /**
+ * In the case of checkpointFormatVersion 2, if we find multiple latest snapshot files of
+ * the same version but they have different uniqueIds, we need to find the correct one based on
+ * the lineage we have.
+ *
+ * When this happens, the stored file on the DFS must be a changelog file, because if not, the
+ * latest snapshot is uniquely identified as the (version, stateStoreCkptId) pair.
+ *
+ * This method achieves traversing back the lineage and find the correct latest snapshot file
+ * by creating a changelog reader and compare with the lineages stored there.
+ */
+ private def getLatestSnapshotVersionAndUniqueIdFromLineage(
+ currLineage: Array[(Long, Option[String])],
+ latestSnapshotVersionsAndUniqueIds: Array[(Long, Option[String])]):
+ (Long, Option[String]) = {
+ currLineage.foreach {
+ case (version, uniqueId) =>
+ if (latestSnapshotVersionsAndUniqueIds.contains((version, uniqueId))) {
+ return (version, uniqueId)
+ }
+ }
+ throw QueryExecutionErrors.cannotGetLatestSnapshotVersionAndUniqueIdFromLineage(
+ printLineage(currLineage), printLineage(latestSnapshotVersionsAndUniqueIds)
+ )
+ }
+
+ private def getLineageFromChangelogFile(
+ version: Long,
+ useColumnFamilies: Boolean,
+ stateStoreCkptId: Option[String]): Option[Array[(Long, Option[String])]] = {
+
+ // It is possible that change log checkpointing is first enabled and then disabled.
+ // In this case, loading changelog reader will fail because there are only zip files.
+ // It is also possible that state store was previously ran under format version 1
+ // In that case, loading changelog reader with file format version_uniqueId.changelog
+ // will also fail.
+ // But either way, there is no lineage in either case so we can swallow the failure
+ // CANNOT_READ_STREAMING_STATE_FILE.
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Option[Array[(Long, Option[String])]] = None
+ try {
+ changelogReader = fileManager.getChangelogReader(
+ version, useColumnFamilies, stateStoreCkptId)
+ // currLineage contains the version -> uniqueId mapping from the previous snapshot file
+ // to current version's changelog file
+ versionToUniqueIdLineage = changelogReader.lineage.map {
+ case (version, uniqueId) => (version, Option(uniqueId))
+ }
+ logInfo(log"Loading versionToUniqueIdLineage: ${MDC(LogKeys.LINEAGE,
+ printLineage(versionToUniqueIdLineage))} from changelog version: ${MDC(
+ LogKeys.VERSION_NUM, version)} uniqueId: ${MDC(LogKeys.UUID,
+ stateStoreCkptId.getOrElse(""))}. " +
+ log"This would be an noop if changelog is not enabled, or the query was previously" +
+ log"ran under checkpoint format v1")
+ currLineage = Some(versionToUniqueIdLineage)
+ } catch {
+ // This can happen when you first load with changelog enabled and then disable it,
+ // or the state store was previously ran under format version 1.
+ case e: SparkException
Review Comment:
This exception is too broad to indicate that there is no such entry. Can you make changelogReader.lineage return None if there is no lineage record?
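For example (illustrative only; assumes lineage were changed to return an Option instead of the reader throwing when the record is missing):

    changelogReader.lineage match {
      case Some(lineage) =>
        // hypothetical Option-returning lineage: map uniqueId into Option here
        currLineage = Some(lineage.map { case (v, id) => (v, Option(id)) })
      case None =>
        // no lineage record in this changelog file; keep currLineage as None
    }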
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -290,15 +373,43 @@ class RocksDB(
assert(version >= 0)
acquire(LoadStore)
recordedMetrics = None
- logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}")
+ logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with
stateStoreCkptId: ${
+ MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
Review Comment:
I think it is confusing to put "${" on a separate line.
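E.g. (sketch only):

    logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with " +
      log"stateStoreCkptId: ${MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")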
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -249,24 +273,130 @@ class RocksDB(
}
}
+/**
+ * In the case of checkpointFormatVersion 2, if we find multiple latest snapshot files of
+ * the same version but they have different uniqueIds, we need to find the correct one based on
+ * the lineage we have.
+ *
+ * When this happens, the stored file on the DFS must be a changelog file, because if not, the
+ * latest snapshot is uniquely identified as the (version, stateStoreCkptId) pair.
+ *
+ * This method achieves traversing back the lineage and find the correct latest snapshot file
+ * by creating a changelog reader and compare with the lineages stored there.
+ */
+ private def getLatestSnapshotVersionAndUniqueIdFromLineage(
+ currLineage: Array[(Long, Option[String])],
+ latestSnapshotVersionsAndUniqueIds: Array[(Long, Option[String])]):
+ (Long, Option[String]) = {
+ currLineage.foreach {
+ case (version, uniqueId) =>
+ if (latestSnapshotVersionsAndUniqueIds.contains((version, uniqueId))) {
+ return (version, uniqueId)
+ }
+ }
+ throw QueryExecutionErrors.cannotGetLatestSnapshotVersionAndUniqueIdFromLineage(
+ printLineage(currLineage), printLineage(latestSnapshotVersionsAndUniqueIds)
+ )
+ }
+
+ private def getLineageFromChangelogFile(
+ version: Long,
+ useColumnFamilies: Boolean,
+ stateStoreCkptId: Option[String]): Option[Array[(Long, Option[String])]] = {
+
+ // It is possible that change log checkpointing is first enabled and then disabled.
+ // In this case, loading changelog reader will fail because there are only zip files.
+ // It is also possible that state store was previously ran under format version 1
+ // In that case, loading changelog reader with file format version_uniqueId.changelog
+ // will also fail.
+ // But either way, there is no lineage in either case so we can swallow the failure
+ // CANNOT_READ_STREAMING_STATE_FILE.
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Option[Array[(Long, Option[String])]] = None
+ try {
+ changelogReader = fileManager.getChangelogReader(
+ version, useColumnFamilies, stateStoreCkptId)
+ // currLineage contains the version -> uniqueId mapping from the previous snapshot file
+ // to current version's changelog file
+ versionToUniqueIdLineage = changelogReader.lineage.map {
+ case (version, uniqueId) => (version, Option(uniqueId))
+ }
+ logInfo(log"Loading versionToUniqueIdLineage: ${MDC(LogKeys.LINEAGE,
+ printLineage(versionToUniqueIdLineage))} from changelog version: ${MDC(
+ LogKeys.VERSION_NUM, version)} uniqueId: ${MDC(LogKeys.UUID,
+ stateStoreCkptId.getOrElse(""))}. " +
+ log"This would be an noop if changelog is not enabled, or the query was previously" +
+ log"ran under checkpoint format v1")
+ currLineage = Some(versionToUniqueIdLineage)
+ } catch {
+ // This can happen when you first load with changelog enabled and then disable it,
+ // or the state store was previously ran under format version 1.
+ case e: SparkException
+ if e.getErrorClass == "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE" =>
+ // do nothing
+ case e: Throwable =>
+ throw e
+ } finally {
+ if (changelogReader != null) changelogReader.closeIfNeeded()
+ if (currLineage.isEmpty) currLineage = Some(Array((version, stateStoreCkptId)))
+ }
+ currLineage
+ }
+
+
/**
* Load the given version of data in a native RocksDB instance.
* Note that this will copy all the necessary file from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*/
- def load(version: Long, readOnly: Boolean = false): RocksDB = {
+ def load(
+ version: Long,
+ stateStoreCkptId: Option[String] = None,
+ readOnly: Boolean = false): RocksDB = {
assert(version >= 0)
acquire(LoadStore)
recordedMetrics = None
- logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}")
+ logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with stateStoreCkptId: ${
+ MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
+ var currLineage: Option[Array[(Long, Option[String])]] = None
try {
- if (loadedVersion != version) {
+ if (loadedVersion != version ||
+ (enableStateStoreCheckpointIds && stateStoreCkptId.isDefined &&
+ (loadedStateStoreCkptId.isEmpty || stateStoreCkptId.get != loadedStateStoreCkptId.get))) {
closeDB(ignoreException = false)
// deep copy is needed to avoid race condition
// between maintenance and task threads
fileManager.copyFileMapping()
- val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
- val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
+
+ val latestSnapshotVersionsAndUniqueIds =
+ fileManager.getLatestSnapshotVersionAndUniqueId(version, stateStoreCkptId)
Review Comment:
Can't we first read the changelog for the lineage, and then do this? The back-and-forth is confusing.
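Roughly what I mean (a sketch only, ignoring the surrounding error handling):

    // Read the lineage from the changelog first, then list the snapshot candidates,
    // so the selection below can use the lineage directly.
    val lineage =
      if (enableStateStoreCheckpointIds && stateStoreCkptId.isDefined) {
        getLineageFromChangelogFile(version, useColumnFamilies, stateStoreCkptId)
      } else {
        None
      }
    val latestSnapshotVersionsAndUniqueIds =
      fileManager.getLatestSnapshotVersionAndUniqueId(version, stateStoreCkptId)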
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,6 +285,82 @@ class RocksDB(
// We send snapshots that needs to be uploaded by the maintenance thread to this queue
private val snapshotsToUploadQueue = new ConcurrentLinkedQueue[RocksDBSnapshot]()
+
+ /**
+ * In the case of checkpointFormatVersion 2, if we find multiple latest snapshot files of
+ * the same version but they have different uniqueIds, we need to find the correct one based on
+ * the lineage we have.
Review Comment:
Not sure whether the comment or the implementation is wrong, but no matter whether we find multiple latest snapshot files, we need to use the correct one. If it doesn't exist, we should not use it.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,6 +285,82 @@ class RocksDB(
// We send snapshots that needs to be uploaded by the maintenance thread to this queue
private val snapshotsToUploadQueue = new ConcurrentLinkedQueue[RocksDBSnapshot]()
+
+ /**
+ * In the case of checkpointFormatVersion 2, if we find multiple latest snapshot files of
+ * the same version but they have different uniqueIds, we need to find the correct one based on
+ * the lineage we have.
+ *
+ * When this happens, the stored file on the DFS must be a changelog file, because if not, the
+ * latest snapshot is uniquely identified as the (version, stateStoreCkptId) pair.
+ *
+ * This method achieves traversing back the lineage and find the correct latest snapshot file
+ * by creating a changelog reader and compare with the lineages stored there.
+ */
+ private def getLatestSnapshotVersionAndUniqueIdFromLineage(
+ currLineage: Array[(Long, Option[String])],
+ latestSnapshotVersionsAndUniqueIds: Array[(Long, Option[String])]):
+ (Long, Option[String]) = {
+ currLineage.foreach {
+ case (version, uniqueId) =>
+ if (latestSnapshotVersionsAndUniqueIds.contains((version, uniqueId))) {
+ return (version, uniqueId)
+ }
+ }
+ throw QueryExecutionErrors.cannotGetLatestSnapshotVersionAndUniqueIdFromLineage(
+ printLineage(currLineage), printLineage(latestSnapshotVersionsAndUniqueIds)
+ )
+ }
+
+ private def getLineageFromChangelogFile(
+ version: Long,
+ useColumnFamilies: Boolean,
+ stateStoreCkptId: Option[String]): Option[Array[(Long, Option[String])]] = {
+
+ // It is possible that change log checkpointing is first enabled and then disabled.
+ // In this case, loading changelog reader will fail because there are only zip files.
+ // It is also possible that state store was previously ran under format version 1
+ // In that case, loading changelog reader with file format version_uniqueId.changelog
+ // will also fail.
+ // But either way, there is no lineage in either case so we can swallow the failure
+ // CANNOT_READ_STREAMING_STATE_FILE.
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Option[Array[(Long, Option[String])]] = None
+ try {
+ changelogReader = fileManager.getChangelogReader(
+ version, useColumnFamilies, stateStoreCkptId)
+ // currLineage contains the version -> uniqueId mapping from the previous snapshot file
+ // to current version's changelog file
+ versionToUniqueIdLineage = changelogReader.lineage.map {
+ case (version, uniqueId) => (version, Option(uniqueId))
+ }
+ logInfo(log"Loading versionToUniqueIdLineage: ${MDC(LogKeys.LINEAGE,
+ printLineage(versionToUniqueIdLineage))} from changelog version: ${MDC(
+ LogKeys.VERSION_NUM, version)} uniqueId: ${MDC(LogKeys.UUID,
+ stateStoreCkptId.getOrElse(""))}. " +
+ log"This would be an noop if changelog is not enabled, or the query was previously" +
+ log"ran under checkpoint format v1")
+ currLineage = Some(versionToUniqueIdLineage)
+ } catch {
+ // This can happen when you first load with changelog enabled and then disable it,
+ // or the state store was previously ran under format version 1.
+ case e: SparkException
+ if e.getCondition == "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE" =>
+ // do nothing
+ case e: Throwable =>
+ throw e
Review Comment:
If we delete lines 350-351, would it behave the same?
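In other words, just (this should behave the same, since any exception the single case does not match propagates out of the catch anyway):

    } catch {
      case e: SparkException
          if e.getCondition == "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE" =>
        // do nothing
    } finally {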
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -249,24 +273,130 @@ class RocksDB(
}
}
+/**
+ * In the case of checkpointFormatVersion 2, if we find multiple latest snapshot files of
+ * the same version but they have different uniqueIds, we need to find the correct one based on
+ * the lineage we have.
+ *
+ * When this happens, the stored file on the DFS must be a changelog file, because if not, the
+ * latest snapshot is uniquely identified as the (version, stateStoreCkptId) pair.
+ *
+ * This method achieves traversing back the lineage and find the correct latest snapshot file
+ * by creating a changelog reader and compare with the lineages stored there.
+ */
+ private def getLatestSnapshotVersionAndUniqueIdFromLineage(
+ currLineage: Array[(Long, Option[String])],
+ latestSnapshotVersionsAndUniqueIds: Array[(Long, Option[String])]):
+ (Long, Option[String]) = {
+ currLineage.foreach {
+ case (version, uniqueId) =>
+ if (latestSnapshotVersionsAndUniqueIds.contains((version, uniqueId))) {
+ return (version, uniqueId)
+ }
+ }
+ throw QueryExecutionErrors.cannotGetLatestSnapshotVersionAndUniqueIdFromLineage(
+ printLineage(currLineage), printLineage(latestSnapshotVersionsAndUniqueIds)
+ )
+ }
+
+ private def getLineageFromChangelogFile(
+ version: Long,
+ useColumnFamilies: Boolean,
+ stateStoreCkptId: Option[String]): Option[Array[(Long, Option[String])]] = {
+
+ // It is possible that change log checkpointing is first enabled and then disabled.
+ // In this case, loading changelog reader will fail because there are only zip files.
+ // It is also possible that state store was previously ran under format version 1
+ // In that case, loading changelog reader with file format version_uniqueId.changelog
+ // will also fail.
+ // But either way, there is no lineage in either case so we can swallow the failure
+ // CANNOT_READ_STREAMING_STATE_FILE.
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Option[Array[(Long, Option[String])]] = None
+ try {
+ changelogReader = fileManager.getChangelogReader(
+ version, useColumnFamilies, stateStoreCkptId)
+ // currLineage contains the version -> uniqueId mapping from the previous snapshot file
+ // to current version's changelog file
+ versionToUniqueIdLineage = changelogReader.lineage.map {
+ case (version, uniqueId) => (version, Option(uniqueId))
+ }
+ logInfo(log"Loading versionToUniqueIdLineage: ${MDC(LogKeys.LINEAGE,
+ printLineage(versionToUniqueIdLineage))} from changelog version: ${MDC(
+ LogKeys.VERSION_NUM, version)} uniqueId: ${MDC(LogKeys.UUID,
+ stateStoreCkptId.getOrElse(""))}. " +
+ log"This would be an noop if changelog is not enabled, or the query was previously" +
+ log"ran under checkpoint format v1")
+ currLineage = Some(versionToUniqueIdLineage)
+ } catch {
+ // This can happen when you first load with changelog enabled and then disable it,
+ // or the state store was previously ran under format version 1.
+ case e: SparkException
+ if e.getErrorClass == "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE" =>
+ // do nothing
+ case e: Throwable =>
+ throw e
+ } finally {
+ if (changelogReader != null) changelogReader.closeIfNeeded()
+ if (currLineage.isEmpty) currLineage = Some(Array((version, stateStoreCkptId)))
+ }
+ currLineage
+ }
+
+
/**
* Load the given version of data in a native RocksDB instance.
* Note that this will copy all the necessary file from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*/
- def load(version: Long, readOnly: Boolean = false): RocksDB = {
+ def load(
+ version: Long,
+ stateStoreCkptId: Option[String] = None,
+ readOnly: Boolean = false): RocksDB = {
assert(version >= 0)
acquire(LoadStore)
recordedMetrics = None
- logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}")
+ logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with stateStoreCkptId: ${
+ MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
+ var currLineage: Option[Array[(Long, Option[String])]] = None
try {
- if (loadedVersion != version) {
+ if (loadedVersion != version ||
+ (enableStateStoreCheckpointIds && stateStoreCkptId.isDefined &&
+ (loadedStateStoreCkptId.isEmpty || stateStoreCkptId.get != loadedStateStoreCkptId.get))) {
closeDB(ignoreException = false)
// deep copy is needed to avoid race condition
// between maintenance and task threads
fileManager.copyFileMapping()
- val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
- val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
+
+ val latestSnapshotVersionsAndUniqueIds =
+ fileManager.getLatestSnapshotVersionAndUniqueId(version, stateStoreCkptId)
+
+ // Update the lineage from changelog
+ // When loading from the first version (query restart, not necessarily version = 0,
+ // stateStoreCkptId is not defined even if enableStateStoreCheckpointIds is true
+ if (enableStateStoreCheckpointIds && stateStoreCkptId.isDefined) {
+ currLineage = getLineageFromChangelogFile(version, useColumnFamilies, stateStoreCkptId)
Review Comment:
We already have the existing lineage cached, right? Why not just reuse it? It's not a very complicated thing to do. Why wait and fix it later?
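Something like this, roughly (a sketch; assumes the in-memory versionToUniqueIdLineage from the previous commit is still valid for this store instance):

    // Reuse the cached lineage when it already covers this (version, uniqueId);
    // only fall back to reading the changelog file otherwise.
    currLineage =
      if (versionToUniqueIdLineage.contains((version, stateStoreCkptId))) {
        Some(versionToUniqueIdLineage)
      } else {
        getLineageFromChangelogFile(version, useColumnFamilies, stateStoreCkptId)
      }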
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -249,24 +273,130 @@ class RocksDB(
}
}
+/**
+ * In the case of checkpointFormatVersion 2, if we find multiple latest snapshot files of
+ * the same version but they have different uniqueIds, we need to find the correct one based on
+ * the lineage we have.
+ *
+ * When this happens, the stored file on the DFS must be a changelog file, because if not, the
+ * latest snapshot is uniquely identified as the (version, stateStoreCkptId) pair.
+ *
+ * This method achieves traversing back the lineage and find the correct latest snapshot file
+ * by creating a changelog reader and compare with the lineages stored there.
+ */
+ private def getLatestSnapshotVersionAndUniqueIdFromLineage(
+ currLineage: Array[(Long, Option[String])],
+ latestSnapshotVersionsAndUniqueIds: Array[(Long, Option[String])]):
+ (Long, Option[String]) = {
+ currLineage.foreach {
+ case (version, uniqueId) =>
+ if (latestSnapshotVersionsAndUniqueIds.contains((version, uniqueId))) {
+ return (version, uniqueId)
+ }
+ }
+ throw QueryExecutionErrors.cannotGetLatestSnapshotVersionAndUniqueIdFromLineage(
+ printLineage(currLineage), printLineage(latestSnapshotVersionsAndUniqueIds)
+ )
+ }
+
+ private def getLineageFromChangelogFile(
+ version: Long,
+ useColumnFamilies: Boolean,
+ stateStoreCkptId: Option[String]): Option[Array[(Long, Option[String])]] = {
+
+ // It is possible that change log checkpointing is first enabled and then disabled.
+ // In this case, loading changelog reader will fail because there are only zip files.
+ // It is also possible that state store was previously ran under format version 1
+ // In that case, loading changelog reader with file format version_uniqueId.changelog
+ // will also fail.
+ // But either way, there is no lineage in either case so we can swallow the failure
+ // CANNOT_READ_STREAMING_STATE_FILE.
+ var changelogReader: StateStoreChangelogReader = null
+ var currLineage: Option[Array[(Long, Option[String])]] = None
+ try {
+ changelogReader = fileManager.getChangelogReader(
+ version, useColumnFamilies, stateStoreCkptId)
+ // currLineage contains the version -> uniqueId mapping from the previous snapshot file
+ // to current version's changelog file
+ versionToUniqueIdLineage = changelogReader.lineage.map {
+ case (version, uniqueId) => (version, Option(uniqueId))
+ }
+ logInfo(log"Loading versionToUniqueIdLineage: ${MDC(LogKeys.LINEAGE,
+ printLineage(versionToUniqueIdLineage))} from changelog version: ${MDC(
+ LogKeys.VERSION_NUM, version)} uniqueId: ${MDC(LogKeys.UUID,
+ stateStoreCkptId.getOrElse(""))}. " +
+ log"This would be an noop if changelog is not enabled, or the query was previously" +
+ log"ran under checkpoint format v1")
+ currLineage = Some(versionToUniqueIdLineage)
+ } catch {
+ // This can happen when you first load with changelog enabled and then disable it,
+ // or the state store was previously ran under format version 1.
+ case e: SparkException
+ if e.getErrorClass == "CANNOT_LOAD_STATE_STORE.CANNOT_READ_STREAMING_STATE_FILE" =>
+ // do nothing
+ case e: Throwable =>
+ throw e
+ } finally {
+ if (changelogReader != null) changelogReader.closeIfNeeded()
+ if (currLineage.isEmpty) currLineage = Some(Array((version, stateStoreCkptId)))
+ }
+ currLineage
+ }
+
+
/**
* Load the given version of data in a native RocksDB instance.
* Note that this will copy all the necessary file from DFS to local disk as needed,
* and possibly restart the native RocksDB instance.
*/
- def load(version: Long, readOnly: Boolean = false): RocksDB = {
+ def load(
+ version: Long,
+ stateStoreCkptId: Option[String] = None,
+ readOnly: Boolean = false): RocksDB = {
assert(version >= 0)
acquire(LoadStore)
recordedMetrics = None
- logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)}")
+ logInfo(log"Loading ${MDC(LogKeys.VERSION_NUM, version)} with stateStoreCkptId: ${
+ MDC(LogKeys.UUID, stateStoreCkptId.getOrElse(""))}")
+ var currLineage: Option[Array[(Long, Option[String])]] = None
try {
- if (loadedVersion != version) {
+ if (loadedVersion != version ||
+ (enableStateStoreCheckpointIds && stateStoreCkptId.isDefined &&
+ (loadedStateStoreCkptId.isEmpty || stateStoreCkptId.get != loadedStateStoreCkptId.get))) {
closeDB(ignoreException = false)
// deep copy is needed to avoid race condition
// between maintenance and task threads
fileManager.copyFileMapping()
- val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
- val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
+
+ val latestSnapshotVersionsAndUniqueIds =
+ fileManager.getLatestSnapshotVersionAndUniqueId(version, stateStoreCkptId)
+
+ // Update the lineage from changelog
+ // When loading from the first version (query restart, not necessarily version = 0,
+ // stateStoreCkptId is not defined even if enableStateStoreCheckpointIds is true
+ if (enableStateStoreCheckpointIds && stateStoreCkptId.isDefined) {
+ currLineage = getLineageFromChangelogFile(version, useColumnFamilies, stateStoreCkptId)
+ }
+
+ val (latestSnapshotVersion, latestSnapshotUniqueId) = {
+ // When loading from version 0
+ if (latestSnapshotVersionsAndUniqueIds.length == 0) {
+ (0L, None)
+ } else if (latestSnapshotVersionsAndUniqueIds.length == 1) {
Review Comment:
The commit log should always be the source of truth. Even if there is only one candidate, it can be the wrong one. This is true for both the changelog and the snapshot.
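What I would expect is something along these lines (a sketch; only for the checkpoint v2 path where a lineage is available): validate the candidate against the lineage even when there is exactly one, instead of special-casing length == 1:

    val (latestSnapshotVersion, latestSnapshotUniqueId) =
      if (latestSnapshotVersionsAndUniqueIds.isEmpty) {
        // no snapshot yet, start from version 0
        (0L, None)
      } else {
        // even a single candidate must be confirmed by the lineage from the commit log
        getLatestSnapshotVersionAndUniqueIdFromLineage(
          currLineage.get, latestSnapshotVersionsAndUniqueIds)
      }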
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]