Yicong-Huang opened a new pull request, #53317:
URL: https://github.com/apache/spark/pull/53317

   ### What changes were proposed in this pull request?
   
   This PR introduces an iterator API for pandas grouped aggregation UDFs, 
enabling batch-by-batch processing to improve memory efficiency. Users can now 
write UDFs that accept `Iterator[pd.Series]` or `Iterator[Tuple[pd.Series, 
...]]` and return a scalar value, allowing them to process large groups 
incrementally without loading all data into memory at once.
   
   This brings pandas UDFs to feature parity with Arrow UDFs, which already 
support the iterator API.
   
   ### Why are the changes needed?
   
   The iterator API provides better memory efficiency for grouped aggregation 
UDFs by allowing batch-by-batch processing instead of loading all data for a 
group into memory at once. This is especially beneficial for:
   - Large groups that don't fit in memory
   - Streaming or incremental processing scenarios
   - Memory-constrained environments
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. This introduces a new way to write pandas grouped aggregation UDFs 
using iterator-based type hints:
   
   **Before:**
   ```python
   @pandas_udf("double")
   def mean_udf(v: pd.Series) -> float:
       return v.mean()
   ```
   
   **After (new iterator API):**
   ```python
   @pandas_udf("double")
   def mean_iter(it: Iterator[pd.Series]) -> float:
       sum_val = 0.0
       cnt = 0
       for v in it:
           sum_val += v.sum()
           cnt += len(v)
       return sum_val / cnt
   ```
   
   The iterator API is automatically detected via type hints 
(`Iterator[pd.Series]` or `Iterator[Tuple[pd.Series, ...]]`), so existing code 
continues to work unchanged.
   
   ### How was this patch tested?
   
   - **Type Hint Tests**: Added tests in `test_pandas_udf_typehints.py` 
verifying correct inference of iterator-based UDFs
   - **Functional Tests**: Added tests in `test_pandas_udf_grouped_agg.py` 
covering:
     - Single column input (`Iterator[pd.Series]`)
     - Multiple column input (`Iterator[Tuple[pd.Series, pd.Series]]`)
     - Eval type verification
     - Partial consumption scenarios
   - **Documentation Tests**: All doctests pass, including new iterator API 
examples
   - **Integration**: Verified compatibility with existing grouped aggregation 
UDFs
   - All existing tests continue to pass
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to