zifeif2 commented on code in PR #53720:
URL: https://github.com/apache/spark/pull/53720#discussion_r2695507000
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateRewriter.scala:
##########
@@ -342,6 +378,37 @@ class StateRewriter(
None
}
}
+
+ private def verifyCheckpointVersion(): Unit = {
+ // Verify checkpoint version in sqlConf based on commitLog for readCheckpoint
+ // in case user forgot to set STATE_STORE_CHECKPOINT_FORMAT_VERSION.
+ // Using read batch commit since the latest commit could be a skipped batch.
+ // If SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION is wrong, readCheckpoint.commitLog
+ // will throw an exception, and we will propagate this exception upstream.
+ // This prevents the StateRewriter from failing to write the correct state files
+ try {
+ val writeCheckpoint =
Review Comment:
Sorry, I missed the first point. I will address it in the next version.
For the second point, I am wondering whether we should verify `CHECKPOINT_FORMAT_VERSION` as early as possible, in case future engineers use a lazily initialized `writeCheckpoint` or `readCheckpoint` in other functions without calling `verifyCheckpointFormatVersion` first. In other words, we would initialize `readCheckpoint` when constructing `StateRewriter`, call `verifyCheckpointFormatVersion` right after `readCheckpoint` is initialized, and drop the `lazy` modifier from `writeCheckpoint` and `readCheckpoint`. A rough sketch is below. What do you think?
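A minimal sketch of the eager-initialization shape I have in mind (the `CheckpointHandle` type, its `readLatestCommittedBatch` method, and the constructor parameters are placeholders for illustration, not the real `StateRewriter` internals):

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for whatever handle exposes the commit log;
// assumed to throw if the on-disk checkpoint format does not match
// the configured SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.
class CheckpointHandle(val path: String) {
  def readLatestCommittedBatch(): Long = 0L
}

class StateRewriterSketch(
    readCheckpointPath: String,
    writeCheckpointPath: String) {

  // Plain vals, not lazy: both handles are created during construction,
  // so the format-version check below runs before any other method can
  // touch them.
  private val readCheckpoint = new CheckpointHandle(readCheckpointPath)
  private val writeCheckpoint = new CheckpointHandle(writeCheckpointPath)

  // Fail fast at construction time if the checkpoint format version is wrong.
  verifyCheckpointFormatVersion()

  private def verifyCheckpointFormatVersion(): Unit = {
    try {
      // Reading the latest committed batch is where a version mismatch
      // would surface; the exception is wrapped and propagated upstream.
      readCheckpoint.readLatestCommittedBatch()
    } catch {
      case NonFatal(e) =>
        throw new IllegalStateException(
          "Checkpoint format version mismatch; check " +
            "SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION", e)
    }
  }
}
```

This way any future method that touches `readCheckpoint` or `writeCheckpoint` is guaranteed to run after the version check, because both fields are initialized as part of the constructor rather than on first access.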
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]