zhengruifeng commented on code in PR #53035:
URL: https://github.com/apache/spark/pull/53035#discussion_r2525626806
##########
python/pyspark/sql/pandas/functions.py:
##########
@@ -301,6 +303,66 @@ def calculate(iterator: Iterator[pa.Array]) -> Iterator[pa.Array]:
Therefore, mutating the input arrays is not allowed and will cause incorrect results.
For the same reason, users should also not rely on the index of the input arrays.
+ * Iterator of Arrays to Scalar
+ `Iterator[pyarrow.Array]` -> `Any`
+
+ The function takes an iterator of `pyarrow.Array` and returns a scalar value. This is
+ useful for grouped aggregations where the UDF can process all batches for a group
+ iteratively, which is more memory-efficient than loading all data at once. The returned
+ scalar can be a Python primitive type, a numpy data type, or a `pyarrow.Scalar` instance.
+
+ >>> import pyarrow as pa
+ >>> from typing import Iterator
+ >>> @arrow_udf("double")
+ ... def arrow_mean(it: Iterator[pa.Array]) -> float:
+ ...     sum_val = 0.0
+ ...     cnt = 0
+ ...     for v in it:
+ ...         assert isinstance(v, pa.Array)
+ ...         sum_val += pa.compute.sum(v).as_py()
+ ...         cnt += len(v)
+ ...     return sum_val / cnt
+ ...
+ >>> df = spark.createDataFrame(
+ ... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id",
"v"))
+ >>> df.groupby("id").agg(arrow_mean(df['v'])).show() # doctest: +SKIP
+ +---+---------------+
+ | id|arrow_mean(v) |
+ +---+---------------+
+ | 1| 1.5|
+ | 2| 6.0|
+ +---+---------------+
+
+ * Iterator of Multiple Arrays to Scalar
+ `Iterator[Tuple[pyarrow.Array, ...]]` -> `Any`
+
+ The function takes an iterator of a tuple of multiple `pyarrow.Array` and returns a
+ scalar value. This is useful for grouped aggregations with multiple input columns.
+
+ >>> from typing import Iterator, Tuple
+ >>> import numpy as np
+ >>> @arrow_udf("double")
+ ... def arrow_weighted_mean(it: Iterator[Tuple[pa.Array, pa.Array]]) -> float:
+ ...     weighted_sum = 0.0
+ ...     weight = 0.0
+ ...     for v, w in it:
+ ...         assert isinstance(v, pa.Array)
+ ...         assert isinstance(w, pa.Array)
+ ...         weighted_sum += np.dot(v, w)
+ ...         weight += pa.compute.sum(w).as_py()
+ ...     return weighted_sum / weight
+ ...
+ >>> df = spark.createDataFrame(
+ ... [(1, 1.0, 1.0), (1, 2.0, 2.0), (2, 3.0, 1.0), (2, 5.0, 2.0),
(2, 10.0, 3.0)],
+ ... ("id", "v", "w"))
+ >>> df.groupby("id").agg(arrow_weighted_mean(df["v"], df["w"])).show()  # doctest: +SKIP
+ +---+---------------------------------+
+ | id|arrow_weighted_mean(v, w) |
+ +---+---------------------------------+
+ | 1| 1.6666666666666667|
+ | 2| 7.166666666666667|
Review Comment:
   I suggest not comparing the exact values, since they may vary with environment/version changes.
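   For example, a minimal sketch (hypothetical, not part of this PR) reusing the `df` and `arrow_weighted_mean` from the docstring above: rounding the aggregate with `pyspark.sql.functions.round` keeps the printed values stable, so the doctest would not depend on exact floating-point formatting.

   ```python
   # Hypothetical doctest variant: round the aggregate so the displayed
   # values do not depend on the environment or library versions.
   >>> from pyspark.sql import functions as sf
   >>> df.groupby("id").agg(
   ...     sf.round(arrow_weighted_mean(df["v"], df["w"]), 2).alias("wm")
   ... ).show()  # doctest: +SKIP
   ```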
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.