brkyvz commented on code in PR #49275:
URL: https://github.com/apache/spark/pull/49275#discussion_r1915413736


##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -2825,6 +2825,30 @@
     },
     "sqlState" : "42K0E"
   },
+  "INVALID_LOG_VERSION" : {
+    "message" : [
+      "UnsupportedLogVersion."
+    ],
+    "subClass" : {
+      "EXACT_MATCH_VERSION" : {
+        "message" : [
+          "The only supported log version is v<matchVersion>, but encountered 
v<version>."
+        ]
+      },
+      "MAX_SUPPORTED_VERSION" : {
+        "message" : [
+          "The maximum supported log version is v<maxSupportedVersion>, but 
encountered v<version>. The log file was produced by a newer version of Spark 
and cannot be read by this version. You need to upgrade."
+        ]
+      }
+    },
+    "sqlState" : "KD002"
+  },
+  "INVALID_METADATA_VERSION" : {

Review Comment:
   Should this error condition be named `MALFORMED_LOG_FILE` instead?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2631,6 +2631,34 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
       cause = null)
   }
 
+  def invalidMetadataVersion(text: String): Throwable = {
+    new SparkIllegalStateException(
+      errorClass = "INVALID_METADATA_VERSION",
+      messageParameters = Map("text" -> text),
+      cause = null
+    )
+  }
+
+  def invalidLogVersion(version: Int, maxSupportedVersion: Int): Throwable = {
+    new SparkIllegalStateException(
+      errorClass = "INVALID_LOG_VERSION.MAX_SUPPORTED_VERSION",
+      messageParameters = Map(
+        "version" -> version.toString,
+        "maxSupportedVersion" -> maxSupportedVersion.toString),
+      cause = null
+    )
+  }
+
+  def invalidLogVersionExactMatch(version: Int, matchVersion: Int): Throwable 
= {

Review Comment:
   Suggested name for this method: `logVersionNotMatch`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala:
##########
@@ -2631,6 +2631,34 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase with ExecutionE
       cause = null)
   }
 
+  def invalidMetadataVersion(text: String): Throwable = {
+    new SparkIllegalStateException(
+      errorClass = "INVALID_METADATA_VERSION",
+      messageParameters = Map("text" -> text),
+      cause = null
+    )
+  }
+
+  def invalidLogVersion(version: Int, maxSupportedVersion: Int): Throwable = {

Review Comment:
   Suggested name for this method: `logVersionGreaterThanSupported`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala:
##########
@@ -51,14 +51,19 @@ class CommitLog(sparkSession: SparkSession, path: String)
 
   import CommitLog._
 
+  private val VERSION: Int = sparkSession.conf.get(
+    SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key).toInt
+
   override protected[sql] def deserialize(in: InputStream): CommitMetadata = {
     // called inside a try-finally where the underlying stream is closed in 
the caller
     val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
     if (!lines.hasNext) {
       throw new IllegalStateException("Incomplete log file in the offset 
commit log")
     }
     // TODO [SPARK-49462] This validation should be relaxed for a stateless 
query.
-    validateVersion(lines.next().trim, VERSION)
+    // TODO [SPARK-50653] This validation should be relaxed to support reading
+    //  a V1 log file when VERSION is V2
+    validateVersionExactMatch(lines.next().trim, VERSION)

Review Comment:
   What work is necessary for this? Why doesn't it just work? Can't we just
   fix it here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to