This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6fcc2685cd3 [SPARK-46532][CONNECT] Pass message parameters in metadata of `ErrorInfo` 6fcc2685cd3 is described below commit 6fcc2685cd3f9681dc85e0d53047f2e647d24b0b Author: Max Gekk <max.g...@gmail.com> AuthorDate: Thu Dec 28 11:14:03 2023 +0300 [SPARK-46532][CONNECT] Pass message parameters in metadata of `ErrorInfo` ### What changes were proposed in this pull request? In the PR, I propose to put message parameters together with an error class in the `messageParameters` field in metadata of `ErrorInfo`. ### Why are the changes needed? To be able to create an error from an error class and message parameters. Before the changes, it is not possible to re-construct an error having only an error class. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running the modified test: ``` $ build/sbt "connect-client-jvm/testOnly *ClientE2ETestSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44468 from MaxGekk/messageParameters-in-metadata. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../scala/org/apache/spark/sql/ClientE2ETestSuite.scala | 8 +++++++- .../spark/sql/connect/client/GrpcExceptionConverter.scala | 4 ++++ .../org/apache/spark/sql/connect/config/Connect.scala | 9 +++++++++ .../org/apache/spark/sql/connect/utils/ErrorUtils.scala | 14 +++++++++++--- python/pyspark/sql/tests/connect/test_connect_basic.py | 2 +- 5 files changed, 32 insertions(+), 5 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala index c947d948b4c..0740334724e 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala @@ -85,7 +85,13 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM |""".stripMargin) .collect() } - assert(ex.getErrorClass != null) + assert( + ex.getErrorClass === + "INCONSISTENT_BEHAVIOR_CROSS_VERSION.PARSE_DATETIME_BY_NEW_PARSER") + assert( + ex.getMessageParameters.asScala == Map( + "datetime" -> "'02-29'", + "config" -> "\"spark.sql.legacy.timeParserPolicy\"")) if (enrichErrorEnabled) { assert(ex.getCause.isInstanceOf[DateTimeException]) } else { diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala index 075526e7521..cc47924de3b 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala @@ -372,10 +372,14 @@ private[client] object GrpcExceptionConverter { 
.addAllErrorTypeHierarchy(classes.toImmutableArraySeq.asJava) if (errorClass != null) { + val messageParameters = JsonMethods + .parse(info.getMetadataOrDefault("messageParameters", "{}")) + .extract[Map[String, String]] builder.setSparkThrowable( FetchErrorDetailsResponse.SparkThrowable .newBuilder() .setErrorClass(errorClass) + .putAllMessageParameters(messageParameters.asJava) .build()) } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala index ab4f06d508a..39bf1a630af 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala @@ -256,4 +256,13 @@ object Connect { .version("4.0.0") .booleanConf .createWithDefault(true) + + val CONNECT_GRPC_MAX_METADATA_SIZE = + buildStaticConf("spark.connect.grpc.maxMetadataSize") + .doc( + "Sets the maximum size of metadata fields. For instance, it restricts metadata fields " + + "in `ErrorInfo`.") + .version("4.0.0") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(1024) } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala index 703b11c0c73..f489551a1db 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala @@ -172,6 +172,7 @@ private[connect] object ErrorUtils extends Logging { "classes", JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName)))) + val maxMetadataSize = SparkEnv.get.conf.get(Connect.CONNECT_GRPC_MAX_METADATA_SIZE) // Add the SQL State and Error Class to the response metadata of the ErrorInfoObject. 
st match { case e: SparkThrowable => @@ -181,7 +182,12 @@ private[connect] object ErrorUtils extends Logging { } val errorClass = e.getErrorClass if (errorClass != null && errorClass.nonEmpty) { - errorInfo.putMetadata("errorClass", errorClass) + val messageParameters = JsonMethods.compact( + JsonMethods.render(map2jvalue(e.getMessageParameters.asScala.toMap))) + if (messageParameters.length <= maxMetadataSize) { + errorInfo.putMetadata("errorClass", errorClass) + errorInfo.putMetadata("messageParameters", messageParameters) + } } case _ => } @@ -200,8 +206,10 @@ private[connect] object ErrorUtils extends Logging { val withStackTrace = if (sessionHolderOpt.exists( _.session.conf.get(SQLConf.PYSPARK_JVM_STACKTRACE_ENABLED) && stackTrace.nonEmpty)) { - val maxSize = SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE) - errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize)) + val maxSize = Math.min( + SparkEnv.get.conf.get(Connect.CONNECT_JVM_STACK_TRACE_MAX_SIZE), + maxMetadataSize) + errorInfo.putMetadata("stackTrace", StringUtils.abbreviate(stackTrace.get, maxSize.toInt)) } else { errorInfo } diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py b/python/pyspark/sql/tests/connect/test_connect_basic.py index 32cd4ed6249..7275f40b39a 100755 --- a/python/pyspark/sql/tests/connect/test_connect_basic.py +++ b/python/pyspark/sql/tests/connect/test_connect_basic.py @@ -3452,7 +3452,7 @@ class SparkConnectSessionTests(ReusedConnectTestCase): self.spark.stop() spark = ( PySparkSession.builder.config(conf=self.conf()) - .config("spark.connect.jvmStacktrace.maxSize", 128) + .config("spark.connect.grpc.maxMetadataSize", 128) .remote("local[4]") .getOrCreate() ) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org