Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
HyukjinKwon commented on PR #46125:
URL: https://github.com/apache/spark/pull/46125#issuecomment-2073665401

Merged to master.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
HyukjinKwon closed pull request #46125: [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark
URL: https://github.com/apache/spark/pull/46125
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
ericm-db commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576656803

## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:

@@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage)
+    }
     logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
     (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: String):

Review Comment:
Done
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576629583

## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:

@@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage)
+    }
     logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
     (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: String):

Review Comment:
minor: rename to '...RunnerInitializationFailure' or '...FailedToInitialize' ...

## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:

@@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage)
+    }
     logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
     (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: String):
+    StreamingPythonRunnerInitializationException = {
+    new StreamingPythonRunnerInitializationException(resFromPython, errMessage)
+  }
+
+  class StreamingPythonRunnerInitializationException(resFromPython: Int, errMessage: String)
+    extends SparkPythonException(
+      errorClass = "STREAMING_PYTHON_RUNNER_DID_NOT_INITIALIZE",

Review Comment:
Similar rename here. This is what user will see.
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
ericm-db commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576514370

## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:

@@ -91,6 +91,12 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw new PythonException(s"Streaming Runner initialization failed" +
+        s" (returned $resFromPython). " +
+        s"Error message: $errMessage", null)
+    }

Review Comment:
Created a new error class, PTAL
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
grundprinzip commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576480636

## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:

@@ -76,16 +73,21 @@ def process(df_id, batch_id):  # type: ignore[no-untyped-def]
             func(batch_df, batch_id)
             print(f"{log_name} Completed batch {batch_id} with DF id {df_id}")
 
-    while True:
-        df_ref_id = utf8_deserializer.loads(infile)
-        batch_id = read_long(infile)
-        # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
-        # traceback string if error occurs.
-        try:
+    try:
+        func = worker.read_command(pickle_ser, infile)
+        write_int(0, outfile)
+        outfile.flush()
+
+        while True:
+            df_ref_id = utf8_deserializer.loads(infile)
+            batch_id = read_long(infile)
+            # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
+            # traceback string if error occurs.
             process(df_ref_id, int(batch_id))
             write_int(0, outfile)
-        except BaseException as e:
-            handle_worker_exception(e, outfile)
+            outfile.flush()
+    except BaseException as e:

Review Comment:
I think it would be better to use `Exception` because BaseException would only cover built-in exceptions. But we should probably handle all exceptions here.
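For context on the `Exception` vs `BaseException` distinction raised in the review above, a quick standalone illustration (the `catches` helper is made up for this sketch): `SystemExit` and `KeyboardInterrupt` derive from `BaseException` but not from `Exception`, so an `except Exception` handler lets them propagate while `except BaseException` traps them.

```python
# Illustrative only: shows which handler type catches which exception.
def catches(exc, handler_type):
    """Return True if `except handler_type` would catch `exc`."""
    try:
        raise exc
    except handler_type:
        return True
    except BaseException:
        # Fallback so SystemExit/KeyboardInterrupt don't escape the sketch.
        return False

print(catches(ValueError("boom"), Exception))     # True: ValueError is an Exception
print(catches(SystemExit(1), Exception))          # False: SystemExit bypasses `except Exception`
print(catches(SystemExit(1), BaseException))      # True: BaseException catches everything
```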
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
grundprinzip commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576472509

## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:

@@ -62,10 +62,7 @@ def main(infile: IO, outfile: IO) -> None:
         assert spark_connect_session.session_id == session_id
         spark = spark_connect_session
 
-        func = worker.read_command(pickle_ser, infile)
-        write_int(0, outfile)  # Indicate successful initialization
-
-        outfile.flush()
+        # TODO(SPARK-44461): Enable Process Isolation

Review Comment:
rm
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
grundprinzip commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576471728

## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:

@@ -91,6 +91,12 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw new PythonException(s"Streaming Runner initialization failed" +
+        s" (returned $resFromPython). " +
+        s"Error message: $errMessage", null)
+    }

Review Comment:
can you add a proper error class for this? Or does the Python exception already have one?
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1575192686

## core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:

@@ -91,6 +91,11 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw new RuntimeException(s"Runner initialization failed (returned $resFromPython). " +

Review Comment:
Should be a PythonException? (comparing it with similar code in `StreamingForeachBatchHelper.scala`). Also, add 'Streaming' at the start of the message.

## python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:

@@ -76,16 +71,20 @@ def process(df_id, batch_id):  # type: ignore[no-untyped-def]
             func(batch_df, batch_id)
             print(f"{log_name} Completed batch {batch_id} with DF id {df_id}")
 
-    while True:
-        df_ref_id = utf8_deserializer.loads(infile)
-        batch_id = read_long(infile)
-        # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
-        # traceback string if error occurs.
-        try:
+    try:
+        func = worker.read_command(pickle_ser, infile)
+        write_int(0, outfile)
+        outfile.flush()
+        while True:

Review Comment:
minor: leave an empty line before this.
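The protocol the two diffs above negotiate can be sketched without Spark: the worker writes int 0 on successful initialization, or a nonzero code (-2) followed by a length-prefixed UTF-8 error string, and the driver side (StreamingPythonRunner's readInt/readUTF check) turns a nonzero code into an exception. All helper names below are illustrative stand-ins, not the real PySpark or Scala APIs.

```python
import io
import struct

def write_int(stream, value):
    stream.write(struct.pack(">i", value))  # big-endian int, like Java's DataOutputStream

def read_int(stream):
    return struct.unpack(">i", stream.read(4))[0]

def write_utf(stream, text):
    data = text.encode("utf-8")
    write_int(stream, len(data))  # length prefix, then raw UTF-8 bytes
    stream.write(data)

def read_utf(stream):
    return stream.read(read_int(stream)).decode("utf-8")

def worker_init(outfile, init_fn):
    """Worker side: write 0 on success, -2 plus an error string on failure."""
    try:
        init_fn()
        write_int(outfile, 0)
    except Exception as e:
        write_int(outfile, -2)
        write_utf(outfile, repr(e))

def driver_check(infile):
    """Driver side: a nonzero return code means initialization failed."""
    res = read_int(infile)
    if res != 0:
        raise RuntimeError(
            f"Streaming runner initialization failed (returned {res}): {read_utf(infile)}")

def fail():
    raise ValueError("bad UDF")

# Simulate a failing initialization over an in-memory pipe.
buf = io.BytesIO()
worker_init(buf, fail)
buf.seek(0)
try:
    driver_check(buf)
except RuntimeError as e:
    print(e)  # the worker's error message now reaches the driver
```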
Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]
rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1575186174

## python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py:

@@ -66,6 +66,27 @@ def func(df, _):
         q = df.writeStream.foreachBatch(func).start()
         q.processAllAvailable()
 
+    def test_pickling_deserialization_error(self):
+        class NoUnpickle:
+
+            def __reduce__(self):
+                # Serialize only the data attribute
+                return self.throw_exception(), ()
+
+            def throw_exception(self):
+                raise RuntimeError("Cannot unpickle instance of NoUnpickle")
+
+        no_unpickle = NoUnpickle()
+
+        def func(df, _):
+            print(no_unpickle)
+            df.count()
+
+        with self.assertRaises(Exception, msg="Cannot unpickle instance of NoUnpickle"):

Review Comment:
For reference, could you include the exception in this PR thread before and after the fix?
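As a standalone illustration of the failure mode the `NoUnpickle` test above exercises (no Spark needed; the class name here is made up): an exception raised inside `__reduce__` surfaces to the caller when the object is serialized, which is how a bad `foreachBatch` closure fails before the worker loop ever starts.

```python
import pickle

class NoPickle:
    def __reduce__(self):
        # Raising here makes any attempt to serialize the instance fail.
        raise RuntimeError("Cannot pickle instance of NoPickle")

try:
    pickle.dumps(NoPickle())
except RuntimeError as e:
    print(e)  # the error from __reduce__ propagates unchanged
```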