zhengruifeng commented on code in PR #39212:
URL: https://github.com/apache/spark/pull/39212#discussion_r1057452915
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -49,6 +53,67 @@ class SparkConnectService(debug: Boolean)
extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
with Logging {
+ /**
+ * Common exception handling function for the Analysis and Execution methods. Closes the
+ * stream after the error has been sent.
+ *
+ * @param opType
+ * String value indicating the operation type (analysis, execution)
+ * @param observer
+ * The GRPC response observer.
+ * @tparam V
+ * The type of the response messages sent through the observer.
+ * @return
+ * A partial function that reports the error to the observer and closes the stream.
+ */
+ private def handleError[V](
+ opType: String,
+ observer: StreamObserver[V]): PartialFunction[Throwable, Unit] = {
+ case ae: AnalysisException =>
+ logError(s"Error during: $opType", ae)
+ val status = RPCStatus
+ .newBuilder()
+ .setCode(RPCCode.INTERNAL_VALUE)
+ .addDetails(
+ ProtoAny.pack(
+ ErrorInfo
+ .newBuilder()
+ .setReason(ae.getClass.getSimpleName)
+ .setDomain("org.apache.spark")
+ .putMetadata("message", ae.getSimpleMessage)
+ .putMetadata("plan", Option(ae.plan).flatten.map(p =>
s"$p").getOrElse(""))
+ .build()))
+ .setMessage(ae.getLocalizedMessage)
+ .build()
+ observer.onError(StatusProto.toStatusRuntimeException(status))
+ case se: SparkException =>
+ logError(s"Error during: $opType", se)
+ val status = RPCStatus
+ .newBuilder()
+ .setCode(RPCCode.INTERNAL_VALUE)
+ .addDetails(
+ ProtoAny.pack(
+ ErrorInfo
+ .newBuilder()
+ .setReason(se.getClass.getSimpleName)
+ .setDomain("org.apache.spark")
+ .putMetadata("message", se.getMessage)
+ .build()))
+ .setMessage(se.getLocalizedMessage)
+ .build()
+ observer.onError(StatusProto.toStatusRuntimeException(status))
Review Comment:
Shall we catch `SparkThrowable` after `SparkException`? There are some extra exceptions,
like `SparkArithmeticException`, that implement `SparkThrowable` without extending
`SparkException`, so they would currently fall through this handler.
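For illustration only, a sketch of what such a fallback branch could look like if added after the `SparkException` case; it reuses the builders already in scope above and is an assumption about the change, not the actual PR code (`getErrorClass` is declared on the `SparkThrowable` interface):
```scala
// Hedged sketch: a fallback for SparkThrowable subclasses (e.g.
// SparkArithmeticException) that do not extend SparkException.
// Assumes RPCStatus, RPCCode, ProtoAny, ErrorInfo and StatusProto
// are in scope exactly as in the cases above.
case st: Throwable with SparkThrowable =>
  logError(s"Error during: $opType", st)
  val status = RPCStatus
    .newBuilder()
    .setCode(RPCCode.INTERNAL_VALUE)
    .addDetails(
      ProtoAny.pack(
        ErrorInfo
          .newBuilder()
          .setReason(st.getClass.getSimpleName)
          .setDomain("org.apache.spark")
          .putMetadata("message", st.getMessage)
          // getErrorClass comes from the SparkThrowable interface.
          .putMetadata("errorClass", Option(st.getErrorClass).getOrElse(""))
          .build()))
    .setMessage(st.getLocalizedMessage)
    .build()
  observer.onError(StatusProto.toStatusRuntimeException(status))
```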
##########
python/pyspark/sql/connect/client.py:
##########
@@ -436,3 +568,39 @@ def _execute_and_fetch(self, req: pb2.ExecutePlanRequest) -> "pandas.DataFrame":
if m is not None:
df.attrs["metrics"] = self._build_metrics(m)
return df
+
+ def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
Review Comment:
Is it possible to match the same logic as in
https://github.com/apache/spark/blob/764edaf8b2e1c42a32e7bfa058cf8ee26ce02a9e/python/pyspark/sql/utils.py#L158-L197
?
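For reference, a minimal sketch of what that mapping could look like on the Connect client, assuming the server packs an `ErrorInfo` detail as in the Scala handler above; the reason-to-class mapping and the `"message"` metadata key are assumptions, not the actual PR code:
```python
from typing import NoReturn

import grpc
from google.rpc import error_details_pb2
from grpc_status import rpc_status

from pyspark.sql.utils import AnalysisException, ParseException


def _handle_error(rpc_error: grpc.RpcError) -> NoReturn:
    # Hedged sketch of the method body: unpack the google.rpc.Status attached
    # by the server and re-raise the matching PySpark exception, mirroring
    # convert_exception in pyspark/sql/utils.py. The reason strings are
    # assumed to be the server-side class simple names set via
    # ErrorInfo.setReason in the Scala handler.
    status = rpc_status.from_call(rpc_error)
    if status is not None:
        for detail in status.details:
            if detail.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                info = error_details_pb2.ErrorInfo()
                detail.Unpack(info)
                message = info.metadata.get("message", status.message)
                if info.reason == "ParseException":
                    raise ParseException(message)
                if info.reason == "AnalysisException":
                    raise AnalysisException(message)
    # No recognized detail: surface the raw gRPC error unchanged.
    raise rpc_error
```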
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]