hvanhovell commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1322665674
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -24,46 +24,174 @@ import scala.reflect.ClassTag
import com.google.rpc.ErrorInfo
import io.grpc.StatusRuntimeException
import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkArithmeticException,
SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException,
SparkIllegalArgumentException, SparkNumberFormatException,
SparkRuntimeException, SparkUnsupportedOperationException,
SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, UserContext}
+import org.apache.spark.connect.proto.FetchErrorDetailsResponse.ExceptionInfo
+import
org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
import org.apache.spark.sql.AnalysisException
import
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException,
TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
-private[client] object GrpcExceptionConverter extends JsonUtils {
- def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions
into Spark exceptions.
+ * It does so by utilizing the ErrorInfo defined in error_details.proto and
making an additional
+ * FetchErrorDetails RPC call to retrieve the full error message and
optionally the server-side
+ * stacktrace.
+ *
+ * If the FetchErrorDetails RPC call succeeds, the exceptions will be
constructed based on the
+ * response. If the RPC call fails, the exception will be constructed based on
the ErrorInfo. If
+ * the ErrorInfo is missing, the exception will be constructed based on the
StatusRuntimeException
+ * itself.
+ */
+private[client] class GrpcExceptionConverter(grpcStub:
SparkConnectServiceBlockingStub) {
+ import GrpcExceptionConverter._
+
+ def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
try {
f
} catch {
case e: StatusRuntimeException =>
- throw toThrowable(e)
+ throw toThrowable(e, sessionId, userContext)
}
}
- def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+ def convertIterator[T](
+ sessionId: String,
+ userContext: UserContext,
+ iter: CloseableIterator[T]): CloseableIterator[T] = {
new CloseableIterator[T] {
override def hasNext: Boolean = {
- convert {
+ convert(sessionId, userContext) {
iter.hasNext
}
}
override def next(): T = {
- convert {
+ convert(sessionId, userContext) {
iter.next()
}
}
override def close(): Unit = {
- convert {
+ convert(sessionId, userContext) {
iter.close()
}
}
}
}
+ private def exceptionInfoToError(exceptionInfo: ExceptionInfo):
Option[Error] = {
+ val stackTraceOpt =
+ if (exceptionInfo.getStackTraceList.isEmpty) None
+ else {
+ Some(exceptionInfo.getStackTraceList.asScala.toArray.map {
stackTraceElement =>
+ new StackTraceElement(
+ stackTraceElement.getDeclaringClass,
+ stackTraceElement.getMethodName,
+ stackTraceElement.getFileName,
+ stackTraceElement.getLineNumber)
+ })
+ }
+ Some(
+ Error(
+ classHierarchy =
exceptionInfo.getErrorTypeHierarchyList.asScala.toArray,
+ message = exceptionInfo.getMessage,
+ stackTraceOpt = stackTraceOpt,
+ causeOpt = if (exceptionInfo.hasCause) {
+ exceptionInfoToError(exceptionInfo.getCause)
+ } else {
+ None
+ }))
+ }
+
+ /**
+ * enrichError enrich errors with full exception message and optionally
stacktrace by issuing an
+ * additional RPC call to fetch error details. The RPC call is best-effort
at-most-once.
+ */
+ private def enrichError(
+ error: Error,
+ info: ErrorInfo,
+ sessionId: String,
+ userContext: UserContext): Error = {
+ val errorId = info.getMetadataOrDefault("errorId", null)
+ if (errorId == null) {
+ return error
+ }
+
+ try {
+ val errorDetailsResponse = grpcStub.fetchErrorDetails(
+ FetchErrorDetailsRequest
+ .newBuilder()
+ .setSessionId(sessionId)
+ .setErrorId(errorId)
+
.setUserContext(UserContext.newBuilder().setUserId(userContext.getUserId).build())
+ .build())
+
+ if (!errorDetailsResponse.hasExceptionInfo) {
+ return error
+ }
+
+ exceptionInfoToError(errorDetailsResponse.getExceptionInfo).get
+ } catch {
+ case _: StatusRuntimeException => error
+ }
+ }
+
+ private def errorInfoToThrowable(
+ info: ErrorInfo,
+ truncatedMessage: String,
+ sessionId: String,
+ userContext: UserContext): Option[Throwable] = {
+ val classes = deserializeClasses(info.getMetadataOrDefault("classes",
"[]"))
+
+ implicit val formats = DefaultFormats
+ var causeOpt: Option[Error] = None
+ JsonMethods
+ .parse(info.getMetadataOrDefault("causes", "[]"))
+ .extract[Array[Map[String, String]]]
+ .reverse
+ .foreach { m =>
+ causeOpt = Some(
+ Error(
+ classHierarchy = deserializeClasses(m.get("classes").get),
+ message = m.get("message").get,
+ stackTraceOpt = None,
+ causeOpt = causeOpt))
+ }
+ val error = Error(classes, truncatedMessage, stackTraceOpt = None,
causeOpt = causeOpt)
+ errorToThrowable(enrichError(error, info, sessionId, userContext))
Review Comment:
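Why not first try to enrich, and only fall back to the metadata-based error if
that fails? Right now we are doing duplicate work: the `classes`/`causes`
metadata is always parsed into a fallback `Error`, even when `enrichError`
succeeds and replaces it. More importantly, the current flow is much harder to
follow; a casual reader might get confused.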
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]