WweiL commented on code in PR #49275:
URL: https://github.com/apache/spark/pull/49275#discussion_r1915520560
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala:
##########
@@ -51,14 +51,19 @@ class CommitLog(sparkSession: SparkSession, path: String)
import CommitLog._
+ private val VERSION: Int = sparkSession.conf.get(
+ SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key).toInt
+
override protected[sql] def deserialize(in: InputStream): CommitMetadata = {
// called inside a try-finally where the underlying stream is closed in the caller
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file in the offset commit log")
}
// TODO [SPARK-49462] This validation should be relaxed for a stateless query.
- validateVersion(lines.next().trim, VERSION)
+ // TODO [SPARK-50653] This validation should be relaxed to support reading
+ // a V1 log file when VERSION is V2
+ validateVersionExactMatch(lines.next().trim, VERSION)
Review Comment:
Follow-up JIRA filed here: https://issues.apache.org/jira/browse/SPARK-50653
Originally I tried to make reading a V1 log under V2 just work, but after the requested refactor it no longer works.
Making it work would now require some logic changes in the RocksDB file; @siying mentioned that there should be a one-time snapshot for the v1 -> v2 case. So I filed a ticket and marked this as a TODO for now.
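To make the behavioral difference concrete, here is a minimal Scala sketch contrasting three checks on a commit-log header such as `v1` or `v2`: a lenient "accept anything up to a maximum" check (roughly the shape of the earlier `validateVersion` call), the exact-match check the diff switches to, and the v1-under-v2 relaxation that SPARK-50653 tracks. The object `VersionCheckSketch` and all of its methods are hypothetical illustrations, not Spark's `validateVersion` / `validateVersionExactMatch`, and the relaxed variant only models the version comparison; it does not model the one-time RocksDB snapshot mentioned above.

```scala
// Hypothetical sketch: NOT Spark's MetadataVersionUtil. It only illustrates
// the three version checks discussed in this review thread.
object VersionCheckSketch {

  /** Parses a header such as "v2" into 2; rejects anything malformed. */
  def parseVersion(text: String): Int = {
    val digits = if (text.startsWith("v")) text.substring(1) else ""
    digits.toIntOption.filter(_ > 0).getOrElse(
      throw new IllegalStateException(s"Log file was malformed: '$text'"))
  }

  /** Lenient check: any version up to the supported maximum is accepted. */
  def validateUpTo(text: String, maxSupported: Int): Int = {
    val v = parseVersion(text)
    if (v > maxSupported) {
      throw new IllegalStateException(
        s"Unsupported log version v$v; maximum supported is v$maxSupported")
    }
    v
  }

  /** Exact-match check: the log version must equal the configured VERSION. */
  def validateExact(text: String, expected: Int): Int = {
    val v = parseVersion(text)
    if (v != expected) {
      throw new IllegalStateException(
        s"Log version v$v does not match expected v$expected")
    }
    v
  }

  /** Hypothetical relaxation in the spirit of SPARK-50653: also accept a v1 log under v2. */
  def validateRelaxed(text: String, expected: Int): Int = {
    val v = parseVersion(text)
    if (v == expected || (v == 1 && expected == 2)) v
    else throw new IllegalStateException(
      s"Log version v$v cannot be read with expected v$expected")
  }
}
```

Under the exact-match rule, a checkpoint whose commit log was written as `v1` fails fast when the configured format version is 2, instead of being silently read by state-store code that may not yet handle the v1 -> v2 transition; the relaxed variant is only safe once that transition logic exists.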
--
This is an automated message from the Apache Git Service.