Yicong-Huang commented on code in PR #53035:
URL: https://github.com/apache/spark/pull/53035#discussion_r2558070258


##########
python/pyspark/sql/pandas/functions.py:
##########
@@ -301,6 +303,66 @@ def calculate(iterator: Iterator[pa.Array]) -> Iterator[pa.Array]:
             Therefore, mutating the input arrays is not allowed and will cause incorrect results.
             For the same reason, users should also not rely on the index of the input arrays.
 
+    * Iterator of Arrays to Scalar
+        `Iterator[pyarrow.Array]` -> `Any`
+
+        The function takes an iterator of `pyarrow.Array` and returns a scalar value. This is
+        useful for grouped aggregations where the UDF can process all batches for a group
+        iteratively, which is more memory-efficient than loading all data at once. The returned
+        scalar can be a python primitive type, a numpy data type, or a `pyarrow.Scalar` instance.
+
+        >>> import pandas as pd

Review Comment:
   removed



##########
python/pyspark/sql/pandas/functions.py:
##########
@@ -301,6 +303,66 @@ def calculate(iterator: Iterator[pa.Array]) -> Iterator[pa.Array]:
             Therefore, mutating the input arrays is not allowed and will cause incorrect results.
             For the same reason, users should also not rely on the index of the input arrays.
 
+    * Iterator of Arrays to Scalar
+        `Iterator[pyarrow.Array]` -> `Any`
+
+        The function takes an iterator of `pyarrow.Array` and returns a scalar value. This is
+        useful for grouped aggregations where the UDF can process all batches for a group
+        iteratively, which is more memory-efficient than loading all data at once. The returned
+        scalar can be a python primitive type, a numpy data type, or a `pyarrow.Scalar` instance.
+
+        >>> import pandas as pd
+        >>> from typing import Iterator
+        >>> @arrow_udf("double")
+        ... def arrow_mean(it: Iterator[pa.Array]) -> float:
+        ...     sum_val = 0.0
+        ...     cnt = 0
+        ...     for v in it:
+        ...         assert isinstance(v, pa.Array)
+        ...         sum_val += pa.compute.sum(v).as_py()
+        ...         cnt += len(v)
+        ...     return sum_val / cnt
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> df.groupby("id").agg(arrow_mean(df['v'])).show()  # doctest: +SKIP

Review Comment:
   removed skips
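
   A minimal sketch of the batch-wise mean from the doctest above, runnable without a Spark session. Plain Python lists stand in for the `pyarrow.Array` batches, and the name `streaming_mean` is hypothetical; it only illustrates the accumulation pattern and is not how Spark invokes the UDF.

```python
from typing import Iterator, List


def streaming_mean(batches: Iterator[List[float]]) -> float:
    """Compute a mean one batch at a time, keeping only a running sum and count.

    Assumption: each list stands in for one pyarrow.Array batch of a group.
    """
    total = 0.0
    count = 0
    for batch in batches:
        total += sum(batch)   # per-batch partial sum
        count += len(batch)   # per-batch row count
    return total / count      # assumes the group has at least one row


if __name__ == "__main__":
    # Same data as the doctest, split into arbitrary batches per group.
    print(streaming_mean(iter([[1.0], [2.0]])))        # group id=1
    print(streaming_mean(iter([[3.0, 5.0], [10.0]])))  # group id=2
```

   Only the running sum and count are held in memory, so the batch boundaries do not affect the result.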



##########
python/pyspark/sql/pandas/functions.py:
##########
@@ -301,6 +303,66 @@ def calculate(iterator: Iterator[pa.Array]) -> Iterator[pa.Array]:
             Therefore, mutating the input arrays is not allowed and will cause incorrect results.
             For the same reason, users should also not rely on the index of the input arrays.
 
+    * Iterator of Arrays to Scalar
+        `Iterator[pyarrow.Array]` -> `Any`
+
+        The function takes an iterator of `pyarrow.Array` and returns a scalar value. This is
+        useful for grouped aggregations where the UDF can process all batches for a group
+        iteratively, which is more memory-efficient than loading all data at once. The returned
+        scalar can be a python primitive type, a numpy data type, or a `pyarrow.Scalar` instance.
+
+        >>> import pandas as pd
+        >>> from typing import Iterator
+        >>> @arrow_udf("double")
+        ... def arrow_mean(it: Iterator[pa.Array]) -> float:
+        ...     sum_val = 0.0
+        ...     cnt = 0
+        ...     for v in it:
+        ...         assert isinstance(v, pa.Array)
+        ...         sum_val += pa.compute.sum(v).as_py()
+        ...         cnt += len(v)
+        ...     return sum_val / cnt
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+        >>> df.groupby("id").agg(arrow_mean(df['v'])).show()  # doctest: +SKIP
+        +---+---------------+
+        | id|arrow_mean(v)  |
+        +---+---------------+
+        |  1|            1.5|
+        |  2|            6.0|
+        +---+---------------+
+
+    * Iterator of Multiple Arrays to Scalar
+        `Iterator[Tuple[pyarrow.Array, ...]]` -> `Any`
+
+        The function takes an iterator of a tuple of multiple `pyarrow.Array` and returns a
+        scalar value. This is useful for grouped aggregations with multiple input columns.
+
+        >>> from typing import Iterator, Tuple
+        >>> import numpy as np
+        >>> @arrow_udf("double")
+        ... def arrow_weighted_mean(it: Iterator[Tuple[pa.Array, pa.Array]]) -> float:
+        ...     weighted_sum = 0.0
+        ...     weight = 0.0
+        ...     for v, w in it:
+        ...         assert isinstance(v, pa.Array)
+        ...         assert isinstance(w, pa.Array)
+        ...         weighted_sum += np.dot(v, w)
+        ...         weight += pa.compute.sum(w).as_py()
+        ...     return weighted_sum / weight
+        ...
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0, 1.0), (1, 2.0, 2.0), (2, 3.0, 1.0), (2, 5.0, 2.0), (2, 10.0, 3.0)],
+        ...     ("id", "v", "w"))
+        >>> df.groupby("id").agg(arrow_weighted_mean(df["v"], df["w"])).show()  # doctest: +SKIP

Review Comment:
   removed skips
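
   The multi-column case can be sketched the same way, without Spark. Here each batch is a tuple of two plain Python lists standing in for the `(v, w)` pair of `pyarrow.Array` objects, and `streaming_weighted_mean` is a hypothetical name used only for illustration.

```python
from typing import Iterator, List, Tuple


def streaming_weighted_mean(
    batches: Iterator[Tuple[List[float], List[float]]]
) -> float:
    """Weighted mean accumulated batch by batch.

    Assumption: each (values, weights) tuple stands in for one batch of
    two equally long pyarrow.Array columns belonging to the same group.
    """
    weighted_sum = 0.0
    weight = 0.0
    for values, weights in batches:
        # Dot product of the two columns for this batch.
        weighted_sum += sum(v * w for v, w in zip(values, weights))
        weight += sum(weights)
    return weighted_sum / weight  # assumes the total weight is non-zero


if __name__ == "__main__":
    # Same data as the doctest, split into arbitrary batches per group.
    group_1 = iter([([1.0, 2.0], [1.0, 2.0])])
    group_2 = iter([([3.0, 5.0], [1.0, 2.0]), ([10.0], [3.0])])
    print(streaming_weighted_mean(group_1))
    print(streaming_weighted_mean(group_2))
```

   As in the doctest, the per-batch dot product and the weight total are the only state carried between batches.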



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

