itholic commented on code in PR #42594:
URL: https://github.com/apache/spark/pull/42594#discussion_r1301241535


##########
python/pyspark/sql/connect/streaming/query.py:
##########
@@ -237,7 +239,13 @@ def addListener(self, listener: StreamingQueryListener) -> 
None:
         listener._init_listener_id()
         cmd = pb2.StreamingQueryManagerCommand()
         expr = proto.PythonUDF()
-        expr.command = CloudPickleSerializer().dumps(listener)
+        try:
+            expr.command = CloudPickleSerializer().dumps(listener)
+        except pickle.PicklingError:
+            raise PySparkRuntimeError(

Review Comment:
   Yes, I believe we need a new error class for new type of user-facing errors. 
Could you add a new PySpark error class for representing 
`pickle.PicklingError`?? See https://github.com/apache/spark/pull/40938/files 
as an example. I think we can also do it as a follow ups.



##########
python/pyspark/errors/error_classes.py:
##########
@@ -718,6 +718,11 @@
       "pandas iterator UDF should exhaust the input iterator."
     ]
   },
+  "STREAMING_CONNECT_SERIALIZATION_ERROR" : {
+    "message" : [
+      "Cannot serialize the function `<name>`. If you accessed the spark 
session, or a dataframe defined outside of the function, or any object that 
contains a spark session, please be aware that they are not allowed in Spark 
Connect. For foreachBatch, please access the spark session using 
`df.sparkSession`, where `df` is the first parameter in your foreachBatch 
function. For StreamingQueryListener, please access the spark session using 
`self.spark`. For details please check out the PySpark doc for foreachBatch and 
StreamingQueryListener."

Review Comment:
   nit: 
   ```suggestion
         "Cannot serialize the function `<name>`. If you accessed the Spark 
session, or a DataFrame defined outside of the function, or any object that 
contains a Spark session, please be aware that they are not allowed in Spark 
Connect. For `foreachBatch`, please access the Spark session using 
`df.sparkSession`, where `df` is the first parameter in your `foreachBatch` 
function. For `StreamingQueryListener`, please access the Spark session using 
`self.spark`. For details please check out the PySpark doc for `foreachBatch` 
and `StreamingQueryListener`."
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to