d80tb7 commented on issue #24981: [WIP][SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas UDFs- Arrow Stream Impl
URL: https://github.com/apache/spark/pull/24981#issuecomment-512893972
 
 
   Hi @icexelloss, @hjoo 
   
   to answer your questions about the API: yes, `df1.cogroup(df2, on='id').apply(func)` is more succinct, and yes, we could easily add a simple wrapper to make both forms available.  Personally I'd prefer not to add a wrapper, because I believe that having multiple APIs to accomplish the same thing is a bit confusing, but if people want to adopt it as a compromise I think I could be persuaded; a minimal sketch of what such a wrapper might look like follows.
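   
   For concreteness, here's roughly what that wrapper could look like, assuming the `groupby().cogroup()` API from this PR; the monkey-patching is purely illustrative, and a real version would be defined on `DataFrame` directly:
   
   ```python
    from pyspark.sql import DataFrame

    def _cogroup(self, other, on):
        # Delegate to the two-call form, grouping both sides on the same key(s).
        return self.groupby(on).cogroup(other.groupby(on))

    # Illustrative monkey-patch so that
    #   df1.cogroup(df2, on='id').apply(func)
    # becomes shorthand for
    #   df1.groupby('id').cogroup(df2.groupby('id')).apply(func)
    DataFrame.cogroup = _cogroup
   ```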
   
   As for the two API calls: we've been back and forth on this a few times.  Just to clear up one point made earlier, the `df1.groupby('id').cogroup(df2.groupby('id')).apply(func)` form *does* allow you to group each side by different key columns (or expressions), as in the example below.  The `df1.cogroup(df2, on='id').apply(func)` form could conceivably allow this too, but we would have to accept some sort of expression with aliasing (as join does) for it to work.
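   
   For example (purely illustrative, assuming the API in this PR, with `df2` grouped by a derived expression rather than a plain column):
   
   ```python
    from pyspark.sql.functions import col

    # Each side supplies its own grouping expression: df1 groups by a plain
    # column while df2 groups by a derived, aliased expression.
    df1.groupby('id').cogroup(df2.groupby((col('id') % 10).alias('id'))).apply(func)
   ```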
   
   I'd really like to agree on the final API before doing a final PR, though, mainly because I don't think we should commit anything that constitutes a public API without it being agreed.  If nothing else I'd like to hear what @HyukjinKwon thinks, as he expressed an interest in the API on the JIRA and probably has a good feel for how the PySpark APIs should look.
   
   On a separate note, I've been on holiday this week so haven't had much time for coding, but I did run some benchmarks comparing the Arrow Stream and non-Arrow Stream versions of this code.  The driver program was as follows:
   
   ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, pandas_udf, PandasUDFType

    # Single local core and a single shuffle partition, so the timings
    # aren't skewed by parallelism.
    spark = SparkSession\
        .builder\
        .master('local[1]')\
        .config('spark.sql.shuffle.partitions', 1)\
        .getOrCreate()

    # Trivial cogrouped-map UDF: just return the left-hand frame unchanged.
    @pandas_udf('id long, t long, v double', PandasUDFType.COGROUPED_MAP)
    def only_left(l, _):
        return l

    group_sizes = [1, 10, 100, 1000, 10000, 100000]

    for group_size in group_sizes:
        # 100k rows; dividing 'id' by group_size yields groups of that size.
        df = spark.range(0, 100000) \
            .withColumn('t', col('id')) \
            .withColumn('v', col('id').cast('double')) \
            .withColumn('id', (col('id') / group_size).cast('long'))
        print('Group Size: ' + str(group_size))
        df\
            .groupby('id')\
            .cogroup(df.groupby('id'))\
            .apply(only_left)\
            .write.mode('overwrite').parquet('output')
   ```
   In short, this will cogroup a 100k-row dataframe with itself using a single partition.  Results are as follows (all times in ms, as reported by the `total` section of `ReaderIterator.handleTimingData`):
   
   | Group Size | Arrow Stream (ms) | Non-Arrow Stream (ms) |
   | ---------: | ----------------: | --------------------: |
   | 1 | 261,718 | 258,950 |
   | 10 | 25,491 | 25,208 |
   | 100 | 2,580 | 2,521 |
   | 1,000 | 291 | 298 |
   | 10,000 | 59 | 74 |
   | 100,000 | 13 | 49 |
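   
   As a back-of-the-envelope check: at group size 1 the job processes 100,000 groups in roughly 262s, i.e. about 2.6ms per group in both implementations, which suggests the fixed per-group cost dominates rather than the extra schema bytes sent per batch.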
   
   I think this demonstrates that the overhead of sending the schema is pretty minimal (although at some point I might look at where the per-group overhead does come from, as it's slower than I would expect, and simple groupby.apply() appears to exhibit the same behaviour), so unless anyone says otherwise I'll assume that sticking with Arrow streams is the way we'll go on this.
   
   Finally (and apologies for the length of this post), I'll continue with my plan of getting this into a mergeable state.
   
   Chris
   
   
