santosh-d3vpl3x opened a new pull request, #39902:
URL: https://github.com/apache/spark/pull/39902

   The pandas cogroup UDF with `applyInPandas` currently supports only two 
DataFrames. This is already very useful, but it limits us in both API and 
efficiency when we need to use more than two DFs with 
`cogroup.applyInPandas`. This PR adds support for multiple DFs in cogroup.
   
   ```python
   import pandas as pd

   df1 = spark.createDataFrame(
       [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
       ("time", "id", "v1"))
   df2 = spark.createDataFrame(
       [(20000101, 1, "x"), (20000101, 2, "y")],
       ("time", "id", "v2"))
   df3 = spark.createDataFrame(
       [(20000101, 1, "asd"), (20000101, 2, "d")],
       ("time", "id", "v3"))
   df4 = spark.createDataFrame(
       [(20000101, 1, "v"), (20000101, 2, "g")],
       ("time", "id", "v4"))

   def asof_join(df1, df2, df3, df4):
       # Chain as-of merges on "time" within each "id" group.
       df12 = pd.merge_asof(df1, df2, on="time", by="id")
       df123 = pd.merge_asof(df12, df3, on="time", by="id")
       df1234 = pd.merge_asof(df123, df4, on="time", by="id")
       return df1234

   # With this PR, cogroup accepts more than one grouped DataFrame.
   df1.groupby("id").cogroup(
       df2.groupby("id"), df3.groupby("id"), df4.groupby("id")
   ).applyInPandas(
       asof_join,
       schema="time int, id int, v1 double, v2 string, v3 string, v4 string"
   ).show()
   
   +--------+---+---+---+---+---+
   |    time| id| v1| v2| v3| v4|
   +--------+---+---+---+---+---+
   |20000101|  1|1.0|  x|asd|  v|
   |20000102|  1|3.0|  x|asd|  v|
   |20000101|  2|2.0|  y|  d|  g|
   |20000102|  2|4.0|  y|  d|  g|
   +--------+---+---+---+---+---+
   ```
   
   Note: The previous experimental implementation of the API expects the UDF 
to take either 2 or 3 args. With 2 args, the UDF receives the 2 cogrouped DFs; 
with 3 args, it receives the grouping key as the first arg, followed by the 2 
cogrouped DFs. This convention is limiting and conflicts with the proposed 
change: there is no clear way to tell whether 3 args mean 3 DFs or 1 grouping 
key + 2 DFs. I have kept the previous implementation and codepath intact, but 
I would like to get rid of it and instead use `pass_key` to indicate whether 
the grouping key should be passed as the first arg to the UDF. 
Question: Can I remove the previous implementation in favor of this new idea?
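
To make the ambiguity concrete, here is a minimal plain-Python sketch. It does not use PySpark, and `legacy_passes_key` and `invoke` are hypothetical helpers written only for illustration, not the actual worker code. It assumes the legacy rule is purely arity-based, as described above, and that `pass_key` would be an explicit boolean; where that flag would live in the real API is not decided here.

```python
import inspect


def legacy_passes_key(func):
    """Arity rule described above: 2 params -> no key, 3 params -> key first."""
    n_params = len(inspect.signature(func).parameters)
    if n_params == 2:
        return False
    if n_params == 3:
        return True
    raise ValueError(f"expected 2 or 3 parameters, got {n_params}")


def invoke(func, key, frames, pass_key=False):
    """Hypothetical dispatch driven by an explicit flag instead of arity."""
    return func(key, *frames) if pass_key else func(*frames)


def three_frames(a, b, c):
    # Intended to receive three cogrouped frames and no key.
    return ("frames", a, b, c)


def key_and_two(key, a, b):
    # Intended to receive the grouping key plus two cogrouped frames.
    return ("keyed", key, a, b)


# Both UDFs have three parameters, so the arity rule cannot tell them apart.
ambiguous = legacy_passes_key(three_frames) == legacy_passes_key(key_and_two)

# With an explicit flag the intent is unambiguous (strings stand in for DFs).
r1 = invoke(three_frames, key=(1,), frames=["df1", "df2", "df3"])
r2 = invoke(key_and_two, key=(1,), frames=["df1", "df2"], pass_key=True)
```

Under the arity rule, `three_frames` would be treated as a keyed two-DF UDF, which is the case the explicit flag is meant to rule out.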
   
   
![image](https://user-images.githubusercontent.com/3813695/216947300-f1876572-d934-42b6-8003-8df0bde917bc.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

