Yicong-Huang opened a new pull request, #53387:
URL: https://github.com/apache/spark/pull/53387
### What changes were proposed in this pull request?
Optimize Arrow serializers (`ArrowStreamPandasSerializer`,
`GroupPandasUDFSerializer`, and `ArrowStreamAggPandasUDFSerializer`) by
avoiding unnecessary `pa.Table` creation when processing single `RecordBatch`
instances.
The optimization replaces `pa.Table.from_batches([batch]).itercolumns()`
with direct column access using `batch.column(i)` for single batches. This
eliminates unnecessary Table and iterator object creation, reducing function
call overhead and GC pressure.
**Changes:**
- `ArrowStreamPandasSerializer.load_stream()`: Direct column access instead
of creating Table wrapper
- `GroupPandasUDFSerializer.load_stream()`: Direct column access for each
batch
- `ArrowStreamAggPandasUDFSerializer.load_stream()`: Optimized single batch
case to avoid Table creation
**Code example:**
```python
# Before (ArrowStreamPandasSerializer.load_stream)
for batch in batches:
    pandas_batches = [
        self.arrow_to_pandas(c, i)
        for i, c in enumerate(pa.Table.from_batches([batch]).itercolumns())
    ]

# After
for batch in batches:
    pandas_batches = [
        self.arrow_to_pandas(batch.column(i), i)
        for i in range(batch.num_columns)
    ]
```
### Why are the changes needed?
Several serializers in `pyspark.sql.pandas.serializers` unnecessarily create
`pa.Table` objects when processing single `RecordBatch` instances. When
converting Arrow RecordBatches to pandas Series, the code creates a `pa.Table`
wrapper for each batch just to iterate over columns, which introduces:
- Unnecessary object creation (Table objects and iterators)
- Extra function call overhead
- Increased GC pressure
For a workload processing 1000 batches with 10 columns each, this avoids
creating 2000 temporary objects (1000 Table objects + 1000 iterators).
`RecordBatch.column(i)` directly returns the column array reference
(zero-copy), reducing function call overhead.
### Does this PR introduce _any_ user-facing change?
No. This is a performance optimization that maintains backward
compatibility. The serialization behavior remains the same, only the internal
implementation is optimized.
### How was this patch tested?
Existing tests pass without modification. The optimization maintains
functional correctness while improving performance.
**Benchmark results** (1000 batches, 1000 rows, 10 columns, 100 iterations):
| Serializer | Metric | Master | Optimized | Improvement |
|------------|--------|--------|-----------|-------------|
| **ArrowStreamPandasSerializer** | Average | 435.23ms | 436.72ms | +0.3% |
| | P95 | 470.70ms | 479.17ms | +1.8% |
| | P99 | 516.75ms | 500.23ms | **-3.2%** |
| | Std Dev | 22.07ms | 20.95ms | **-5.1%** |
| | Throughput | 2,297.6 batches/sec | 2,289.8 batches/sec | -0.3% |
| **GroupPandasUDFSerializer** | Average | 199.10ms | 195.38ms | **-1.9%** |
| | P95 | 212.47ms | 208.69ms | **-1.8%** |
| | P99 | 246.94ms | 211.40ms | **-14.4%** |
| | Std Dev | 7.69ms | 5.61ms | **-27.0%** |
| | Throughput | 2,511.3 batches/sec | 2,559.1 batches/sec | **+1.9%** |
| **ArrowStreamAggPandasUDFSerializer** | Average | 229.69ms | 226.14ms | **-1.5%** |
| | Median | 228.54ms | 221.88ms | **-2.9%** |
| | P95 | 249.66ms | 255.75ms | +2.4% |
| | P99 | 268.33ms | 279.89ms | +4.3% |
| | Std Dev | 10.44ms | 12.94ms | +24.0% |
| | Throughput | 2,176.8 groups/sec | 2,211.0 groups/sec | **+1.6%** |
The optimization is most beneficial for `GroupPandasUDFSerializer`, with a
14.4% P99 improvement and a 27% reduction in standard deviation, indicating
better stability and reduced performance variance. Results for
`ArrowStreamPandasSerializer` are roughly neutral, and
`ArrowStreamAggPandasUDFSerializer` shows modest average and throughput gains
alongside somewhat higher tail variance.
### Was this patch authored or co-authored using generative AI tooling?
No.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]