Yicong-Huang commented on code in PR #56754:
URL: https://github.com/apache/spark/pull/56754#discussion_r3477296173
##########
python/pyspark/worker.py:
##########
@@ -3621,30 +3636,6 @@ def mapper(a):
return f(keys, vals, state)
- elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF:
- # We assume there is only one UDF here because grouped agg doesn't
- # support combining multiple UDFs.
- assert num_udfs == 1
-
- arg_offsets, f = udfs[0]
-
- # Convert to iterator of pandas Series:
- # - Iterator[pd.Series] for single column
- # - Iterator[Tuple[pd.Series, ...]] for multiple columns
- def mapper(batch_iter):
- # batch_iter is Iterator[Tuple[pd.Series, ...]] where each tuple
represents one batch
- # Convert to Iterator[pd.Series] or Iterator[Tuple[pd.Series,
...]] based on arg_offsets
- if len(arg_offsets) == 1:
- # Single column: Iterator[Tuple[pd.Series, ...]] ->
Iterator[pd.Series]
- series_iter = (batch_series[arg_offsets[0]] for batch_series
in batch_iter)
- else:
- # Multiple columns: Iterator[Tuple[pd.Series, ...]] ->
- # Iterator[Tuple[pd.Series, ...]]
- series_iter = (
- tuple(batch_series[o] for o in arg_offsets) for
batch_series in batch_iter
- )
- return f(series_iter)
-
else:
Review Comment:
ah thanks for the catch, yeah this is a behavioral change. let me separate it
out from this refactoring PR.
##########
python/pyspark/worker.py:
##########
@@ -3621,30 +3636,6 @@ def mapper(a):
return f(keys, vals, state)
- elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF:
- # We assume there is only one UDF here because grouped agg doesn't
- # support combining multiple UDFs.
- assert num_udfs == 1
-
- arg_offsets, f = udfs[0]
-
- # Convert to iterator of pandas Series:
- # - Iterator[pd.Series] for single column
- # - Iterator[Tuple[pd.Series, ...]] for multiple columns
- def mapper(batch_iter):
- # batch_iter is Iterator[Tuple[pd.Series, ...]] where each tuple
represents one batch
- # Convert to Iterator[pd.Series] or Iterator[Tuple[pd.Series,
...]] based on arg_offsets
- if len(arg_offsets) == 1:
- # Single column: Iterator[Tuple[pd.Series, ...]] ->
Iterator[pd.Series]
- series_iter = (batch_series[arg_offsets[0]] for batch_series
in batch_iter)
- else:
- # Multiple columns: Iterator[Tuple[pd.Series, ...]] ->
- # Iterator[Tuple[pd.Series, ...]]
- series_iter = (
- tuple(batch_series[o] for o in arg_offsets) for
batch_series in batch_iter
- )
- return f(series_iter)
-
else:
Review Comment:
ah thanks for the catch, yeah this is a behavioral change. let me separate
it out from this refactoring PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]