itholic commented on code in PR #42594:
URL: https://github.com/apache/spark/pull/42594#discussion_r1301241535
##########
python/pyspark/sql/connect/streaming/query.py:
##########
@@ -237,7 +239,13 @@ def addListener(self, listener: StreamingQueryListener) ->
None:
listener._init_listener_id()
cmd = pb2.StreamingQueryManagerCommand()
expr = proto.PythonUDF()
- expr.command = CloudPickleSerializer().dumps(listener)
+ try:
+ expr.command = CloudPickleSerializer().dumps(listener)
+ except pickle.PicklingError:
+ raise PySparkRuntimeError(
Review Comment:
Yes, I believe we need a new error class for this new type of user-facing error.
Could you add a new PySpark error class representing
`pickle.PicklingError`? See https://github.com/apache/spark/pull/40938/files
as an example. I think we can also do it as a follow-up.
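As a rough sketch of the error-class pattern the linked PR (#40938) follows, the standalone snippet below mimics how an entry in an error-class registry maps to a formatted, user-facing exception. It deliberately does not import PySpark: `ERROR_CLASSES`, `SketchPySparkRuntimeError`, and `serialize_listener` are illustrative stand-ins for `pyspark/errors/error_classes.py`, `PySparkRuntimeError`, and the `addListener` code in the diff, not PySpark's actual API.

```python
import pickle

# Illustrative stand-in for PySpark's error-class registry
# (pyspark/errors/error_classes.py). The entry name matches the one
# proposed in this PR; the message text is abbreviated.
ERROR_CLASSES = {
    "STREAMING_CONNECT_SERIALIZATION_ERROR": {
        "message": [
            "Cannot serialize the function `<name>`. Spark sessions and ",
            "DataFrames defined outside the function are not allowed in ",
            "Spark Connect.",
        ]
    }
}


class SketchPySparkRuntimeError(RuntimeError):
    """Hypothetical minimal analogue of PySparkRuntimeError: looks up a
    message template by error class and substitutes <placeholders>."""

    def __init__(self, error_class: str, message_parameters: dict) -> None:
        template = "".join(ERROR_CLASSES[error_class]["message"])
        for key, value in message_parameters.items():
            template = template.replace(f"<{key}>", value)
        super().__init__(template)
        self.error_class = error_class


def serialize_listener(listener) -> bytes:
    # Wrap pickling so callers get a class-tagged, actionable error
    # instead of a raw pickle.PicklingError, mirroring the try/except
    # added in the diff above.
    try:
        return pickle.dumps(listener)
    except (pickle.PicklingError, TypeError, AttributeError):
        raise SketchPySparkRuntimeError(
            error_class="STREAMING_CONNECT_SERIALIZATION_ERROR",
            message_parameters={"name": "addListener"},
        )
```

The design point mirrored here is that the raise site passes only an error-class name plus parameters, so the user-facing wording lives in one registry and can be reviewed (as in the nit below) without touching call sites.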
##########
python/pyspark/errors/error_classes.py:
##########
@@ -718,6 +718,11 @@
"pandas iterator UDF should exhaust the input iterator."
]
},
+ "STREAMING_CONNECT_SERIALIZATION_ERROR" : {
+ "message" : [
+ "Cannot serialize the function `<name>`. If you accessed the spark
session, or a dataframe defined outside of the function, or any object that
contains a spark session, please be aware that they are not allowed in Spark
Connect. For foreachBatch, please access the spark session using
`df.sparkSession`, where `df` is the first parameter in your foreachBatch
function. For StreamingQueryListener, please access the spark session using
`self.spark`. For details please check out the PySpark doc for foreachBatch and
StreamingQueryListener."
Review Comment:
nit:
```suggestion
"Cannot serialize the function `<name>`. If you accessed the Spark
session, or a DataFrame defined outside of the function, or any object that
contains a Spark session, please be aware that they are not allowed in Spark
Connect. For `foreachBatch`, please access the Spark session using
`df.sparkSession`, where `df` is the first parameter in your `foreachBatch`
function. For `StreamingQueryListener`, please access the Spark session using
`self.spark`. For details please check out the PySpark doc for `foreachBatch`
and `StreamingQueryListener`."
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]