brkyvz commented on code in PR #49275:
URL: https://github.com/apache/spark/pull/49275#discussion_r1906538392
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala:
##########
@@ -20,32 +20,52 @@ package org.apache.spark.sql.execution.streaming
object MetadataVersionUtil {
/**
* Parse the log version from the given `text` -- will throw exception when
the parsed version
- * exceeds `maxSupportedVersion`, or when `text` is malformed (such as
"xyz", "v", "v-1",
- * "v123xyz" etc.)
+ * exceeds `maxSupportedVersion`, or when `text` is malformed.
*/
def validateVersion(text: String, maxSupportedVersion: Int): Int = {
- if (text.length > 0 && text(0) == 'v') {
- val version =
- try {
- text.substring(1, text.length).toInt
- } catch {
- case _: NumberFormatException =>
- throw new IllegalStateException(s"Log file was malformed: failed
to read correct log " +
- s"version from $text.")
- }
- if (version > 0) {
- if (version > maxSupportedVersion) {
- throw new IllegalStateException(s"UnsupportedLogVersion: maximum
supported log version " +
- s"is v${maxSupportedVersion}, but encountered v$version. The log
file was produced " +
- s"by a newer version of Spark and cannot be read by this version.
Please upgrade.")
- } else {
- return version
- }
- }
+ val version: Int = extractVersion(text)
+ if (version > maxSupportedVersion) {
+ throw new IllegalStateException(s"UnsupportedLogVersion: maximum
supported log version " +
+ s"is v${maxSupportedVersion}, but encountered v$version. The log file
was produced " +
+ s"by a newer version of Spark and cannot be read by this version.
Please upgrade.")
+ }
+ version
+ }
+
+ /**
+ * Parse the log version from the given `text` -- will throw exception when
the parsed version
+ * does not equal to `matchVersion`, or when `text` is malformed.
+ */
+ def validateVersionExactMatch(text: String, matchVersion: Int): Int = {
+ val version: Int = extractVersion(text)
+ if (version != matchVersion) {
+ throw new IllegalStateException(s"UnsupportedLogVersion: the only
supported log version " +
+ s"is v${matchVersion}, but encountered v$version.")
Review Comment:
please use the new error message framework
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala:
##########
@@ -51,14 +51,18 @@ class CommitLog(sparkSession: SparkSession, path: String)
import CommitLog._
+ private val VERSION = SQLConf.get.stateStoreCheckpointFormatVersion
Review Comment:
I think moving it here is correct since the conf can be changed per session.
Can we use `sparkSession.conf` instead though, not SQLConf.get?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala:
##########
@@ -20,32 +20,52 @@ package org.apache.spark.sql.execution.streaming
object MetadataVersionUtil {
/**
* Parse the log version from the given `text` -- will throw exception when
the parsed version
- * exceeds `maxSupportedVersion`, or when `text` is malformed (such as
"xyz", "v", "v-1",
- * "v123xyz" etc.)
+ * exceeds `maxSupportedVersion`, or when `text` is malformed.
*/
def validateVersion(text: String, maxSupportedVersion: Int): Int = {
- if (text.length > 0 && text(0) == 'v') {
- val version =
- try {
- text.substring(1, text.length).toInt
- } catch {
- case _: NumberFormatException =>
- throw new IllegalStateException(s"Log file was malformed: failed
to read correct log " +
- s"version from $text.")
- }
- if (version > 0) {
- if (version > maxSupportedVersion) {
- throw new IllegalStateException(s"UnsupportedLogVersion: maximum
supported log version " +
- s"is v${maxSupportedVersion}, but encountered v$version. The log
file was produced " +
- s"by a newer version of Spark and cannot be read by this version.
Please upgrade.")
- } else {
- return version
- }
- }
+ val version: Int = extractVersion(text)
+ if (version > maxSupportedVersion) {
+ throw new IllegalStateException(s"UnsupportedLogVersion: maximum
supported log version " +
+ s"is v${maxSupportedVersion}, but encountered v$version. The log file
was produced " +
+ s"by a newer version of Spark and cannot be read by this version.
Please upgrade.")
+ }
+ version
+ }
+
+ /**
+ * Parse the log version from the given `text` -- will throw exception when
the parsed version
+ * does not equal to `matchVersion`, or when `text` is malformed.
+ */
+ def validateVersionExactMatch(text: String, matchVersion: Int): Int = {
+ val version: Int = extractVersion(text)
+ if (version != matchVersion) {
+ throw new IllegalStateException(s"UnsupportedLogVersion: the only
supported log version " +
+ s"is v${matchVersion}, but encountered v$version.")
}
+ version
+ }
- // reaching here means we failed to read the correct log version
- throw new IllegalStateException(s"Log file was malformed: failed to read
correct log " +
- s"version from $text.")
+ /**
+ * Parse the log version from the given `text` -- will throw exception when
the parsed version
+ * when `text` is malformed (such as "xyz", "v", "v-1", "v123xyz" etc.)
+ */
+ private def extractVersion(text: String): Int = {
+ val version: Int = if (text.nonEmpty && text(0) == 'v') {
+ try {
+ text.substring(1, text.length).toInt
+ } catch {
+ case _: NumberFormatException =>
+ throw new IllegalStateException(s"Log file was malformed: failed to
read correct log " +
+ s"version from $text.")
+ }
+ } else {
+ throw new IllegalStateException(s"Log file was malformed: failed to read
correct log " +
+ s"version from $text.")
+ }
+ if (version <= 0) {
+ throw new IllegalStateException(s"Log file was malformed: failed to read
correct log " +
+ s"version from $text.")
+ }
Review Comment:
Can you please use the new error framework for these errors?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataVersionUtil.scala:
##########
@@ -20,32 +20,52 @@ package org.apache.spark.sql.execution.streaming
object MetadataVersionUtil {
/**
* Parse the log version from the given `text` -- will throw exception when
the parsed version
- * exceeds `maxSupportedVersion`, or when `text` is malformed (such as
"xyz", "v", "v-1",
- * "v123xyz" etc.)
+ * exceeds `maxSupportedVersion`, or when `text` is malformed.
*/
def validateVersion(text: String, maxSupportedVersion: Int): Int = {
- if (text.length > 0 && text(0) == 'v') {
- val version =
- try {
- text.substring(1, text.length).toInt
- } catch {
- case _: NumberFormatException =>
- throw new IllegalStateException(s"Log file was malformed: failed
to read correct log " +
- s"version from $text.")
- }
- if (version > 0) {
- if (version > maxSupportedVersion) {
- throw new IllegalStateException(s"UnsupportedLogVersion: maximum
supported log version " +
- s"is v${maxSupportedVersion}, but encountered v$version. The log
file was produced " +
- s"by a newer version of Spark and cannot be read by this version.
Please upgrade.")
- } else {
- return version
- }
- }
+ val version: Int = extractVersion(text)
+ if (version > maxSupportedVersion) {
+ throw new IllegalStateException(s"UnsupportedLogVersion: maximum
supported log version " +
+ s"is v${maxSupportedVersion}, but encountered v$version. The log file
was produced " +
+ s"by a newer version of Spark and cannot be read by this version.
Please upgrade.")
Review Comment:
please use the new error message framework for this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]