WweiL commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1571450947
##########
python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:
##########
@@ -63,8 +63,11 @@ def main(infile: IO, outfile: IO) -> None:
spark = spark_connect_session
# TODO(SPARK-44461): Enable Process Isolation
-
- func = worker.read_command(pickle_ser, infile)
+ read_command_exception = None
+ try:
+ func = worker.read_command(pickle_ser, infile)
+ except Exception as e:
+ read_command_exception = e
write_int(0, outfile) # Indicate successful initialization
Review Comment:
I would actually do a `write_int(1, outfile)` and then directly write out the
error with `handle_worker_exception`, to indicate an unsuccessful
initialization. Then in `StreamingForeachBatchHelper`, where we currently
receive this "0", check whether it is 1; if so, read the error and throw it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]