grundprinzip commented on code in PR #39212:
URL: https://github.com/apache/spark/pull/39212#discussion_r1057696164
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -49,6 +53,67 @@ class SparkConnectService(debug: Boolean)
extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
with Logging {
+ /**
+ * Common exception handling function for the Analysis and Execution
methods. Closes the stream
+ * after the error has been sent.
+ *
+ * @param opType
+ * String value indicating the operation type (analysis, execution)
+ * @param observer
+ * The GRPC response observer.
+ * @tparam V
+ * @return
+ */
+ private def handleError[V](
+ opType: String,
+ observer: StreamObserver[V]): PartialFunction[Throwable, Unit] = {
+ case ae: AnalysisException =>
+ logError(s"Error during: $opType", ae)
+ val status = RPCStatus
+ .newBuilder()
+ .setCode(RPCCode.INTERNAL_VALUE)
+ .addDetails(
+ ProtoAny.pack(
+ ErrorInfo
+ .newBuilder()
+ .setReason(ae.getClass.getSimpleName)
+ .setDomain("org.apache.spark")
+ .putMetadata("message", ae.getSimpleMessage)
+ .putMetadata("plan", Option(ae.plan).flatten.map(p =>
s"$p").getOrElse(""))
+ .build()))
+ .setMessage(ae.getLocalizedMessage)
+ .build()
+ observer.onError(StatusProto.toStatusRuntimeException(status))
+ case se: SparkException =>
+ logError(s"Error during: $opType", se)
+ val status = RPCStatus
+ .newBuilder()
+ .setCode(RPCCode.INTERNAL_VALUE)
+ .addDetails(
+ ProtoAny.pack(
+ ErrorInfo
+ .newBuilder()
+ .setReason(se.getClass.getSimpleName)
+ .setDomain("org.apache.spark")
+ .putMetadata("message", se.getMessage)
+ .build()))
+ .setMessage(se.getLocalizedMessage)
+ .build()
+ observer.onError(StatusProto.toStatusRuntimeException(status))
Review Comment:
I actually removed the `SparkException` case and replaced it with `SparkThrowable`,
because `SparkException` implements `SparkThrowable`, and I now use
`SparkThrowableHelper` to get the details.
##########
python/pyspark/sql/connect/client.py:
##########
@@ -436,3 +568,39 @@ def _execute_and_fetch(self, req: pb2.ExecutePlanRequest)
-> "pandas.DataFrame":
if m is not None:
df.attrs["metrics"] = self._build_metrics(m)
return df
+
+ def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
Review Comment:
I simplified the logic because it's going to be messy to replicate all of
the exception types exactly from the SQL side. Right now the printed output
looks like this for example:
```
SparkConnectException: (org.apache.spark.SparkNumberFormatException)
[CAST_INVALID_INPUT] The value 'id' of the type "STRING" cannot be cast to
"DOUBLE" because it is malformed. Correct the value as per the syntax, or
change its target type. Use `try_cast` to tolerate malformed input and return
NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass
this error.
== SQL(line 1, position 8) ==
select cast('id' as double) from range(10)
```
##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
)
+def _configure_logging() -> logging.Logger:
+ """Configure logging for the Spark Connect clients."""
+ logger = logging.getLogger("pyspark.sql.connect.client")
+ handler = logging.StreamHandler()
+ handler.setFormatter(
+ logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s
%(funcName)s %(message)s")
+ )
+ logger.addHandler(handler)
+
+ # Check the environment variables for log levels:
+ level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "none").lower()
+ if level == "info":
+ logger.setLevel(logging.INFO)
+ elif level == "warn":
+ logger.setLevel(logging.WARNING)
+ elif level == "error":
+ logger.setLevel(logging.ERROR)
+ elif level == "debug":
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.disabled = True
+ return logger
+
+
+# Instantiate the logger based on the environment configuration.
+_logger = _configure_logging()
Review Comment:
done
##########
python/pyspark/sql/connect/client.py:
##########
@@ -377,6 +469,19 @@ def _analyze_plan_request_with_metadata(self) ->
pb2.AnalyzePlanRequest:
return req
def _analyze(self, plan: pb2.Plan, explain_mode: str = "extended") ->
AnalyzeResult:
+ """
+ Call the analyze RPC of Spark Connect.
Review Comment:
Done.
##########
python/pyspark/sql/connect/client.py:
##########
@@ -377,6 +469,19 @@ def _analyze_plan_request_with_metadata(self) ->
pb2.AnalyzePlanRequest:
return req
def _analyze(self, plan: pb2.Plan, explain_mode: str = "extended") ->
AnalyzeResult:
+ """
+ Call the analyze RPC of Spark Connect.
+ Parameters
+ ----------
+ plan: pb2.Plan
+ Proto representation of the plan.
+ explain_mode: str
+ Explain mode
Review Comment:
Done.
##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
)
+def _configure_logging() -> logging.Logger:
+ """Configure logging for the Spark Connect clients."""
+ logger = logging.getLogger("pyspark.sql.connect.client")
+ handler = logging.StreamHandler()
+ handler.setFormatter(
+ logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s
%(funcName)s %(message)s")
+ )
+ logger.addHandler(handler)
+
+ # Check the environment variables for log levels:
+ level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "none").lower()
Review Comment:
I'm for improving iteratively. The main reason I haven't picked up an INI
file yet is that we don't really have a good way of placing the file and
finding it.
But we can do this in a follow-up.
##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
)
+def _configure_logging() -> logging.Logger:
+ """Configure logging for the Spark Connect clients."""
+ logger = logging.getLogger("pyspark.sql.connect.client")
Review Comment:
Done
##########
connector/connect/README.md:
##########
@@ -90,7 +90,7 @@ To use the release version of Spark Connect:
### Generate proto generated files for the Python client
1. Install `buf version 1.11.0`: https://docs.buf.build/installation
-2. Run `pip install grpcio==1.48.1 protobuf==3.19.5 mypy-protobuf==3.3.0`
+2. Run `pip install grpcio==1.48.1 protobuf==3.19.5 mypy-protobuf==3.3.0
googleapis-common-protos==1.56.4 grpcio-status==1.48.1`
Review Comment:
Done
##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
)
+def _configure_logging() -> logging.Logger:
+ """Configure logging for the Spark Connect clients."""
+ logger = logging.getLogger("pyspark.sql.connect.client")
+ handler = logging.StreamHandler()
+ handler.setFormatter(
+ logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s
%(funcName)s %(message)s")
+ )
+ logger.addHandler(handler)
+
+ # Check the environment variables for log levels:
+ level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "none").lower()
+ if level == "info":
+ logger.setLevel(logging.INFO)
+ elif level == "warn":
+ logger.setLevel(logging.WARNING)
+ elif level == "error":
+ logger.setLevel(logging.ERROR)
+ elif level == "debug":
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.disabled = True
+ return logger
+
+
+# Instantiate the logger based on the environment configuration.
+_logger = _configure_logging()
+
+
+class SparkConnectClientException(Exception):
Review Comment:
Let me rename to `SparkConnectException` and `SparkConnectAnalysisException`
so that we have freedom to figure out what to do.
##########
python/pyspark/sql/connect/client.py:
##########
@@ -436,3 +568,39 @@ def _execute_and_fetch(self, req: pb2.ExecutePlanRequest)
-> "pandas.DataFrame":
if m is not None:
df.attrs["metrics"] = self._build_metrics(m)
return df
+
+ def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
Review Comment:
Let me see what I can do without replicating a gigantic list of branches on
the server and client side.
##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
)
+def _configure_logging() -> logging.Logger:
+ """Configure logging for the Spark Connect clients."""
+ logger = logging.getLogger("pyspark.sql.connect.client")
+ handler = logging.StreamHandler()
+ handler.setFormatter(
+ logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s
%(funcName)s %(message)s")
+ )
+ logger.addHandler(handler)
+
+ # Check the environment variables for log levels:
+ level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "none").lower()
Review Comment:
Done.
##########
python/pyspark/sql/connect/client.py:
##########
@@ -397,30 +502,57 @@ def _analyze(self, plan: pb2.Plan, explain_mode: str =
"extended") -> AnalyzeRes
else: # formatted
req.explain.explain_mode = pb2.Explain.ExplainMode.FORMATTED
- resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
- return AnalyzeResult.fromProto(resp)
+ try:
+ resp = self._stub.AnalyzePlan(req,
metadata=self._builder.metadata())
+ return AnalyzeResult.fromProto(resp)
+ except grpc.RpcError as rpc_error:
+ self._handle_error(rpc_error)
def _process_batch(self, arrow_batch: pb2.ExecutePlanResponse.ArrowBatch)
-> "pandas.DataFrame":
with pa.ipc.open_stream(arrow_batch.data) as rd:
return rd.read_pandas()
def _execute(self, req: pb2.ExecutePlanRequest) -> None:
- for b in self._stub.ExecutePlan(req,
metadata=self._builder.metadata()):
- continue
- return
+ """
+ Execute the passed request `req` and drop all results.
+
+ Parameters
+ ----------
+ req : pb2.ExecutePlanRequest
+ Proto representation of the plan.
+
+ Returns
+ -------
+
Review Comment:
Done.
##########
python/pyspark/sql/connect/client.py:
##########
@@ -60,7 +108,18 @@ class ChannelBuilder:
DEFAULT_PORT = 15002
- def __init__(self, url: str) -> None:
+ def __init__(self, url: str, channelOptions: Optional[List[Tuple[str,
Any]]] = None) -> None:
+ """
+ Constructs a new channel builder. This is used to create the proper
GRPC channel from
+ the connection string.
+
+ Parameters
+ ----------
+ url : str
+ Spark Connect connection string
+ channelOptions: Optional[List[tuple]]
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]