brkyvz commented on code in PR #49275:
URL: https://github.com/apache/spark/pull/49275#discussion_r1906538392


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala:
##########
@@ -20,32 +20,52 @@ package org.apache.spark.sql.execution.streaming
 object MetadataVersionUtil {
   /**
    * Parse the log version from the given `text` -- will throw exception when 
the parsed version
-   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as 
"xyz", "v", "v-1",
-   * "v123xyz" etc.)
+   * exceeds `maxSupportedVersion`, or when `text` is malformed.
    */
   def validateVersion(text: String, maxSupportedVersion: Int): Int = {
-    if (text.length > 0 && text(0) == 'v') {
-      val version =
-        try {
-          text.substring(1, text.length).toInt
-        } catch {
-          case _: NumberFormatException =>
-            throw new IllegalStateException(s"Log file was malformed: failed 
to read correct log " +
-              s"version from $text.")
-        }
-      if (version > 0) {
-        if (version > maxSupportedVersion) {
-          throw new IllegalStateException(s"UnsupportedLogVersion: maximum 
supported log version " +
-            s"is v${maxSupportedVersion}, but encountered v$version. The log 
file was produced " +
-            s"by a newer version of Spark and cannot be read by this version. 
Please upgrade.")
-        } else {
-          return version
-        }
-      }
+    val version: Int = extractVersion(text)
+    if (version > maxSupportedVersion) {
+      throw new IllegalStateException(s"UnsupportedLogVersion: maximum 
supported log version " +
+        s"is v${maxSupportedVersion}, but encountered v$version. The log file 
was produced " +
+        s"by a newer version of Spark and cannot be read by this version. 
Please upgrade.")
+    }
+    version
+  }
+
+  /**
+   * Parse the log version from the given `text` -- will throw exception when 
the parsed version
+   * does not equal to `matchVersion`, or when `text` is malformed.
+   */
+  def validateVersionExactMatch(text: String, matchVersion: Int): Int = {
+    val version: Int = extractVersion(text)
+    if (version != matchVersion) {
+      throw new IllegalStateException(s"UnsupportedLogVersion: the only 
supported log version " +
+        s"is v${matchVersion}, but encountered v$version.")

Review Comment:
   Please use the new error message framework for this error.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala:
##########
@@ -51,14 +51,18 @@ class CommitLog(sparkSession: SparkSession, path: String)
 
   import CommitLog._
 
+  private val VERSION = SQLConf.get.stateStoreCheckpointFormatVersion

Review Comment:
   I think moving it here is correct since the conf can be changed per session. 
Can we use `sparkSession.conf` here instead of `SQLConf.get`, though?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala:
##########
@@ -20,32 +20,52 @@ package org.apache.spark.sql.execution.streaming
 object MetadataVersionUtil {
   /**
    * Parse the log version from the given `text` -- will throw exception when 
the parsed version
-   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as 
"xyz", "v", "v-1",
-   * "v123xyz" etc.)
+   * exceeds `maxSupportedVersion`, or when `text` is malformed.
    */
   def validateVersion(text: String, maxSupportedVersion: Int): Int = {
-    if (text.length > 0 && text(0) == 'v') {
-      val version =
-        try {
-          text.substring(1, text.length).toInt
-        } catch {
-          case _: NumberFormatException =>
-            throw new IllegalStateException(s"Log file was malformed: failed 
to read correct log " +
-              s"version from $text.")
-        }
-      if (version > 0) {
-        if (version > maxSupportedVersion) {
-          throw new IllegalStateException(s"UnsupportedLogVersion: maximum 
supported log version " +
-            s"is v${maxSupportedVersion}, but encountered v$version. The log 
file was produced " +
-            s"by a newer version of Spark and cannot be read by this version. 
Please upgrade.")
-        } else {
-          return version
-        }
-      }
+    val version: Int = extractVersion(text)
+    if (version > maxSupportedVersion) {
+      throw new IllegalStateException(s"UnsupportedLogVersion: maximum 
supported log version " +
+        s"is v${maxSupportedVersion}, but encountered v$version. The log file 
was produced " +
+        s"by a newer version of Spark and cannot be read by this version. 
Please upgrade.")
+    }
+    version
+  }
+
+  /**
+   * Parse the log version from the given `text` -- will throw exception when 
the parsed version
+   * does not equal to `matchVersion`, or when `text` is malformed.
+   */
+  def validateVersionExactMatch(text: String, matchVersion: Int): Int = {
+    val version: Int = extractVersion(text)
+    if (version != matchVersion) {
+      throw new IllegalStateException(s"UnsupportedLogVersion: the only 
supported log version " +
+        s"is v${matchVersion}, but encountered v$version.")
     }
+    version
+  }
 
-    // reaching here means we failed to read the correct log version
-    throw new IllegalStateException(s"Log file was malformed: failed to read 
correct log " +
-      s"version from $text.")
+  /**
+   * Parse the log version from the given `text` -- will throw exception when 
the parsed version
+   * when `text` is malformed (such as "xyz", "v", "v-1", "v123xyz" etc.)
+   */
+  private def extractVersion(text: String): Int = {
+    val version: Int = if (text.nonEmpty && text(0) == 'v') {
+      try {
+        text.substring(1, text.length).toInt
+      } catch {
+        case _: NumberFormatException =>
+          throw new IllegalStateException(s"Log file was malformed: failed to 
read correct log " +
+            s"version from $text.")
+      }
+    } else {
+      throw new IllegalStateException(s"Log file was malformed: failed to read 
correct log " +
+        s"version from $text.")
+    }
+    if (version <= 0) {
+      throw new IllegalStateException(s"Log file was malformed: failed to read 
correct log " +
+        s"version from $text.")
+    }

Review Comment:
   Can you please use the new error framework for these errors?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala:
##########
@@ -20,32 +20,52 @@ package org.apache.spark.sql.execution.streaming
 object MetadataVersionUtil {
   /**
    * Parse the log version from the given `text` -- will throw exception when 
the parsed version
-   * exceeds `maxSupportedVersion`, or when `text` is malformed (such as 
"xyz", "v", "v-1",
-   * "v123xyz" etc.)
+   * exceeds `maxSupportedVersion`, or when `text` is malformed.
    */
   def validateVersion(text: String, maxSupportedVersion: Int): Int = {
-    if (text.length > 0 && text(0) == 'v') {
-      val version =
-        try {
-          text.substring(1, text.length).toInt
-        } catch {
-          case _: NumberFormatException =>
-            throw new IllegalStateException(s"Log file was malformed: failed 
to read correct log " +
-              s"version from $text.")
-        }
-      if (version > 0) {
-        if (version > maxSupportedVersion) {
-          throw new IllegalStateException(s"UnsupportedLogVersion: maximum 
supported log version " +
-            s"is v${maxSupportedVersion}, but encountered v$version. The log 
file was produced " +
-            s"by a newer version of Spark and cannot be read by this version. 
Please upgrade.")
-        } else {
-          return version
-        }
-      }
+    val version: Int = extractVersion(text)
+    if (version > maxSupportedVersion) {
+      throw new IllegalStateException(s"UnsupportedLogVersion: maximum 
supported log version " +
+        s"is v${maxSupportedVersion}, but encountered v$version. The log file 
was produced " +
+        s"by a newer version of Spark and cannot be read by this version. 
Please upgrade.")

Review Comment:
   Please use the new error message framework for this error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to