heyihong commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1334246788


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -93,33 +184,63 @@ private[client] object GrpcExceptionConverter extends 
JsonUtils {
       new SparkArrayIndexOutOfBoundsException(message)),
     errorConstructor[DateTimeException]((message, _) => new 
SparkDateTimeException(message)),
     errorConstructor((message, cause) => new SparkRuntimeException(message, 
cause)),
-    errorConstructor((message, cause) => new SparkUpgradeException(message, 
cause)))
-
-  private def errorInfoToThrowable(info: ErrorInfo, message: String): 
Option[Throwable] = {
-    val classes =
-      mapper.readValue(info.getMetadataOrDefault("classes", "[]"), 
classOf[Array[String]])
+    errorConstructor((message, cause) => new SparkUpgradeException(message, 
cause)),
+    errorConstructor((message, cause) => new SparkException(message, 
cause.orNull)))
+
+  /**
+   * errorsToThrowable reconstructs the exception based on a list of protobuf 
messages
+   * FetchErrorDetailsResponse.Error with un-truncated error messages and 
server-side stacktrace
+   * (if set).
+   */
+  private def errorsToThrowable(
+      errorIdx: Int,
+      errors: Seq[FetchErrorDetailsResponse.Error]): Throwable = {
+
+    val error = errors(errorIdx)
+
+    val classHierarchy = error.getErrorTypeHierarchyList.asScala
+
+    val constructor =
+      classHierarchy
+        .flatMap(errorFactory.get)
+        .headOption
+        .getOrElse((message: String, cause: Option[Throwable]) =>
+          new SparkException(s"${classHierarchy.head}: ${message}", 
cause.orNull))
+
+    val causeOpt =
+      if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, 
errors)) else None
+
+    val exception = constructor(error.getMessage, causeOpt)
+
+    if (!error.getStackTraceList.isEmpty) {
+      exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { 
stackTraceElement =>
+        new StackTraceElement(
+          stackTraceElement.getDeclaringClass,
+          stackTraceElement.getMethodName,
+          stackTraceElement.getFileName,
+          stackTraceElement.getLineNumber)
+      })
+    }
 
-    classes
-      .find(errorFactory.contains)
-      .map { cls =>
-        val constructor = errorFactory.get(cls).get
-        constructor(message, None)
-      }
+    exception
   }
 
-  private def toThrowable(ex: StatusRuntimeException): Throwable = {
-    val status = StatusProto.fromThrowable(ex)
-
-    val fallbackEx = new SparkException(ex.toString, ex.getCause)
-
-    val errorInfoOpt = status.getDetailsList.asScala
-      .find(_.is(classOf[ErrorInfo]))
-
-    if (errorInfoOpt.isEmpty) {
-      return fallbackEx
-    }
-
-    errorInfoToThrowable(errorInfoOpt.get.unpack(classOf[ErrorInfo]), 
status.getMessage)
-      .getOrElse(fallbackEx)
+  /**
+   * errorInfoToThrowable reconstructs the exception based on the error 
classes hierarchy and the
+   * truncated error message.
+   */
+  private def errorInfoToThrowable(info: ErrorInfo, message: String): 
Throwable = {
+    implicit val formats = DefaultFormats
+    val classes =
+      JsonMethods.parse(info.getMetadataOrDefault("classes", 
"[]")).extract[Array[String]]

Review Comment:
   When the header size exceeds limit, the server will return errors in 
responses instead of truncating data silently. If the json is malformed, it may 
be better to fail early?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to