santosh-d3vpl3x opened a new pull request, #39902:
URL: https://github.com/apache/spark/pull/39902
The pandas cogroup UDF, `cogroup().applyInPandas`, currently supports exactly two DataFrames. This
is already very useful, but it is limiting both in terms of the API and in terms of efficiency when
more than two DataFrames need to be cogrouped with `cogroup.applyInPandas`. This PR extends
`cogroup` to support multiple DataFrames.
```python
import pandas as pd

# Assumes an active SparkSession named `spark` (e.g. the pyspark shell).
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))
df3 = spark.createDataFrame(
    [(20000101, 1, "asd"), (20000101, 2, "d")],
    ("time", "id", "v3"))
df4 = spark.createDataFrame(
    [(20000101, 1, "v"), (20000101, 2, "g")],
    ("time", "id", "v4"))

def asof_join(df1, df2, df3, df4):
    # Chain backward as-of merges on "time", matching rows with the same "id".
    df12 = pd.merge_asof(df1, df2, on="time", by="id")
    df123 = pd.merge_asof(df12, df3, on="time", by="id")
    df1234 = pd.merge_asof(df123, df4, on="time", by="id")
    return df1234

# Proposed API: cogroup() accepts several grouped DataFrames at once.
df1.groupby("id").cogroup(df2.groupby("id"), df3.groupby("id"), df4.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string, v3 string, v4 string"
).show()
```

```
+--------+---+---+---+---+---+
|    time| id| v1| v2| v3| v4|
+--------+---+---+---+---+---+
|20000101|  1|1.0|  x|asd|  v|
|20000102|  1|3.0|  x|asd|  v|
|20000101|  2|2.0|  y|  d|  g|
|20000102|  2|4.0|  y|  d|  g|
+--------+---+---+---+---+---+
```
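To make the intended per-key semantics concrete without a Spark session, here is a minimal pure-Python sketch of an N-way cogroup. It assumes the multi-DataFrame version keeps the behaviour of the existing two-way cogroup: every key present in any input produces one UDF call, and an input with no rows for that key contributes an empty group. The helper `cogroup_apply`, the UDF `group_sizes`, and the list-of-tuples tables are hypothetical stand-ins for Spark's grouped pandas DataFrames, not part of the PR.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Hashable, List, Sequence, Tuple

Row = Tuple[Any, ...]


def cogroup_apply(
    tables: Sequence[Sequence[Row]],
    key_index: int,
    udf: Callable[..., List[Row]],
) -> List[Row]:
    """Hypothetical N-way cogroup: call udf once per key, passing one group
    (a list of rows) per input table, in the same order as `tables`."""
    # One mapping per input table: key -> rows with that key, in input order.
    grouped: List[Dict[Hashable, List[Row]]] = []
    for table in tables:
        groups: Dict[Hashable, List[Row]] = defaultdict(list)
        for row in table:
            groups[row[key_index]].append(row)
        grouped.append(groups)

    # Every key seen in any input produces exactly one UDF call.
    all_keys = set()
    for groups in grouped:
        all_keys.update(groups)

    out: List[Row] = []
    # Keys are sorted only to make this sketch deterministic; Spark does not
    # guarantee any particular group order.
    for key in sorted(all_keys):
        # A table with no rows for this key contributes an empty group.
        out.extend(udf(*(groups.get(key, []) for groups in grouped)))
    return out


def group_sizes(*groups: List[Row]) -> List[Row]:
    # Example UDF: report how many rows each input contributed for the key.
    return [tuple(len(g) for g in groups)]


t1 = [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)]
t2 = [(20000101, 1, "x"), (20000101, 2, "y")]
t3 = [(20000101, 3, "z")]
result = cogroup_apply([t1, t2, t3], key_index=1, udf=group_sizes)
print(result)  # [(2, 1, 0), (2, 1, 0), (0, 0, 1)]
```

Key `3` appears only in the third input, so the UDF still runs for it, receiving empty groups from the first two inputs.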
Note: The existing experimental implementation expects the UDF to take either 2
or 3 arguments. With 2 arguments, the UDF receives the two cogrouped DataFrames;
with 3 arguments, it receives the grouping key as the first argument, followed by
the two cogrouped DataFrames. This convention is limiting and conflicts with the
proposed change: once more than two DataFrames can be cogrouped, there is no clear
way to tell whether a 3-argument UDF expects 3 DataFrames or 2 DataFrames plus the
grouping key. I have kept the previous implementation and code path intact, but I
would like to remove it and instead use a `pass_key` parameter to indicate whether
the grouping key should be passed to the UDF as its first argument.
Question: Can I remove the previous implementation in favour of this approach?
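To illustrate the ambiguity and how an explicit flag would resolve it, here is a minimal sketch. `legacy_wants_key` approximates the existing arity-based rule for two DataFrames, and `call_cogroup_udf` with its `pass_key` keyword is a hypothetical helper written only for illustration; neither is an existing PySpark API.

```python
import inspect
from typing import Any, Callable, Sequence, Tuple


def legacy_wants_key(udf: Callable[..., Any]) -> bool:
    """Approximation of the existing two-DataFrame rule: a UDF with 3
    parameters gets the grouping key first; one with 2 gets DataFrames only."""
    return len(inspect.signature(udf).parameters) == 3


def call_cogroup_udf(
    udf: Callable[..., Any],
    key: Tuple[Any, ...],
    groups: Sequence[Any],
    pass_key: bool = False,
) -> Any:
    """Hypothetical dispatch: the key is prepended only when pass_key is True,
    so nothing has to be inferred from the UDF's arity."""
    if pass_key:
        return udf(key, *groups)
    return udf(*groups)


def three_frames(a, b, c):
    return ("frames", a, b, c)


def key_and_two_frames(key, a, b):
    return ("key", key, a, b)


# Both UDFs take 3 parameters, so an arity-based rule cannot tell them apart.
same_verdict = legacy_wants_key(three_frames) == legacy_wants_key(key_and_two_frames)

# With an explicit flag, each call is unambiguous (strings stand in for DataFrames).
r1 = call_cogroup_udf(three_frames, (1,), ["df1", "df2", "df3"])
r2 = call_cogroup_udf(key_and_two_frames, (1,), ["df1", "df2"], pass_key=True)
```

The design choice here is that the caller states intent once, rather than Spark guessing it from the function signature, which stays unambiguous for any number of cogrouped DataFrames.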

--
This is an automated message from the Apache Git Service.