zifeif2 commented on code in PR #53720:
URL: https://github.com/apache/spark/pull/53720#discussion_r2695731278


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateRewriter.scala:
##########
@@ -342,6 +378,37 @@ class StateRewriter(
       None
     }
   }
+
+  private def verifyCheckpointVersion(): Unit = {
+    // Verify checkpoint version in sqlConf based on commitLog for readCheckpoint
+    // in case user forgot to set STATE_STORE_CHECKPOINT_FORMAT_VERSION.
+    // Using read batch commit since the latest commit could be a skipped batch.
+    // If SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION is wrong, readCheckpoint.commitLog
+    // will throw an exception, and we will propagate this exception upstream.
+    // This prevents the StateRewriter from failing to write the correct state files
+    try {
+      val writeCheckpoint =

Review Comment:
   Chatted with Micheal offline; we are okay with using a lazy `readCheckpoint` for now. The goal for now is to verify that `CHECKPOINT_FORMAT_VERSION` is correct before writing any state.
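A minimal, self-contained Scala sketch of the approach agreed on in this thread. It assumes hypothetical stand-ins (`CommitMetadata`, `CheckpointVersionMismatch`, `ReadCheckpoint.commitFor`, `StateRewriterSketch`) rather than Spark's actual `StateRewriter` or commit-log classes. The read checkpoint is held in a `lazy val`, so it is only opened when `verifyCheckpointVersion()` first needs it. The check uses the commit of the batch being read, and a mismatched format version throws before any state is written.

```scala
// All names below are hypothetical stand-ins, not Spark's real API.
final case class CommitMetadata(batchId: Long, formatVersion: Int)

final class CheckpointVersionMismatch(msg: String) extends IllegalStateException(msg)

/** Read-side checkpoint that interprets commits using the configured format version. */
final class ReadCheckpoint(configuredVersion: Int, commits: Map[Long, CommitMetadata]) {
  def commitFor(batchId: Long): CommitMetadata = {
    val commit = commits.getOrElse(
      batchId, throw new NoSuchElementException(s"no commit for batch $batchId"))
    // A wrong configured version surfaces here as an exception.
    if (commit.formatVersion != configuredVersion) {
      throw new CheckpointVersionMismatch(
        s"batch $batchId uses format v${commit.formatVersion}, conf says v$configuredVersion")
    }
    commit
  }
}

final class StateRewriterSketch(readBatchId: Long, openReadCheckpoint: () => ReadCheckpoint) {
  private var opened = false
  private var written = false

  // Lazy: the read checkpoint is only opened when verification first needs it.
  private lazy val readCheckpoint: ReadCheckpoint = {
    opened = true
    openReadCheckpoint()
  }

  def readCheckpointOpened: Boolean = opened
  def stateWritten: Boolean = written

  private def verifyCheckpointVersion(): Unit = {
    // Check the commit of the batch being read, not the latest commit,
    // because the latest commit could belong to a skipped batch.
    readCheckpoint.commitFor(readBatchId)
  }

  def run(): Unit = {
    verifyCheckpointVersion() // any exception propagates upstream; nothing is written
    written = true            // stand-in for writing the new state files
  }
}
```

Keeping the verification as the first step of `run()` means a misconfigured version fails fast, instead of producing state files in the wrong format.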



-- 
This is an automated message from the Apache Git Service.