Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19872#discussion_r162372467
  
    --- Diff: python/pyspark/worker.py ---
    @@ -110,6 +110,17 @@ def wrapped(*series):
         return wrapped
     
     
    +def wrap_pandas_group_agg_udf(f, return_type):
    +    arrow_return_type = to_arrow_type(return_type)
    +
    +    def wrapped(*series):
    +        import pandas as pd
    +        result = f(*series)
    +        return pd.Series(result)
    --- End diff ---
    
    @HyukjinKwon is right. I am not sure another ser/de path is worth it performance-wise, because the overhead is proportional to the number of groups rather than the number of rows. And it seems pretty fast too:
    
    ```
    %%time
    # assumes io, pandas as pd, pyarrow as pa, and pyspark's internal
    # _create_batch helper are already imported in the session
    stream = io.BytesIO()

    # serialize 1000 single-element results, one Arrow RecordBatch per group
    for i in range(0, 1000):
        batch = _create_batch(pd.Series(i), None)
        writer = pa.RecordBatchStreamWriter(stream, batch.schema)
        writer.write_batch(batch)
        writer.close()

    CPU times: user 266 ms, sys: 12.7 ms, total: 279 ms
    Wall time: 281 ms
    ```
    That works out to an overhead of under 1 ms per group (281 ms / 1000 groups ≈ 0.3 ms).
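    For reference, here is a minimal standalone sketch (not from the PR; plain pyarrow only, with a made-up column name and group count) that times the full round trip, serializing and then deserializing one single-row batch per group:

    ```
    import io
    import time

    import pandas as pd
    import pyarrow as pa

    start = time.time()
    for i in range(1000):  # pretend we have 1000 groups
        # one single-row batch per group, i.e. one aggregated value per group
        batch = pa.RecordBatch.from_pandas(pd.DataFrame({'v': [float(i)]}))
        buf = io.BytesIO()
        writer = pa.RecordBatchStreamWriter(buf, batch.schema)
        writer.write_batch(batch)
        writer.close()
        buf.seek(0)
        # read the batch back so the deserialization cost is included too
        _ = pa.RecordBatchStreamReader(buf).read_all()
    elapsed_ms = (time.time() - start) * 1000
    print('%.3f ms per group' % (elapsed_ms / 1000))
    ```
    Even with the read side included, the cost scales with the number of groups, not the number of rows, which is the point above.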
     

