Yicong-Huang opened a new pull request, #53387:
URL: https://github.com/apache/spark/pull/53387

   ### What changes were proposed in this pull request?
   
   Optimize Arrow serializers (`ArrowStreamPandasSerializer`, 
`GroupPandasUDFSerializer`, and `ArrowStreamAggPandasUDFSerializer`) by 
avoiding unnecessary `pa.Table` creation when processing single `RecordBatch` 
instances. 
   
   The optimization replaces `pa.Table.from_batches([batch]).itercolumns()` 
with direct column access using `batch.column(i)` for single batches. This 
eliminates unnecessary Table and iterator object creation, reducing function 
call overhead and GC pressure.
   
   **Changes:**
   - `ArrowStreamPandasSerializer.load_stream()`: Direct column access instead 
of creating Table wrapper
   - `GroupPandasUDFSerializer.load_stream()`: Direct column access for each 
batch
   - `ArrowStreamAggPandasUDFSerializer.load_stream()`: Optimized single batch 
case to avoid Table creation
   
   **Code example:**
   
   ```python
   # Before (ArrowStreamPandasSerializer.load_stream)
   for batch in batches:
       pandas_batches = [
           self.arrow_to_pandas(c, i)
           for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
       ]
   
   # After
   for batch in batches:
       pandas_batches = [
           self.arrow_to_pandas(batch.column(i), i)
           for i in range(batch.num_columns)
       ]
   ```
   
   ### Why are the changes needed?
   
   Several serializers in `pyspark.sql.pandas.serializers` unnecessarily create 
`pa.Table` objects when processing single `RecordBatch` instances. When 
converting Arrow RecordBatches to pandas Series, the code creates a `pa.Table` 
wrapper for each batch just to iterate over columns, which introduces:
   - Unnecessary object creation (Table objects and iterators)
   - Extra function call overhead
   - Increased GC pressure
   
   For a workload processing 1000 batches with 10 columns each, this avoids creating 2000 temporary objects (1000 Table objects + 1000 iterators). `RecordBatch.column(i)` returns a zero-copy reference to the column array, so no data is copied on either path; the saving is purely in object allocation and function call overhead.
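   The equivalence of the two access paths can be sketched directly in pyarrow (the data and column names below are made up for illustration): a Table built from a single batch holds exactly one chunk per column, and that chunk is the batch's own column array.

   ```python
   import pyarrow as pa

   # A small RecordBatch with made-up data, for illustration only.
   batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})

   # Old path: wrap the batch in a Table just to iterate over its columns.
   table_cols = list(pa.Table.from_batches([batch]).itercolumns())

   # New path: index into the RecordBatch directly.
   direct_cols = [batch.column(i) for i in range(batch.num_columns)]

   # Both paths expose the same underlying Arrow data; the single-batch
   # Table has one chunk per column, equal to the batch's column.
   for chunked, arr in zip(table_cols, direct_cols):
       assert chunked.num_chunks == 1
       assert chunked.chunk(0).equals(arr)
   ```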
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. This is a performance optimization that maintains backward compatibility. The serialization behavior remains the same; only the internal implementation is optimized.
   
   ### How was this patch tested?
   
   Existing tests pass without modification. The optimization maintains 
functional correctness while improving performance.
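   For reference, the two access paths can be timed in isolation with a minimal `timeit` sketch (this is not the harness that produced the benchmark table below; the batch shape loosely mirrors the PR's scenario, and the column names `c0`..`c9` are made up):

   ```python
   import timeit

   import pyarrow as pa

   # Hypothetical shape: one batch with 10 columns of 1000 rows each.
   batch = pa.RecordBatch.from_pydict(
       {f"c{i}": list(range(1000)) for i in range(10)}
   )

   def via_table():
       # Old path: a temporary Table plus an iterator per call.
       return list(pa.Table.from_batches([batch]).itercolumns())

   def direct():
       # New path: plain index-based access on the RecordBatch.
       return [batch.column(i) for i in range(batch.num_columns)]

   t_table = timeit.timeit(via_table, number=10_000)
   t_direct = timeit.timeit(direct, number=10_000)
   print(f"Table wrapper: {t_table:.3f}s  direct access: {t_direct:.3f}s")
   ```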
   
   **Benchmark results** (1000 batches, 1000 rows, 10 columns, 100 iterations):
   
   | Serializer | Metric | Master | Optimized | Improvement |
   |------------|--------|--------|-----------|-------------|
   | **ArrowStreamPandasSerializer** | Average | 435.23ms | 436.72ms | +0.3% |
   | | P95 | 470.70ms | 479.17ms | +1.8% |
   | | P99 | 516.75ms | 500.23ms | **-3.2%** |
   | | Std Dev | 22.07ms | 20.95ms | **-5.1%** |
   | | Throughput | 2,297.6 batches/sec | 2,289.8 batches/sec | -0.3% |
   | **GroupPandasUDFSerializer** | Average | 199.10ms | 195.38ms | **-1.9%** |
   | | P95 | 212.47ms | 208.69ms | **-1.8%** |
   | | P99 | 246.94ms | 211.40ms | **-14.4%** |
   | | Std Dev | 7.69ms | 5.61ms | **-27.0%** |
   | | Throughput | 2,511.3 batches/sec | 2,559.1 batches/sec | **+1.9%** |
   | **ArrowStreamAggPandasUDFSerializer** | Average | 229.69ms | 226.14ms | **-1.5%** |
   | | Median | 228.54ms | 221.88ms | **-2.9%** |
   | | P95 | 249.66ms | 255.75ms | +2.4% |
   | | P99 | 268.33ms | 279.89ms | +4.3% |
   | | Std Dev | 10.44ms | 12.94ms | +24.0% |
   | | Throughput | 2,176.8 groups/sec | 2,211.0 groups/sec | **+1.6%** |
   
   The largest gains are for `GroupPandasUDFSerializer`, with a 14.4% P99 improvement and a 27% reduction in standard deviation, indicating more stable performance with less variance; results for the other two serializers are roughly neutral, within run-to-run noise.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
