siying commented on code in PR #48355:
URL: https://github.com/apache/spark/pull/48355#discussion_r1870072044


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -275,7 +280,7 @@
       },
       "INVALID_CHANGE_LOG_READER_VERSION" : {
         "message" : [
-          "The change log reader version cannot be <version>."
+          "The change log reader version cannot be <version>. The checkpoint 
probably is from a future Spark version, please upgrade your Spark."

Review Comment:
   OK leave it then:p



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -965,7 +966,8 @@ class MicroBatchExecution(
       updateStateStoreCkptId(execCtx, latestExecPlan)
     }
     execCtx.reportTimeTaken("commitOffsets") {
-      if (!commitLog.add(execCtx.batchId, 
CommitMetadata(watermarkTracker.currentWatermark))) {
+      if (!commitLog.add(execCtx.batchId,
+        CommitMetadata(watermarkTracker.currentWatermark, 
currentStateStoreCkptId.toMap))) {

Review Comment:
   I don't get your comment. When `customMetrics` is empty, it means we have no 
metric reported. But here in the commit log, it will be better if we can 
distinguish there is no shuffle partition and there are shuffle partitions but 
we use V1 so there is no checkpointID. I provides another information to sanity 
check whether something goes wrong or not.
   Commit log is an on disk format, so it is hard to change. We should hold a 
higher standard for that, because we cannot easily refactored. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -278,77 +280,186 @@ class RocksDB(
   // We send snapshots that needs to be uploaded by the maintenance thread to 
this queue
   private val snapshotsToUploadQueue = new 
ConcurrentLinkedQueue[RocksDBSnapshot]()
 
+  /**
+   * Based on the ground truth lineage loaded from changelog file (lineage) and
+   * the latest snapshot (version, uniqueId) pair from file listing, this 
function finds
+   * the ground truth latest snapshot (version, uniqueId) the db instance 
needs to load.
+   *
+   * @param lineage the ground truth lineage loaded from changelog file

Review Comment:
   Is `lineage` required to be sorted by descending order of version? If it is, 
please add the comment for a requirement. I actually think may be it's a good 
idea not to make the assumption and just sort it here to guarantee we get the 
match of the largest version.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -965,7 +966,8 @@ class MicroBatchExecution(
       updateStateStoreCkptId(execCtx, latestExecPlan)
     }
     execCtx.reportTimeTaken("commitOffsets") {
-      if (!commitLog.add(execCtx.batchId, 
CommitMetadata(watermarkTracker.currentWatermark))) {
+      if (!commitLog.add(execCtx.batchId,
+        CommitMetadata(watermarkTracker.currentWatermark, 
currentStateStoreCkptId.toMap))) {

Review Comment:
   @WweiL the added the test is great. This is not what I meant in the comment 
though. I didn't mean downgrading to V1 in the same release, but downgrade to 
an older release where the code of `CommitMetadata` doesn't have the extra 
column added here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to