Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


HyukjinKwon commented on PR #46125:
URL: https://github.com/apache/spark/pull/46125#issuecomment-2073665401

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


HyukjinKwon closed pull request #46125: [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark
URL: https://github.com/apache/spark/pull/46125





Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


ericm-db commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576656803


##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage)
+    }
     logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
     (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: String):

Review Comment:
   Done
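The Scala check in this diff reads a status int from the worker and, on a nonzero value, a UTF-encoded error message. As background, here is a minimal in-memory sketch of that handshake in Python. This is illustrative only: the function names are invented, and the framing (4-byte big-endian status int followed by a 4-byte length-prefixed UTF-8 string) is an assumption about how the int/UTF pair is laid out on the wire, not code from the PR.

```python
# In-memory sketch of the init handshake (illustrative; framing is assumed).
import io
import struct

def worker_report_failure(outfile, code, msg):
    """Worker side: write a nonzero status followed by the error message."""
    outfile.write(struct.pack(">i", code))       # status int
    data = msg.encode("utf-8")
    outfile.write(struct.pack(">i", len(data)))  # length prefix
    outfile.write(data)

def driver_check_init(infile):
    """Driver side: mirrors the `resFromPython != 0` check in the diff."""
    (status,) = struct.unpack(">i", infile.read(4))
    if status != 0:
        (n,) = struct.unpack(">i", infile.read(4))
        err = infile.read(n).decode("utf-8")
        raise RuntimeError(
            f"Streaming runner initialization failed (returned {status}). "
            f"Error message: {err}"
        )
    return status

buf = io.BytesIO()
worker_report_failure(buf, -2, "worker failed to unpickle the user function")
buf.seek(0)
try:
    driver_check_init(buf)
except RuntimeError as e:
    print(e)
```

Before this PR, a failure here surfaced only as a hung or cryptically closed stream; surfacing the worker's own message is the point of the change.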






Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576629583


##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage)
+    }
     logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
     (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: String):

Review Comment:
   minor: rename to '...RunnerInitializationFailure' or '...FailedToInitialize' ...



##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,11 +91,27 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw streamingPythonRunnerDidNotInitialize(resFromPython, errMessage)
+    }
     logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
     (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerDidNotInitialize(resFromPython: Int, errMessage: String):
+      StreamingPythonRunnerInitializationException = {
+    new StreamingPythonRunnerInitializationException(resFromPython, errMessage)
+  }
+
+  class StreamingPythonRunnerInitializationException(resFromPython: Int, errMessage: String)
+    extends SparkPythonException(
+      errorClass = "STREAMING_PYTHON_RUNNER_DID_NOT_INITIALIZE",

Review Comment:
   Similar rename here. This is what user will see. 






Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


ericm-db commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576514370


##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,6 +91,12 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw new PythonException(s"Streaming Runner initialization failed" +
+        s" (returned $resFromPython). " +
+        s"Error message: $errMessage", null)
+    }

Review Comment:
   Created a new error class, PTAL






Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


grundprinzip commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576480636


##
python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:
##
@@ -76,16 +73,21 @@ def process(df_id, batch_id):  # type: ignore[no-untyped-def]
         func(batch_df, batch_id)
         print(f"{log_name} Completed batch {batch_id} with DF id {df_id}")
 
-    while True:
-        df_ref_id = utf8_deserializer.loads(infile)
-        batch_id = read_long(infile)
-        # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
-        # traceback string if error occurs.
-        try:
+    try:
+        func = worker.read_command(pickle_ser, infile)
+        write_int(0, outfile)
+        outfile.flush()
+
+        while True:
+            df_ref_id = utf8_deserializer.loads(infile)
+            batch_id = read_long(infile)
+            # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
+            # traceback string if error occurs.
             process(df_ref_id, int(batch_id))
             write_int(0, outfile)
-        except BaseException as e:
-            handle_worker_exception(e, outfile)
+            outfile.flush()
+    except BaseException as e:

Review Comment:
   I think it would be better to use `Exception` because BaseException would only cover built-in exceptions. But we should probably handle all exceptions here.
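For context on the hierarchy being discussed: in Python, `BaseException` is the root of the exception tree, so `except BaseException` catches everything, including system-exiting exceptions such as `KeyboardInterrupt`, `SystemExit`, and `GeneratorExit`; `except Exception` catches ordinary errors, including user-defined exception classes. A small standalone sketch (the `classify` helper is hypothetical, purely for illustration):

```python
# Sketch (not from the PR): how Python's exception hierarchy splits.
class UserDefinedError(Exception):
    pass

def classify(exc: BaseException) -> str:
    # Everything derives from BaseException; only the Exception branch
    # is caught by a plain `except Exception` handler.
    if isinstance(exc, Exception):
        return "ordinary error"   # caught by `except Exception`
    return "system-exiting"       # only caught by `except BaseException`

print(classify(UserDefinedError()))   # ordinary error
print(classify(KeyboardInterrupt()))  # system-exiting
```

Catching `BaseException` in a worker loop is therefore the broader net: it also intercepts interrupts and exits, which is usually what a long-lived worker that must always report a status back to the driver wants.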






Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


grundprinzip commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576472509


##
python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:
##
@@ -62,10 +62,7 @@ def main(infile: IO, outfile: IO) -> None:
     assert spark_connect_session.session_id == session_id
     spark = spark_connect_session
 
-    func = worker.read_command(pickle_ser, infile)
-    write_int(0, outfile)  # Indicate successful initialization
-
-    outfile.flush()
+    # TODO(SPARK-44461): Enable Process Isolation

Review Comment:
   rm






Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-23 Thread via GitHub


grundprinzip commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1576471728


##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,6 +91,12 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw new PythonException(s"Streaming Runner initialization failed" +
+        s" (returned $resFromPython). " +
+        s"Error message: $errMessage", null)
+    }

Review Comment:
   can you add a proper error class for this? Or does the Python exception already have one?






Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-22 Thread via GitHub


rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1575192686


##
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##
@@ -91,6 +91,11 @@ private[spark] class StreamingPythonRunner(
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw new RuntimeException(s"Runner initialization failed (returned $resFromPython). " +

Review Comment:
   Should be a PythonException? (comparing it with similar code in `StreamingForeachBatchHelper.scala`).
   Also, add 'Streaming' at the start of the message.



##
python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py:
##
@@ -76,16 +71,20 @@ def process(df_id, batch_id):  # type: ignore[no-untyped-def]
         func(batch_df, batch_id)
         print(f"{log_name} Completed batch {batch_id} with DF id {df_id}")
 
-    while True:
-        df_ref_id = utf8_deserializer.loads(infile)
-        batch_id = read_long(infile)
-        # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
-        # traceback string if error occurs.
-        try:
+    try:
+        func = worker.read_command(pickle_ser, infile)
+        write_int(0, outfile)
+        outfile.flush()
+        while True:

Review Comment:
   minor: leave an empty line before this.






Re: [PR] [SPARK-47941] [SS] [Connect] Propagate ForeachBatch worker initialization errors to users for PySpark [spark]

2024-04-22 Thread via GitHub


rangadi commented on code in PR #46125:
URL: https://github.com/apache/spark/pull/46125#discussion_r1575186174


##
python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py:
##
@@ -66,6 +66,27 @@ def func(df, _):
         q = df.writeStream.foreachBatch(func).start()
         q.processAllAvailable()
 
+    def test_pickling_deserialization_error(self):
+        class NoUnpickle:
+            def __reduce__(self):
+                # Serialize only the data attribute
+                return self.throw_exception(), ()
+
+            def throw_exception(self):
+                raise RuntimeError("Cannot unpickle instance of NoUnpickle")
+
+        no_unpickle = NoUnpickle()
+
+        def func(df, _):
+            print(no_unpickle)
+            df.count()
+
+        with self.assertRaises(Exception, msg="Cannot unpickle instance of NoUnpickle"):

Review Comment:
   For reference, could you include the exception in this PR thread before and after the fix?


