zifeif2 commented on code in PR #53720:
URL: https://github.com/apache/spark/pull/53720#discussion_r2691650963
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateRewriter.scala:
##########
@@ -342,6 +381,25 @@ class StateRewriter(
None
}
}
+
+  private def setCorrectCheckpointVersion(): Unit = {
+    // Setting checkpoint version in sqlConf based on previous commitLog in case user forgot to
+    // set STATE_STORE_CHECKPOINT_FORMAT_VERSION and crash rewriter.
+    // Using read batch commit since the latest commit could be a skipped batch
+    // If SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION is wrong,
+    // readCheckpoint.commitLog will throw an exception.
+    // This prevents the StateRewriter from failing to write the correct state files
+    try {
+      readCheckpoint.commitLog.get(readBatchId)
+    } catch {
+      case e: IllegalStateException =>
+        val sparkThrowable = e.getCause.asInstanceOf[SparkThrowable]
+        if (sparkThrowable.getCondition == "INVALID_LOG_VERSION.EXACT_MATCH_VERSION") {
+          sparkSession.conf.set(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key,
+            sparkThrowable.getMessageParameters.get("version"))
Review Comment:
I don't really like how this is implemented, but it is the only way I can
think of. `readCheckpoint` is created from the sparkSession that has the wrong
`STATE_STORE_CHECKPOINT_FORMAT_VERSION` set (see line 85), so accessing
commitLog throws an `INVALID_LOG_VERSION.EXACT_MATCH_VERSION` error. That means
we can't check whether the commitLog contains a checkpointId; instead, we rely
on the version reported in the error to set the correct checkpoint version.
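
To make the fallback easier to discuss, here is a minimal, Spark-free sketch of the same idea. `VersionMismatch`, `CheckpointVersionFallback`, and `detectVersion` are hypothetical stand-ins for illustration only (they are not Spark classes), and the sketch assumes the failure surfaces as an `IllegalStateException` whose cause carries the condition name and a `version` parameter, as in the hunk above.

```scala
// Hypothetical stand-in for a SparkThrowable-like error: it carries a condition
// name and message parameters. This is NOT a Spark class.
final case class VersionMismatch(condition: String, params: Map[String, String])
  extends RuntimeException(condition)

object CheckpointVersionFallback {
  val ExactMatch = "INVALID_LOG_VERSION.EXACT_MATCH_VERSION"

  // Runs `readCommit`. If it fails with the version-mismatch condition, returns
  // the version reported by the error so the caller can correct its config.
  // Any other failure is rethrown unchanged.
  def detectVersion(readCommit: () => Unit): Option[String] =
    try {
      readCommit()
      None // the read succeeded, so the configured version was already right
    } catch {
      case e: IllegalStateException =>
        e.getCause match {
          case VersionMismatch(ExactMatch, params) => params.get("version")
          case _ => throw e
        }
    }
}
```

Matching on the cause, rather than casting it, means an `IllegalStateException` with a missing or unrelated cause is rethrown as-is instead of turning into a `ClassCastException` or `NullPointerException`.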
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]