Github user icexelloss commented on a diff in the pull request:
https://github.com/apache/spark/pull/19872#discussion_r162372467
--- Diff: python/pyspark/worker.py ---
@@ -110,6 +110,17 @@ def wrapped(*series):
    return wrapped
+def wrap_pandas_group_agg_udf(f, return_type):
+    arrow_return_type = to_arrow_type(return_type)
+
+    def wrapped(*series):
+        import pandas as pd
+        result = f(*series)
+        return pd.Series(result)
--- End diff --
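(For context: the wrapper above serves grouped aggregate pandas UDFs, which on the user side would look roughly like the sketch below. This assumes the GROUPED_AGG pandas UDF type proposed in this PR; the DataFrame, column names, and mean_udf are made up for illustration.)
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ["id", "v"])

# One scalar result per group; wrap_pandas_group_agg_udf turns it into a
# one-element pd.Series before it is written back as an Arrow batch.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

df.groupBy("id").agg(mean_udf(df["v"])).show()
```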
@HyukjinKwon is right. I am not sure it's worth it performance-wise to have
another ser/de path, because the overhead is proportional to the number of
groups rather than the number of rows. And it seems pretty fast too:
```
%%time
import io
import pandas as pd
import pyarrow as pa
# _create_batch builds an Arrow record batch from pandas data; here it is
# assumed to be the private helper in pyspark.serializers.
from pyspark.serializers import _create_batch

stream = io.BytesIO()
for i in range(0, 1000):
    batch = _create_batch(pd.Series(i), None)
    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
    writer.write_batch(batch)
    writer.close()

CPU times: user 266 ms, sys: 12.7 ms, total: 279 ms
Wall time: 281 ms
```
That works out to an overhead of well under 1 ms per group (roughly 281 ms / 1000 ≈ 0.3 ms).
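To make the per-group cost concrete: each group's scalar result ends up as a one-element pd.Series, which is written out as its own small Arrow record batch. Below is a standalone sketch of that round trip using pyarrow directly (not the actual PySpark serializer code; agg and the column name are made up):
```
import io
import pandas as pd
import pyarrow as pa

def agg(v):
    # A made-up grouped aggregate UDF: one scalar per group.
    return v.mean()

group = pd.Series([1.0, 2.0, 3.0])
result = pd.Series(agg(group))  # what the wrapped UDF returns for one group

# Serialize the one-element result as its own Arrow record batch,
# which is the per-group cost measured above.
sink = io.BytesIO()
batch = pa.RecordBatch.from_arrays([pa.Array.from_pandas(result)], ["result"])
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()
```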