viirya commented on code in PR #55552:
URL: https://github.com/apache/spark/pull/55552#discussion_r3378562157


##########
core/src/main/scala/org/apache/spark/internal/config/Python.scala:
##########
@@ -150,4 +150,28 @@ private[spark] object Python {
       .version("4.1.0")
       .booleanConf
       .createWithDefault(true)
+
+  val PYTHON_UDF_PIPELINED_EXECUTION =
+    ConfigBuilder("spark.python.udf.pipelined.enabled")
+      .doc("When true, enables pipelined (asynchronous) data transfer between JVM and Python " +
+        "UDF workers. In pipelined mode, input serialization runs in a separate writer thread " +
+        "while the main task thread reads output from the Python worker, allowing the two " +
+        "directions to overlap for improved throughput. " +
+        "This is particularly beneficial for compute-heavy UDFs (e.g., ML inference).")

Review Comment:
   Re-worded in 8688369e47b:
   
   > This can improve throughput for some workloads (e.g., multi-column UDFs or 
compute-heavy UDFs like ML inference); for light, single-column UDFs the 
overhead of the extra thread may offset the gain.
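
A toy timing model can make the re-worded claim concrete. This is only a sketch under stated assumptions: constant per-batch transfer and compute times, an idealized two-stage pipeline, and a fixed per-batch hand-off cost for the extra thread. The function names and all durations are hypothetical, not taken from the PR or from any measurement.

```python
def sync_time(n_batches, t_transfer, t_udf):
    """Synchronous mode: transfer and UDF compute alternate on one thread."""
    return n_batches * (t_transfer + t_udf)


def pipelined_time(n_batches, t_transfer, t_udf, handoff_overhead):
    """Idealized two-stage pipeline with a per-batch hand-off cost.

    The first batch must pass through both stages; after that the slower
    stage sets the pace. handoff_overhead is an assumed cost per batch for
    passing data between threads.
    """
    if n_batches == 0:
        return 0
    fill = t_transfer + t_udf
    steady = (n_batches - 1) * max(t_transfer, t_udf)
    return fill + steady + n_batches * handoff_overhead


# All durations are made-up microsecond values, not measurements.
heavy_sync = sync_time(100, 1000, 5000)            # compute-heavy UDF
heavy_pipe = pipelined_time(100, 1000, 5000, 100)
light_sync = sync_time(100, 1000, 50)              # light UDF
light_pipe = pipelined_time(100, 1000, 50, 100)

print("heavy:", heavy_sync, "->", heavy_pipe)
print("light:", light_sync, "->", light_pipe)
```

In this model the most a pipeline can save per batch is the shorter of the two stage times, so a light UDF leaves little to recover and the hand-off cost can outweigh it.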



##########
python/pyspark/worker.py:
##########
@@ -3588,12 +3588,93 @@ def process():
                 if hasattr(out_iter, "close"):
                     out_iter.close()
 
+        def pipelined_process():
+            """
+            Pipelined variant of process() that pre-fetches input batches in a background
+            reader thread while the main thread computes the UDF and writes output.
+            This allows input deserialization to overlap with UDF computation.
+            """
+            # Mark that pipelined mode is active so UDFs can verify the code path.
+            os.environ["SPARK_PIPELINED_UDF_ACTIVE"] = "1"
+            import queue
+            import threading
+
+            queue_depth = int(os.environ.get("SPARK_PIPELINED_UDF_QUEUE_DEPTH", "2"))
+            _SENTINEL = object()
+            input_queue = queue.Queue(maxsize=queue_depth)
+            reader_error = [None]
+            stop_event = threading.Event()
+
+            def _reader_thread():
+                try:
+                    for batch in deserializer.load_stream(infile):
+                        # Some serializers (e.g., ArrowStreamGroupSerializer,
+                        # ArrowStreamAggPandasUDFSerializer) yield lazy iterators
+                        # that still read from infile. Materialize them here so the
+                        # main thread can consume them without touching infile.
+                        if hasattr(batch, "__next__"):
+                            batch = list(batch)

Review Comment:
   Added `WideRowUDFTimeBench` in f76f1445c74. It carries a 1 KB or 4 KB string 
payload per row and bumps `spark.sql.execution.arrow.maxRecordsPerBatch` so 
each Arrow batch is ~10 MB or ~20 MB (vs ~80 KB in the existing `LongType` 
benchmarks).
   
   Local results (3 iterations after warmup):
   
   | shape | sync | pipelined | speedup |
   |---|---|---|---|
   | 50k rows × 1 KB payload, 10k records/batch (~10 MB/batch) | 75 ms | 65 ms | 1.15x |
   | 50k rows × 4 KB payload, 5k records/batch (~20 MB/batch) | 153 ms | 149 ms | 1.03x |
   
   Driver-side peak RSS is unchanged in both modes. The speedup narrows on the 
larger-batch case, consistent with the regime where queue-buffered batches 
become non-negligible -- but no regression.
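
For readers checking the numbers, the batch sizes and speedups in the table follow from simple arithmetic. The sketch below counts payload bytes only (Arrow framing and other columns are ignored) and uses decimal units. The ~80 KB `LongType` figure is reproduced under the assumption of 10,000 records per batch and 8 bytes per value; the helper names are illustrative.

```python
KB = 1000  # decimal units, matching the rounded figures in the table


def batch_payload_bytes(records_per_batch, payload_bytes_per_row):
    """Approximate batch size from payload alone (ignores Arrow overhead)."""
    return records_per_batch * payload_bytes_per_row


def speedup(sync_ms, pipelined_ms):
    """Speedup of pipelined over sync, rounded as in the table."""
    return round(sync_ms / pipelined_ms, 2)


wide_1kb = batch_payload_bytes(10_000, 1 * KB)   # 10k records x 1 KB
wide_4kb = batch_payload_bytes(5_000, 4 * KB)    # 5k records x 4 KB
# Assumption: 10,000 records per batch, 8 bytes per LongType value.
long_type = batch_payload_bytes(10_000, 8)

print(wide_1kb, wide_4kb, long_type)
print(speedup(75, 65), speedup(153, 149))
```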



##########
python/pyspark/worker.py:
##########
@@ -3609,12 +3588,93 @@ def process():
                 if hasattr(out_iter, "close"):
                     out_iter.close()
 
+        def pipelined_process():
+            """
+            Pipelined variant of process() that pre-fetches input batches in a background
+            reader thread while the main thread computes the UDF and writes output.
+            This allows input deserialization to overlap with UDF computation.
+            """
+            # Mark that pipelined mode is active so UDFs can verify the code path.
+            os.environ["SPARK_PIPELINED_UDF_ACTIVE"] = "1"
+            import queue
+            import threading
+
+            queue_depth = int(os.environ.get("SPARK_PIPELINED_UDF_QUEUE_DEPTH", "2"))
+            _SENTINEL = object()
+            input_queue = queue.Queue(maxsize=queue_depth)
+            reader_error = [None]
+            stop_event = threading.Event()
+
+            def _reader_thread():
+                try:
+                    for batch in deserializer.load_stream(infile):
+                        # Some serializers (e.g., ArrowStreamGroupSerializer,
+                        # ArrowStreamAggPandasUDFSerializer) yield lazy iterators
+                        # that still read from infile. Materialize them here so the
+                        # main thread can consume them without touching infile.
+                        if hasattr(batch, "__next__"):
+                            batch = list(batch)
+                        # Use timeout put so we can check stop_event periodically.
+                        # This prevents the reader from blocking forever if the main
+                        # thread stops consuming (e.g., due to UDF exception).
+                        while not stop_event.is_set():
+                            try:
+                                input_queue.put(batch, timeout=1)
+                                break
+                            except queue.Full:
+                                continue
+                        if stop_event.is_set():
+                            return

Review Comment:
   Thanks -- agreed the Condition + bounded-buffer approach would avoid the 
0.1s polling under sustained back-pressure. Happy to leave it as a follow-up so 
this PR does not grow further, and I will be glad to review your patch when you 
have one.
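
For context, one possible shape of a Condition-based bounded buffer is sketched below. It is not the patch discussed in this thread; the class and method names are hypothetical. The idea is that `close()` notifies every waiter, so a producer blocked on a full buffer wakes immediately instead of re-checking a stop flag on a timeout.

```python
import threading
from collections import deque


class BufferClosed(Exception):
    """Raised by get() once the buffer is closed and fully drained."""


class ClosableBoundedBuffer:
    """Hypothetical bounded buffer that wakes blocked threads on close()."""

    def __init__(self, maxsize):
        self._items = deque()
        self._maxsize = maxsize
        self._closed = False
        self._cond = threading.Condition()

    def put(self, item):
        """Block while full; return False if the buffer has been closed."""
        with self._cond:
            while len(self._items) >= self._maxsize and not self._closed:
                self._cond.wait()
            if self._closed:
                return False
            self._items.append(item)
            self._cond.notify_all()
            return True

    def get(self):
        """Block while empty; raise BufferClosed when closed and drained."""
        with self._cond:
            while not self._items and not self._closed:
                self._cond.wait()
            if self._items:
                item = self._items.popleft()
                self._cond.notify_all()
                return item
            raise BufferClosed()

    def close(self):
        """Mark the buffer closed and wake every waiting thread."""
        with self._cond:
            self._closed = True
            self._cond.notify_all()


def run_demo():
    """Consumer takes one item, then closes; the producer must exit."""
    buf = ClosableBoundedBuffer(maxsize=2)
    produced = []

    def producer():
        for i in range(1000):
            if not buf.put(i):
                return  # buffer closed by the consumer: stop reading
            produced.append(i)

    t = threading.Thread(target=producer, daemon=True)
    t.start()
    first = buf.get()
    buf.close()  # simulates the main thread stopping (e.g., UDF exception)
    t.join(timeout=5)
    return first, t.is_alive(), len(produced)


if __name__ == "__main__":
    print(run_demo())
```

In the demo the producer can complete at most three puts (capacity two, plus the one item consumed) before `close()` makes `put()` return `False`.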



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to