brkyvz commented on code in PR #49275:
URL: https://github.com/apache/spark/pull/49275#discussion_r1915413736
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -2825,6 +2825,30 @@
},
"sqlState" : "42K0E"
},
+ "INVALID_LOG_VERSION" : {
+ "message" : [
+ "UnsupportedLogVersion."
+ ],
+ "subClass" : {
+ "EXACT_MATCH_VERSION" : {
+ "message" : [
+ "The only supported log version is v<matchVersion>, but encountered
v<version>."
+ ]
+ },
+ "MAX_SUPPORTED_VERSION" : {
+ "message" : [
+ "The maximum supported log version is v<maxSupportedVersion>, but
encountered v<version>. The log file was produced by a newer version of Spark
and cannot be read by this version. You need to upgrade."
+ ]
+ }
+ },
+ "sqlState" : "KD002"
+ },
+ "INVALID_METADATA_VERSION" : {
Review Comment:
`MALFORMED_LOG_FILE`?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2631,6 +2631,34 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
cause = null)
}
+ def invalidMetadataVersion(text: String): Throwable = {
+ new SparkIllegalStateException(
+ errorClass = "INVALID_METADATA_VERSION",
+ messageParameters = Map("text" -> text),
+ cause = null
+ )
+ }
+
+ def invalidLogVersion(version: Int, maxSupportedVersion: Int): Throwable = {
+ new SparkIllegalStateException(
+ errorClass = "INVALID_LOG_VERSION.MAX_SUPPORTED_VERSION",
+ messageParameters = Map(
+ "version" -> version.toString,
+ "maxSupportedVersion" -> maxSupportedVersion.toString),
+ cause = null
+ )
+ }
+
+ def invalidLogVersionExactMatch(version: Int, matchVersion: Int): Throwable
= {
Review Comment:
Suggest renaming this to `logVersionNotMatch`.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2631,6 +2631,34 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase with ExecutionE
cause = null)
}
+ def invalidMetadataVersion(text: String): Throwable = {
+ new SparkIllegalStateException(
+ errorClass = "INVALID_METADATA_VERSION",
+ messageParameters = Map("text" -> text),
+ cause = null
+ )
+ }
+
+ def invalidLogVersion(version: Int, maxSupportedVersion: Int): Throwable = {
Review Comment:
Suggest renaming this to `logVersionGreaterThanSupported`.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala:
##########
@@ -51,14 +51,19 @@ class CommitLog(sparkSession: SparkSession, path: String)
import CommitLog._
+ private val VERSION: Int = sparkSession.conf.get(
+ SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key).toInt
+
override protected[sql] def deserialize(in: InputStream): CommitMetadata = {
// called inside a try-finally where the underlying stream is closed in
the caller
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file in the offset
commit log")
}
// TODO [SPARK-49462] This validation should be relaxed for a stateless
query.
- validateVersion(lines.next().trim, VERSION)
+ // TODO [SPARK-50653] This validation should be relaxed to support reading
+ // a V1 log file when VERSION is V2
+ validateVersionExactMatch(lines.next().trim, VERSION)
Review Comment:
What work is necessary for this? Why doesn't it just work? Can't we just fix it here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]