grundprinzip commented on code in PR #42377:
URL: https://github.com/apache/spark/pull/42377#discussion_r1308595431


##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -778,6 +778,62 @@ message ReleaseExecuteResponse {
   optional string operation_id = 2;
 }
 
+// GetErrorInfoRequest defines criteria for retrieving cached exceptions.
+message GetErrorInfoRequest {
+
+  // (Required)
+  //
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id).
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
+  string session_id = 1;
+
+  // User context
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide a list of error ids to get.
+  repeated string error_ids = 3;

Review Comment:
   Does it have to be a repeated here?



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -778,6 +778,62 @@ message ReleaseExecuteResponse {
   optional string operation_id = 2;
 }
 
+// GetErrorInfoRequest defines criteria for retrieving cached exceptions.
+message GetErrorInfoRequest {
+
+  // (Required)
+  //
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id).
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
+  string session_id = 1;
+
+  // User context
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide a list of error ids to get.
+  repeated string error_ids = 3;
+}
+
+// GetErrorInfoResponse returns list of exceptions matching the given error 
ids.
+message GetErrorInfoResponse {
+
+  message StackTraceElementInfo {
+
+    // Fully qualified name of the class containing the execution point.
+    string declaring_class = 1;
+
+    // Name of the method containing the execution point.
+    string method_name = 2;
+
+    // Name of the file containing the execution point.
+    string file_name = 3;
+
+    // Line number of the source line containing the execution point.
+    int32 line_number = 4;
+  }
+
+  // ExceptionInfo represents the serialized form of an exception.
+  message ExceptionInfo {
+    // Unique identifier for this specific exception instance.
+    string error_id = 1;
+
+    // Fully qualified names of the exception class and its parental classes.
+    repeated string error_type_hierarchy = 2;
+
+    // Detailed message of the exception.
+    string message = 3;
+
+    // StackTrace of the exception.
+    repeated StackTraceElementInfo stack_trace = 4;
+  }
+
+  // List of matched exceptions.
+  repeated ExceptionInfo exception_infos = 1;

Review Comment:
   Does this need a comment on the order of the trace elements?



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -778,6 +778,62 @@ message ReleaseExecuteResponse {
   optional string operation_id = 2;
 }
 
+// GetErrorInfoRequest defines criteria for retrieving cached exceptions.
+message GetErrorInfoRequest {
+
+  // (Required)
+  //
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id).
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
+  string session_id = 1;
+
+  // User context
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide a list of error ids to get.
+  repeated string error_ids = 3;
+}
+
+// GetErrorInfoResponse returns list of exceptions matching the given error 
ids.
+message GetErrorInfoResponse {
+
+  message StackTraceElementInfo {

Review Comment:
   In Java the element is called `StackTraceElement`, my proposal would be to 
name it like this as well. In addition, I'm wondering how much benefit we get 
from this custom serialization because the `toString` implementation already is 
able to deal with the edge cases of absent line numbers etc
   
   
https://docs.oracle.com/javase/8/docs/api/java/lang/StackTraceElement.html#toString--



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/CustomSparkConnectBlockingStub.scala:
##########
@@ -27,11 +27,14 @@ private[client] class CustomSparkConnectBlockingStub(
     retryPolicy: GrpcRetryHandler.RetryPolicy) {
 
   private val stub = SparkConnectServiceGrpc.newBlockingStub(channel)
+
   private val retryHandler = new GrpcRetryHandler(retryPolicy)
 
+  private val grpcExceptionConverter = new GrpcExceptionConverter(stub)

Review Comment:
   doc? Why does this need the stub now?



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -64,6 +69,94 @@ private[client] object GrpcExceptionConverter extends 
JsonUtils {
     }
   }
 
+  private def enrichError(errors: Array[Error], info: ErrorInfo): Array[Error] 
= {

Review Comment:
   doc



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##########
@@ -40,7 +42,15 @@ import 
org.apache.spark.sql.connect.service.SparkConnectService
 import org.apache.spark.sql.internal.SQLConf
 
 private[connect] object ErrorUtils extends Logging {
-  private def allClasses(cl: Class[_]): Seq[Class[_]] = {
+  private[connect] def allThrowables(t: Throwable): List[Throwable] = {

Review Comment:
   ```suggestion
     private[connect] def traverseCauses(t: Throwable): List[Throwable] = {
   ```
   
   maybe better?



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -778,6 +778,62 @@ message ReleaseExecuteResponse {
   optional string operation_id = 2;
 }
 
+// GetErrorInfoRequest defines criteria for retrieving cached exceptions.
+message GetErrorInfoRequest {
+
+  // (Required)
+  //
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id).
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
+  string session_id = 1;
+
+  // User context
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide a list of error ids to get.
+  repeated string error_ids = 3;
+}
+
+// GetErrorInfoResponse returns list of exceptions matching the given error 
ids.
+message GetErrorInfoResponse {
+
+  message StackTraceElementInfo {
+
+    // Fully qualified name of the class containing the execution point.
+    string declaring_class = 1;
+
+    // Name of the method containing the execution point.
+    string method_name = 2;
+
+    // Name of the file containing the execution point.
+    string file_name = 3;
+
+    // Line number of the source line containing the execution point.
+    int32 line_number = 4;
+  }
+
+  // ExceptionInfo represents the serialized form of an exception.
+  message ExceptionInfo {

Review Comment:
   Can you explain a little bit more about the uniqueness properties of the 
error ID? Is there guaranteed to be exactly one?



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -813,5 +869,8 @@ service SparkConnectService {
   // Non reattachable executions are released automatically and immediately 
after the ExecutePlan
   // RPC and ReleaseExecute may not be used.
   rpc ReleaseExecute(ReleaseExecuteRequest) returns (ReleaseExecuteResponse) {}
+
+  // GetErrorInfo retrieves cached exceptions based on provided error ids.
+  rpc GetErrorInfo(GetErrorInfoRequest) returns (GetErrorInfoResponse) {}

Review Comment:
   ```suggestion
     rpc FetchExecutionErrorDetails(FetchExecutionErrorDetailsRequest) returns 
(FetchExecutionErrorDetailsResponse) {}
   ```



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##########
@@ -57,18 +67,62 @@ private[connect] object ErrorUtils extends Logging {
     classes.toSeq
   }
 
-  private def buildStatusFromThrowable(st: Throwable, stackTraceEnabled: 
Boolean): RPCStatus = {
+  private def serializeClasses(t: Throwable): String = {
+    
JsonMethods.compact(JsonMethods.render(allClasses(t.getClass).map(_.getName)))
+  }
+
+  private val NUM_ERRORS_LIMIT = 5
+
+  private def buildStatusFromThrowable(
+      st: Throwable,
+      stackTraceEnabled: Boolean,
+      enrichErrorEnabled: Boolean,
+      userId: String,
+      sessionId: String): RPCStatus = {
     val errorInfo = ErrorInfo
       .newBuilder()
       .setReason(st.getClass.getName)
       .setDomain("org.apache.spark")
-      .putMetadata(
-        "classes",
-        
JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))
+      .putMetadata("classes", serializeClasses(st))
+
+    // We can get full exception messages and optionally stacktrace by
+    // a separate RPC call if enrichErrorEnabled is true. So imposing a smaller
+    // limit to reduce the probability of hitting the 8KB header limit.
+    val metadataValueLimit = if (enrichErrorEnabled) 512 else Int.MaxValue
+    val maxMessageSize = Math.min(metadataValueLimit, 2048)
+
+    if (enrichErrorEnabled) {
+      val errorIdAndErrors =
+        allThrowables(st).map((UUID.randomUUID().toString, 
_)).take(NUM_ERRORS_LIMIT)

Review Comment:
   it might be irrelevant, but `allThrowables(st).take(NUM_ERRORS_LIMIT).map()` 
avoids generating UUIDs that you don't need.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2843,6 +2843,21 @@ object SQLConf {
       // show full stacktrace in tests but hide in production by default.
       .createWithDefault(Utils.isTesting)
 
+  val SPARK_ENRICH_ERROR_ENABLED =
+    buildConf("spark.sql.spark.enrichError.enabled")
+      .doc("When true, it enriches errors with full exception messages on the 
client side.")
+      .version("3.5.0")

Review Comment:
   Should this actually be a SQL conf? Shouldn't this be a static server conf 
on the server and the client can still turn off if desired.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectGetErrorInfoHandler.scala:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.service
+
+import scala.jdk.CollectionConverters._
+
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.GetErrorInfoResponse
+import org.apache.spark.connect.proto.GetErrorInfoResponse.{ExceptionInfo, 
StackTraceElementInfo}
+import org.apache.spark.sql.connect.utils.ErrorUtils
+import org.apache.spark.sql.internal.SQLConf
+
+class SparkConnectGetErrorInfoHandler(
+    responseObserver: StreamObserver[proto.GetErrorInfoResponse]) {
+
+  def handle(v: proto.GetErrorInfoRequest): Unit = {
+    val sessionHolder =
+      SparkConnectService
+        .getOrCreateIsolatedSession(v.getUserContext.getUserId, v.getSessionId)
+
+    val exceptionInfos = v.getErrorIdsList.asScala.flatMap { errorId =>
+      Option(sessionHolder.errorIdToError.getIfPresent(errorId)) match {
+        case Some(error) =>
+          // Invalidate the cache immediately to free up space for other 
exceptions to be stored
+          // since this exception will no longer be accessed.
+          sessionHolder.errorIdToError.invalidate(errorId)

Review Comment:
   It might be an unlikely case, but the writer should check if an entry 
already exists for the UUID and in the worst case generate a new one.



##########
connector/connect/common/src/main/protobuf/spark/connect/base.proto:
##########
@@ -778,6 +778,62 @@ message ReleaseExecuteResponse {
   optional string operation_id = 2;
 }
 
+// GetErrorInfoRequest defines criteria for retrieving cached exceptions.
+message GetErrorInfoRequest {
+
+  // (Required)
+  //
+  // The session_id specifies a spark session for a user id (which is specified
+  // by user_context.user_id).
+  // The id should be an UUID string of the format 
`00112233-4455-6677-8899-aabbccddeeff`
+  string session_id = 1;
+
+  // User context
+  UserContext user_context = 2;
+
+  // (Required)
+  // Provide a list of error ids to get.
+  repeated string error_ids = 3;
+}
+
+// GetErrorInfoResponse returns list of exceptions matching the given error 
ids.
+message GetErrorInfoResponse {
+
+  message StackTraceElementInfo {
+
+    // Fully qualified name of the class containing the execution point.
+    string declaring_class = 1;
+
+    // Name of the method containing the execution point.
+    string method_name = 2;
+
+    // Name of the file containing the execution point.
+    string file_name = 3;
+
+    // Line number of the source line containing the execution point.
+    int32 line_number = 4;
+  }
+
+  // ExceptionInfo represents the serialized form of an exception.
+  message ExceptionInfo {
+    // Unique identifier for this specific exception instance.
+    string error_id = 1;

Review Comment:
   if error id I unique, shouldn't the exception infos be a map instead?



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -24,15 +24,20 @@ import scala.reflect.ClassTag
 import com.google.rpc.ErrorInfo
 import io.grpc.StatusRuntimeException
 import io.grpc.protobuf.StatusProto
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkArithmeticException, 
SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, 
SparkIllegalArgumentException, SparkNumberFormatException, 
SparkRuntimeException, SparkUnsupportedOperationException, 
SparkUpgradeException}
+import org.apache.spark.connect.proto.{GetErrorInfoRequest, UserContext}
+import 
org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
 import org.apache.spark.sql.AnalysisException
 import 
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, 
NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException, 
TempTableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.util.JsonUtils
 
-private[client] object GrpcExceptionConverter extends JsonUtils {
+private[client] class GrpcExceptionConverter(stub: 
SparkConnectServiceBlockingStub) {

Review Comment:
   None of this code here has any appropriate documentation. Since the code was 
originally written by you, I suggest that you provide proper documentation here 
as well.



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -64,6 +69,94 @@ private[client] object GrpcExceptionConverter extends 
JsonUtils {
     }
   }
 
+  private def enrichError(errors: Array[Error], info: ErrorInfo): Array[Error] 
= {
+    val sessionId = info.getMetadataOrDefault("sessionId", null)
+    val userId = info.getMetadataOrDefault("userId", null)
+
+    if (sessionId == null || userId == null) {
+      return errors
+    }
+
+    try {
+      val errorInfoResponse = stub.getErrorInfo(

Review Comment:
   It's not obvious directly that this is doing an RPC now. 



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala:
##########
@@ -64,6 +69,94 @@ private[client] object GrpcExceptionConverter extends 
JsonUtils {
     }
   }
 
+  private def enrichError(errors: Array[Error], info: ErrorInfo): Array[Error] 
= {
+    val sessionId = info.getMetadataOrDefault("sessionId", null)
+    val userId = info.getMetadataOrDefault("userId", null)

Review Comment:
   Session ID is a client specified value, similar to the User ID, we should 
not have the need to return them to the client.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##########
@@ -57,18 +67,62 @@ private[connect] object ErrorUtils extends Logging {
     classes.toSeq
   }
 
-  private def buildStatusFromThrowable(st: Throwable, stackTraceEnabled: 
Boolean): RPCStatus = {
+  private def serializeClasses(t: Throwable): String = {
+    
JsonMethods.compact(JsonMethods.render(allClasses(t.getClass).map(_.getName)))
+  }
+
+  private val NUM_ERRORS_LIMIT = 5
+
+  private def buildStatusFromThrowable(
+      st: Throwable,
+      stackTraceEnabled: Boolean,
+      enrichErrorEnabled: Boolean,
+      userId: String,
+      sessionId: String): RPCStatus = {
     val errorInfo = ErrorInfo
       .newBuilder()
       .setReason(st.getClass.getName)
       .setDomain("org.apache.spark")
-      .putMetadata(
-        "classes",
-        
JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))
+      .putMetadata("classes", serializeClasses(st))
+
+    // We can get full exception messages and optionally stacktrace by
+    // a separate RPC call if enrichErrorEnabled is true. So imposing a smaller
+    // limit to reduce the probability of hitting the 8KB header limit.
+    val metadataValueLimit = if (enrichErrorEnabled) 512 else Int.MaxValue
+    val maxMessageSize = Math.min(metadataValueLimit, 2048)
+
+    if (enrichErrorEnabled) {
+      val errorIdAndErrors =
+        allThrowables(st).map((UUID.randomUUID().toString, 
_)).take(NUM_ERRORS_LIMIT)
+
+      errorInfo
+        .putMetadata("userId", userId)
+        .putMetadata("sessionId", sessionId)
+        .putMetadata("errorId", errorIdAndErrors.head._1)
+        .putMetadata(
+          "causes",
+          JsonMethods.compact(JsonMethods.render(errorIdAndErrors.tail.map {
+            case (errorId, error) =>
+              Map(
+                "errorId" -> errorId,
+                "message" -> SparkConnectService.extractErrorMessage(error, 
maxMessageSize),
+                "classes" -> serializeClasses(error))
+          })))
+
+      val sessionHolder =
+        SparkConnectService
+          .getOrCreateIsolatedSession(userId, sessionId)
+
+      errorIdAndErrors.foreach { case (errorId, error) =>
+        sessionHolder.errorIdToError.put(errorId, error)
+      }
+    }
 
     lazy val stackTrace = Option(ExceptionUtils.getStackTrace(st))
     val withStackTrace = if (stackTraceEnabled && stackTrace.nonEmpty) {

Review Comment:
   adding the new RPC should never embedd the stack trace into the response, 
but always fetch the stack trace using the additional RPC.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -201,6 +201,20 @@ class SparkConnectService(debug: Boolean) extends 
AsyncService with BindableServ
         sessionId = request.getSessionId)
   }
 
+  override def getErrorInfo(
+      request: proto.GetErrorInfoRequest,
+      responseObserver: StreamObserver[proto.GetErrorInfoResponse]): Unit = {
+    try {
+      new SparkConnectGetErrorInfoHandler(responseObserver).handle(request)
+    } catch {
+      ErrorUtils.handleError(
+        "getErrorInfo",
+        observer = responseObserver,
+        userId = request.getUserContext.getUserId,
+        sessionId = request.getSessionId)

Review Comment:
   There should be no need to return them from the server



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala:
##########
@@ -57,18 +67,62 @@ private[connect] object ErrorUtils extends Logging {
     classes.toSeq
   }
 
-  private def buildStatusFromThrowable(st: Throwable, stackTraceEnabled: 
Boolean): RPCStatus = {
+  private def serializeClasses(t: Throwable): String = {
+    
JsonMethods.compact(JsonMethods.render(allClasses(t.getClass).map(_.getName)))
+  }
+
+  private val NUM_ERRORS_LIMIT = 5
+
+  private def buildStatusFromThrowable(
+      st: Throwable,
+      stackTraceEnabled: Boolean,
+      enrichErrorEnabled: Boolean,
+      userId: String,
+      sessionId: String): RPCStatus = {
     val errorInfo = ErrorInfo
       .newBuilder()
       .setReason(st.getClass.getName)
       .setDomain("org.apache.spark")
-      .putMetadata(
-        "classes",
-        
JsonMethods.compact(JsonMethods.render(allClasses(st.getClass).map(_.getName))))
+      .putMetadata("classes", serializeClasses(st))
+
+    // We can get full exception messages and optionally stacktrace by
+    // a separate RPC call if enrichErrorEnabled is true. So imposing a smaller
+    // limit to reduce the probability of hitting the 8KB header limit.
+    val metadataValueLimit = if (enrichErrorEnabled) 512 else Int.MaxValue
+    val maxMessageSize = Math.min(metadataValueLimit, 2048)
+
+    if (enrichErrorEnabled) {
+      val errorIdAndErrors =
+        allThrowables(st).map((UUID.randomUUID().toString, 
_)).take(NUM_ERRORS_LIMIT)
+
+      errorInfo
+        .putMetadata("userId", userId)
+        .putMetadata("sessionId", sessionId)
+        .putMetadata("errorId", errorIdAndErrors.head._1)

Review Comment:
   I think this is the part where the whole documentation needs to be much more 
explicit. What does the errorID here actually mean compared to the other error 
IDs.



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -201,6 +201,20 @@ class SparkConnectService(debug: Boolean) extends 
AsyncService with BindableServ
         sessionId = request.getSessionId)
   }
 
+  override def getErrorInfo(
+      request: proto.GetErrorInfoRequest,
+      responseObserver: StreamObserver[proto.GetErrorInfoResponse]): Unit = {
+    try {
+      new SparkConnectGetErrorInfoHandler(responseObserver).handle(request)
+    } catch {
+      ErrorUtils.handleError(
+        "getErrorInfo",
+        observer = responseObserver,
+        userId = request.getUserContext.getUserId,
+        sessionId = request.getSessionId)

Review Comment:
   user and session ID are generated by the client
   



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -45,6 +45,9 @@ case class SessionHolder(userId: String, sessionId: String, 
session: SparkSessio
   private val executions: ConcurrentMap[String, ExecuteHolder] =
     new ConcurrentHashMap[String, ExecuteHolder]()
 
+  val errorIdToError =

Review Comment:
   doc on the magic number params. Make them configurable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to