hvanhovell commented on code in PR #42987:
URL: https://github.com/apache/spark/pull/42987#discussion_r1330870268


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -24,49 +24,135 @@ import scala.reflect.ClassTag
 import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkArithmeticException, 
SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, 
SparkIllegalArgumentException, SparkNumberFormatException, 
SparkRuntimeException, SparkUnsupportedOperationException, 
SparkUpgradeException}
+import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, 
FetchErrorDetailsResponse, UserContext}
+import 
org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import 
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, 
NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, 
TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter extends JsonUtils {
-  def convert[T](f: => T): T = {
+/**
+ * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions.
+ * It does so by utilizing the ErrorInfo defined in error_details.proto and making an additional
+ * FetchErrorDetails RPC call to retrieve the full error message and optionally the server-side
+ * stacktrace.
+ *
+ * If the FetchErrorDetails RPC call succeeds, the exceptions will be constructed based on the
+ * response. If the RPC call fails, the exception will be constructed based on the ErrorInfo. If
+ * the ErrorInfo is missing, the exception will be constructed based on the StatusRuntimeException
+ * itself.
+ */
+private[client] class GrpcExceptionConverter(grpcStub: 
SparkConnectServiceBlockingStub)
+    extends Logging {
+  import GrpcExceptionConverter._
+
+  def convert[T](sessionId: String, userContext: UserContext)(f: => T): T = {
     try {
       f
     } catch {
       case e: StatusRuntimeException =>
-        throw toThrowable(e)
+        throw toThrowable(e, sessionId, userContext)
     }
   }
 
-  def convertIterator[T](iter: CloseableIterator[T]): CloseableIterator[T] = {
+  def convertIterator[T](
+      sessionId: String,
+      userContext: UserContext,
+      iter: CloseableIterator[T]): CloseableIterator[T] = {
     new WrappedCloseableIterator[T] {
 
       override def innerIterator: Iterator[T] = iter
 
       override def hasNext: Boolean = {
-        convert {
+        convert(sessionId, userContext) {
           iter.hasNext
         }
       }
 
       override def next(): T = {
-        convert {
+        convert(sessionId, userContext) {
           iter.next()
         }
       }
 
       override def close(): Unit = {
-        convert {
+        convert(sessionId, userContext) {
           iter.close()
         }
       }
     }
   }
 
+  /**
+   * fetchEnrichedError fetches enriched errors with full exception message and optionally
+   * stacktrace by issuing an additional RPC call to fetch error details. The RPC call is
+   * best-effort at-most-once.
+   */
+  private def fetchEnrichedError(
+      info: ErrorInfo,
+      sessionId: String,
+      userContext: UserContext): Option[Throwable] = {
+    val errorId = info.getMetadataOrDefault("errorId", null)
+    if (errorId == null) {
+      logWarning("Unable to fetch enriched error since errorId is missing")
+      return None
+    }
+
+    try {
+      val errorDetailsResponse = grpcStub.fetchErrorDetails(
+        FetchErrorDetailsRequest
+          .newBuilder()
+          .setSessionId(sessionId)
+          .setErrorId(errorId)
+          
.setUserContext(UserContext.newBuilder().setUserId(userContext.getUserId).build())
+          .build())
+
+      if (!errorDetailsResponse.hasRootErrorIdx) {
+        logWarning("Unable to fetch enriched error since error is not found")
+        return None
+      }
+
+      Some(
+        errorsToThrowable(
+          errorDetailsResponse.getRootErrorIdx,
+          errorDetailsResponse.getErrorsList.asScala))
+    } catch {
+      case e: StatusRuntimeException =>
+        logWarning("Unable to fetch enriched error", e)
+        None
+    }
+  }
+
+  private def toThrowable(
+      ex: StatusRuntimeException,
+      sessionId: String,
+      userContext: UserContext): Throwable = {
+    val status = StatusProto.fromThrowable(ex)
+
+    val errorInfoOpt = status.getDetailsList.asScala
+      .find(_.is(classOf[ErrorInfo]))
+      .map(_.unpack(classOf[ErrorInfo]))
+
+    if (errorInfoOpt.isDefined) {

Review Comment:
   The code below implements the policy you described in the documentation. 
Please add comments for every step of the way.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to