anishshri-db commented on code in PR #53720:
URL: https://github.com/apache/spark/pull/53720#discussion_r2696023332
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -443,77 +443,84 @@ class RocksDB(
private def loadWithCheckpointId(
version: Long,
stateStoreCkptId: Option[String],
- readOnly: Boolean = false): RocksDB = {
+ readOnly: Boolean = false,
+ loadEmpty: Boolean = false): RocksDB = {
// An array contains lineage information from [snapShotVersion, version]
// (inclusive in both ends)
var currVersionLineage: Array[LineageItem] =
lineageManager.getLineageForCurrVersion()
try {
- if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
- stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+ if (loadEmpty || loadedVersion != version ||
loadedStateStoreCkptId.isEmpty ||
+ stateStoreCkptId.get != loadedStateStoreCkptId.get) {
closeDB(ignoreException = false)
-
- val (latestSnapshotVersion, latestSnapshotUniqueId) = {
- // Special handling when version is 0.
- // When loading the very first version (0), stateStoreCkptId does
not need to be defined
- // because there won't be 0.changelog / 0.zip file created in
RocksDB under v2.
- if (version == 0) {
- assert(stateStoreCkptId.isEmpty,
- "stateStoreCkptId should be empty when version is zero")
- (0L, None)
- // When there is a snapshot file, it is the ground truth, we can skip
- // reconstructing the lineage from changelog file.
- } else if (fileManager.existsSnapshotFile(version,
stateStoreCkptId)) {
- currVersionLineage = Array(LineageItem(version,
stateStoreCkptId.get))
- (version, stateStoreCkptId)
- } else {
- currVersionLineage = getLineageFromChangelogFile(version,
stateStoreCkptId) :+
- LineageItem(version, stateStoreCkptId.get)
- currVersionLineage = currVersionLineage.sortBy(_.version)
-
- val latestSnapshotVersionsAndUniqueId =
-
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
- latestSnapshotVersionsAndUniqueId match {
- case Some(pair) => (pair._1, Option(pair._2))
- case None if currVersionLineage.head.version == 1L =>
- logDebug(log"Cannot find latest snapshot based on lineage but
first version " +
- log"is 1, use 0 as default. Lineage: ${MDC(LogKeys.LINEAGE,
lineageManager)}")
- (0L, None)
- case _ =>
- throw QueryExecutionErrors.cannotFindBaseSnapshotCheckpoint(
- printLineageItems(currVersionLineage))
+ if (loadEmpty) {
+ require(stateStoreCkptId.isEmpty,
+ "stateStoreCkptId should be empty when loadEmpty is true")
+ loadEmptyStore(version)
+ lineageManager.clear()
+ } else {
+ val (latestSnapshotVersion, latestSnapshotUniqueId) = {
+ // Special handling when version is 0.
+ // When loading the very first version (0), stateStoreCkptId does
not need to be defined
+ // because there won't be 0.changelog / 0.zip file created in
RocksDB under v2.
+ if (version == 0) {
+ assert(stateStoreCkptId.isEmpty,
+ "stateStoreCkptId should be empty when version is zero")
+ (0L, None)
+ // When there is a snapshot file, it is the ground truth, we can
skip
+ // reconstructing the lineage from changelog file.
+ } else if (fileManager.existsSnapshotFile(version,
stateStoreCkptId)) {
+ currVersionLineage = Array(LineageItem(version,
stateStoreCkptId.get))
+ (version, stateStoreCkptId)
+ } else {
+ currVersionLineage = getLineageFromChangelogFile(version,
stateStoreCkptId) :+
+ LineageItem(version, stateStoreCkptId.get)
+ currVersionLineage = currVersionLineage.sortBy(_.version)
+
+ val latestSnapshotVersionsAndUniqueId =
+
fileManager.getLatestSnapshotVersionAndUniqueIdFromLineage(currVersionLineage)
+ latestSnapshotVersionsAndUniqueId match {
+ case Some(pair) => (pair._1, Option(pair._2))
+ case None if currVersionLineage.head.version == 1L =>
+ logDebug(log"Cannot find latest snapshot based on lineage
but first version " +
+ log"is 1, use 0 as default. Lineage:
${MDC(LogKeys.LINEAGE, lineageManager)}")
+ (0L, None)
+ case _ =>
+ throw QueryExecutionErrors.cannotFindBaseSnapshotCheckpoint(
+ printLineageItems(currVersionLineage))
+ }
}
}
- }
- logInfo(log"Loaded latestSnapshotVersion: ${
- MDC(LogKeys.SNAPSHOT_VERSION, latestSnapshotVersion)},
latestSnapshotUniqueId: ${
- MDC(LogKeys.UUID, latestSnapshotUniqueId)}")
+ logInfo(log"Loaded latestSnapshotVersion: ${
Review Comment:
Is the indentation as expected here?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -530,9 +537,13 @@ class RocksDB(
if (conf.resetStatsOnLoad) {
nativeStats.reset
}
-
- logInfo(log"Loaded ${MDC(LogKeys.VERSION_NUM, version)} " +
- log"with uniqueId ${MDC(LogKeys.UUID, stateStoreCkptId)}")
+ if (loadEmpty) {
+ logInfo(log"Loaded empty store at version ${MDC(LogKeys.VERSION_NUM,
version)} " +
+ log"with uniqueId")
Review Comment:
Is the unique ID not available here?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -443,77 +443,84 @@ class RocksDB(
private def loadWithCheckpointId(
version: Long,
stateStoreCkptId: Option[String],
- readOnly: Boolean = false): RocksDB = {
+ readOnly: Boolean = false,
+ loadEmpty: Boolean = false): RocksDB = {
// An array contains lineage information from [snapShotVersion, version]
// (inclusive in both ends)
var currVersionLineage: Array[LineageItem] =
lineageManager.getLineageForCurrVersion()
try {
- if (loadedVersion != version || (loadedStateStoreCkptId.isEmpty ||
- stateStoreCkptId.get != loadedStateStoreCkptId.get)) {
+ if (loadEmpty || loadedVersion != version ||
loadedStateStoreCkptId.isEmpty ||
Review Comment:
Why do we need this?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateRewriter.scala:
##########
@@ -224,9 +267,8 @@ class StateRewriter(
schemaProvider,
executorSqlConf
)
-
- partitionWriter.write(partitionIter)
- }
+ Iterator(partitionWriter.write(partitionIter))
+ }.collect()
Review Comment:
Why are we calling `collect` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]