micheal-o commented on code in PR #53313:
URL: https://github.com/apache/spark/pull/53313#discussion_r2601001678
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBCheckpointFailureInjectionSuite.scala:
##########
@@ -824,6 +828,73 @@ class RocksDBCheckpointFailureInjectionSuite extends
StreamTest
}
}
+ /**
+ * Test that verifies the fix when changelogWriter.abort() throws an
exception
+ * during rollback(), the changelogWriter is still set to None, allowing
subsequent put()
+ * calls to succeed.
+ *
+ * Before the fix, if abort() threw an exception, changelogWriter would
remain set to a
+ * writer with null streams, causing assertion failures on the next put().
Review Comment:
nit: `on the next put() during changelog replay`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala:
##########
@@ -1650,6 +1650,7 @@ class RocksDB(
* Drop uncommitted changes, and roll back to previous version.
*/
def rollback(): Unit = {
+ logInfo(log"Rolling back to ${MDC(LogKeys.VERSION_NUM, loadedVersion)}")
Review Comment:
nit: `"Rolling back uncommitted changes on version
${MDC(LogKeys.VERSION_NUM, loadedVersion)}"`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]