grundprinzip commented on code in PR #39212:
URL: https://github.com/apache/spark/pull/39212#discussion_r1057696164


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala:
##########
@@ -49,6 +53,67 @@ class SparkConnectService(debug: Boolean)
     extends SparkConnectServiceGrpc.SparkConnectServiceImplBase
     with Logging {
 
+  /**
+   * Common exception handling function for the Analysis and Execution methods. Closes the stream
+   * after the error has been sent.
+   *
+   * @param opType
+   *   String value indicating the operation type (analysis, execution)
+   * @param observer
+   *   The GRPC response observer.
+   * @tparam V
+   *   The type of the response messages sent to the stream observer.
+   * @return
+   *   A partial function that converts the throwable into a GRPC error status and closes the stream.
+   */
+  private def handleError[V](
+      opType: String,
+      observer: StreamObserver[V]): PartialFunction[Throwable, Unit] = {
+    case ae: AnalysisException =>
+      logError(s"Error during: $opType", ae)
+      val status = RPCStatus
+        .newBuilder()
+        .setCode(RPCCode.INTERNAL_VALUE)
+        .addDetails(
+          ProtoAny.pack(
+            ErrorInfo
+              .newBuilder()
+              .setReason(ae.getClass.getSimpleName)
+              .setDomain("org.apache.spark")
+              .putMetadata("message", ae.getSimpleMessage)
+              .putMetadata("plan", Option(ae.plan).flatten.map(p => 
s"$p").getOrElse(""))
+              .build()))
+        .setMessage(ae.getLocalizedMessage)
+        .build()
+      observer.onError(StatusProto.toStatusRuntimeException(status))
+    case se: SparkException =>
+      logError(s"Error during: $opType", se)
+      val status = RPCStatus
+        .newBuilder()
+        .setCode(RPCCode.INTERNAL_VALUE)
+        .addDetails(
+          ProtoAny.pack(
+            ErrorInfo
+              .newBuilder()
+              .setReason(se.getClass.getSimpleName)
+              .setDomain("org.apache.spark")
+              .putMetadata("message", se.getMessage)
+              .build()))
+        .setMessage(se.getLocalizedMessage)
+        .build()
+      observer.onError(StatusProto.toStatusRuntimeException(status))

Review Comment:
   I actually removed `SparkException` and replaced it with `SparkThrowable`, because `SparkException` implements `SparkThrowable`, and used `SparkThrowableHelper` to get the details.



##########
python/pyspark/sql/connect/client.py:
##########
@@ -436,3 +568,39 @@ def _execute_and_fetch(self, req: pb2.ExecutePlanRequest) -> "pandas.DataFrame":
         if m is not None:
             df.attrs["metrics"] = self._build_metrics(m)
         return df
+
+    def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:

Review Comment:
   I simplified the logic because it's going to be messy to replicate all of the exception types exactly from the SQL side. Right now the printed output looks like this, for example:
   
   ```
   SparkConnectException: (org.apache.spark.SparkNumberFormatException) [CAST_INVALID_INPUT] The value 'id' of the type "STRING" cannot be cast to "DOUBLE" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error.
   == SQL(line 1, position 8) ==
   select cast('id' as double) from range(10)
   ```
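   
   For reference, a minimal standalone sketch of that simplified mapping, assuming the server attaches a single `google.rpc.ErrorInfo` detail as in the Scala snippet above. The exception class here is only a stand-in for the client-side exception discussed below, not the exact merged API:
   
   ```
   from typing import NoReturn, cast
   
   import grpc
   from google.rpc import error_details_pb2
   from grpc_status import rpc_status
   
   
   class SparkConnectException(Exception):
       """Stand-in for the client-side exception type."""
   
   
   def _handle_error(rpc_error: grpc.RpcError) -> NoReturn:
       """Convert a GRPC error into a SparkConnectException and raise it."""
       # rpc_status.from_call() recovers the rich google.rpc.Status proto the
       # server attached to the error; it returns None when no status is present.
       status = rpc_status.from_call(cast(grpc.Call, rpc_error))
       if status is not None:
           for detail in status.details:
               if detail.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                   info = error_details_pb2.ErrorInfo()
                   detail.Unpack(info)
                   # Prefix the message with the server-side class name, which
                   # yields output like the example above.
                   raise SparkConnectException(
                       f"({info.domain}.{info.reason}) {status.message}"
                   ) from None
           raise SparkConnectException(status.message) from None
       raise SparkConnectException(str(rpc_error)) from None
   ```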



##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
 )
 
 
+def _configure_logging() -> logging.Logger:
+    """Configure logging for the Spark Connect clients."""
+    logger = logging.getLogger("pyspark.sql.connect.client")
+    handler = logging.StreamHandler()
+    handler.setFormatter(
+        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
+    )
+    logger.addHandler(handler)
+
+    # Check the environment variables for log levels:
+    level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "none").lower()
+    if level == "info":
+        logger.setLevel(logging.INFO)
+    elif level == "warn":
+        logger.setLevel(logging.WARNING)
+    elif level == "error":
+        logger.setLevel(logging.ERROR)
+    elif level == "debug":
+        logger.setLevel(logging.DEBUG)
+    else:
+        logger.disabled = True
+    return logger
+
+
+# Instantiate the logger based on the environment configuration.
+_logger = _configure_logging()

Review Comment:
   done
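   
   As a usage sketch based on the snippet above: the logger is configured at import time, so the level has to be set in the environment before the module is imported (the import below is only illustrative):
   
   ```
   import os
   
   # Enable debug logging for the Spark Connect client. The level is read from
   # SPARK_CONNECT_LOG_LEVEL when pyspark.sql.connect.client is first imported.
   os.environ["SPARK_CONNECT_LOG_LEVEL"] = "debug"
   
   import pyspark.sql.connect.client  # noqa: E402
   ```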



##########
python/pyspark/sql/connect/client.py:
##########
@@ -377,6 +469,19 @@ def _analyze_plan_request_with_metadata(self) -> pb2.AnalyzePlanRequest:
         return req
 
     def _analyze(self, plan: pb2.Plan, explain_mode: str = "extended") -> AnalyzeResult:
+        """
+        Call the analyze RPC of Spark Connect.

Review Comment:
   Done.



##########
python/pyspark/sql/connect/client.py:
##########
@@ -377,6 +469,19 @@ def _analyze_plan_request_with_metadata(self) -> pb2.AnalyzePlanRequest:
         return req
 
     def _analyze(self, plan: pb2.Plan, explain_mode: str = "extended") -> AnalyzeResult:
+        """
+        Call the analyze RPC of Spark Connect.
+        Parameters
+        ----------
+        plan: pb2.Plan
+           Proto representation of the plan.
+        explain_mode: str
+           Explain mode

Review Comment:
   Done.



##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
 )
 
 
+def _configure_logging() -> logging.Logger:
+    """Configure logging for the Spark Connect clients."""
+    logger = logging.getLogger("pyspark.sql.connect.client")
+    handler = logging.StreamHandler()
+    handler.setFormatter(
+        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
+    )
+    logger.addHandler(handler)
+
+    # Check the environment variables for log levels:
+    level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "none").lower()

Review Comment:
   I'm for improving iteratively. The main reason I haven't picked up an INI file yet is that we don't really have a good way of placing and finding the file.
   
   But we can do this in a follow-up.



##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
 )
 
 
+def _configure_logging() -> logging.Logger:
+    """Configure logging for the Spark Connect clients."""
+    logger = logging.getLogger("pyspark.sql.connect.client")

Review Comment:
   Done



##########
connector/connect/README.md:
##########
@@ -90,7 +90,7 @@ To use the release version of Spark Connect:
 
 ### Generate proto generated files for the Python client
 1. Install `buf version 1.11.0`: https://docs.buf.build/installation
-2. Run `pip install grpcio==1.48.1 protobuf==3.19.5 mypy-protobuf==3.3.0`
+2. Run `pip install grpcio==1.48.1 protobuf==3.19.5 mypy-protobuf==3.3.0 googleapis-common-protos==1.56.4 grpcio-status==1.48.1`

Review Comment:
   Done



##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
 )
 
 
+def _configure_logging() -> logging.Logger:
+    """Configure logging for the Spark Connect clients."""
+    logger = logging.getLogger("pyspark.sql.connect.client")
+    handler = logging.StreamHandler()
+    handler.setFormatter(
+        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
+    )
+    logger.addHandler(handler)
+
+    # Check the environment variables for log levels:
+    level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "none").lower()
+    if level == "info":
+        logger.setLevel(logging.INFO)
+    elif level == "warn":
+        logger.setLevel(logging.WARNING)
+    elif level == "error":
+        logger.setLevel(logging.ERROR)
+    elif level == "debug":
+        logger.setLevel(logging.DEBUG)
+    else:
+        logger.disabled = True
+    return logger
+
+
+# Instantiate the logger based on the environment configuration.
+_logger = _configure_logging()
+
+
+class SparkConnectClientException(Exception):

Review Comment:
   Let me rename to `SparkConnectException` and `SparkConnectAnalysisException` so that we have freedom to figure out what to do.
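   
   A minimal sketch of how that renamed hierarchy could look (the constructor arguments and message formatting are assumptions for illustration, not the merged signatures):
   
   ```
   from typing import Optional
   
   
   class SparkConnectException(Exception):
       """Base exception for errors reported by the Spark Connect service."""
   
   
   class SparkConnectAnalysisException(SparkConnectException):
       """Server-side AnalysisException, carrying the message and, when
       available, the plan that failed to analyze."""
   
       def __init__(self, reason: str, message: str, plan: Optional[str] = None) -> None:
           # Mirror the server's ErrorInfo metadata: reason, message and plan.
           text = f"({reason}) {message}"
           if plan:
               text = f"{text}\n{plan}"
           super().__init__(text)
           self.reason = reason
           self.message = message
           self.plan = plan
   ```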



##########
python/pyspark/sql/connect/client.py:
##########
@@ -436,3 +568,39 @@ def _execute_and_fetch(self, req: pb2.ExecutePlanRequest) -> "pandas.DataFrame":
         if m is not None:
             df.attrs["metrics"] = self._build_metrics(m)
         return df
+
+    def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:

Review Comment:
   Let me see what I can do without replicating a gigantic list of branches on the server and client side.



##########
python/pyspark/sql/connect/client.py:
##########
@@ -36,6 +41,49 @@
 )
 
 
+def _configure_logging() -> logging.Logger:
+    """Configure logging for the Spark Connect clients."""
+    logger = logging.getLogger("pyspark.sql.connect.client")
+    handler = logging.StreamHandler()
+    handler.setFormatter(
+        logging.Formatter(fmt="%(asctime)s %(process)d %(levelname)s %(funcName)s %(message)s")
+    )
+    logger.addHandler(handler)
+
+    # Check the environment variables for log levels:
+    level = os.getenv("SPARK_CONNECT_LOG_LEVEL", "none").lower()

Review Comment:
   Done.



##########
python/pyspark/sql/connect/client.py:
##########
@@ -397,30 +502,57 @@ def _analyze(self, plan: pb2.Plan, explain_mode: str = "extended") -> AnalyzeRes
         else:  # formatted
             req.explain.explain_mode = pb2.Explain.ExplainMode.FORMATTED
 
-        resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
-        return AnalyzeResult.fromProto(resp)
+        try:
+            resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
+            return AnalyzeResult.fromProto(resp)
+        except grpc.RpcError as rpc_error:
+            self._handle_error(rpc_error)
 
     def _process_batch(self, arrow_batch: pb2.ExecutePlanResponse.ArrowBatch) -> "pandas.DataFrame":
         with pa.ipc.open_stream(arrow_batch.data) as rd:
             return rd.read_pandas()
 
     def _execute(self, req: pb2.ExecutePlanRequest) -> None:
-        for b in self._stub.ExecutePlan(req, metadata=self._builder.metadata()):
-            continue
-        return
+        """
+        Execute the passed request `req` and drop all results.
+
+        Parameters
+        ----------
+        req : pb2.ExecutePlanRequest
+            Proto representation of the plan.
+
+        Returns
+        -------
+

Review Comment:
   Done.
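   
   For completeness, the same try/except pattern used in `_analyze` above, applied to `_execute`, could look roughly like this (a sketch of the pattern inside the client class, not necessarily the merged code):
   
   ```
       def _execute(self, req: pb2.ExecutePlanRequest) -> None:
           """Execute the passed request `req` and drop all results."""
           try:
               # Drain the response stream; the results are intentionally discarded.
               for _ in self._stub.ExecutePlan(req, metadata=self._builder.metadata()):
                   pass
           except grpc.RpcError as rpc_error:
               self._handle_error(rpc_error)
   ```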



##########
python/pyspark/sql/connect/client.py:
##########
@@ -60,7 +108,18 @@ class ChannelBuilder:
 
     DEFAULT_PORT = 15002
 
-    def __init__(self, url: str) -> None:
+    def __init__(self, url: str, channelOptions: Optional[List[Tuple[str, Any]]] = None) -> None:
+        """
+        Constructs a new channel builder. This is used to create the proper GRPC channel from
+        the connection string.
+
+        Parameters
+        ----------
+        url : str
+            Spark Connect connection string
+        channelOptions: Optional[List[tuple]]

Review Comment:
   Done.
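   
   As a usage sketch (the connection string and option value are made up; the option tuples follow GRPC's standard channel-option format):
   
   ```
   from pyspark.sql.connect.client import ChannelBuilder
   
   # Forward GRPC channel options through the builder, e.g. to raise the
   # maximum message size for large Arrow batches.
   builder = ChannelBuilder(
       "sc://localhost:15002",
       channelOptions=[("grpc.max_receive_message_length", 64 * 1024 * 1024)],
   )
   ```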



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

