[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Martin updated SPARK-27463:
-
Labels: (was: SPIP)

> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, SQL
> Affects Versions: 3.0.0
> Reporter: Chris Martin
> Priority: Major
>
> Recent work on Pandas UDFs in Spark has allowed for improved interoperability between Pandas and Spark. This proposal aims to extend this by introducing a new Pandas UDF type which would allow a cogroup operation to be applied to two PySpark DataFrames.
> Full details are in the Google document linked below.
[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs
[ https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Martin updated SPARK-27463:
-
Description:

h2. *Background and Motivation*

Recently there has been a great deal of work in PySpark to improve interoperability with the Pandas library. This work has allowed users to write User Defined Functions (UDFs) in Pandas which can then be applied to a Spark DataFrame. The benefit is that it allows users to combine the functionality of Pandas with the parallelisation abilities of Spark. In addition, these new Pandas UDFs have significantly lower overhead than traditional UDFs, as they operate on a batch of data at a time (i.e. they are vectorised) and they use Apache Arrow for serialisation between the JVM and Python processes.

As of Spark 2.3, two types of Pandas UDF are offered. Scalar UDFs effectively offer a map operation at the row level, while Grouped Map UDFs allow a map operation on a group of data. This functionality has proved successful in allowing users to integrate Spark with existing Pandas workflows; however, there are situations where the existing functionality is not sufficient. One such case is analogous to the existing cogroup functionality available on RDDs and Datasets, and was proposed by Li Jin on the Spark-Dev mailing list [1]. In this case, the user would like to group two Spark DataFrames by a common key and then apply a Python function to each group. This Python function would take two Pandas DataFrames as its arguments and return an arbitrary-length Pandas DataFrame.

To give a concrete example of the usefulness of this functionality, consider the use case of performing an as-of join between two distinct DataFrames. This is something that has traditionally been very difficult to do in Spark (and indeed in SQL in general) [2], but which has good support in Pandas [3].
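To illustrate the Pandas side of this, here is a small sketch of an as-of join using the existing {{pandas.merge_asof}} API (the trade/quote data below is invented purely for illustration): each row of the left frame is matched to the most recent row of the right frame at or before its timestamp.

```python
import pandas as pd

# Trades: the times at which each product was traded (hypothetical data).
trades = pd.DataFrame({
    "time": pd.to_datetime(["2019-01-01 09:00:01", "2019-01-01 09:00:03"]),
    "product_id": ["a", "a"],
    "price": [100.0, 101.0],
})

# Quotes: quotes arriving at (generally different) times.
quotes = pd.DataFrame({
    "time": pd.to_datetime(["2019-01-01 09:00:00", "2019-01-01 09:00:02"]),
    "product_id": ["a", "a"],
    "quote": [99.5, 100.5],
})

# For each trade, attach the most recent quote at or before the trade time.
# Both frames must be sorted by the "on" key; "by" restricts matches to the
# same product.
result = pd.merge_asof(trades, quotes, on="time", by="product_id")
```

This is exactly the per-group operation the proposed cogrouped UDF would let users run in parallel across a Spark cluster.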
If cogroup-like functionality were available in PySpark then one could simply write a Pandas function to perform the as-of joining, which could then be applied to two (appropriately grouped) DataFrames. This proposal therefore advocates introducing a new API call which would allow for a cogrouped Pandas UDF.

[1] [http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e]
[2] See https://issues.apache.org/jira/browse/SPARK-22947 for a SPIP that aims to add as-of join functionality to Spark.
[3] [https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html]

h2. *API Changes*

The public API changes would all be on the PySpark side. In terms of the API itself there are a couple of options, depending on whether the goal is syntactic brevity or consistency with the Dataset version of cogroup. If brevity is the aim, then a new method can be added to the DataFrame class:

{code:python}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf)
{code}

Alternatively, to be consistent with the Dataset version of cogroup, a new method could be added to the GroupedData class:

{code:python}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf)
{code}

The exact API can be worked out as part of this SPIP and the document will be updated once a decision has been reached. In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify this new type of UDF. Functions annotated with this decorator should take two Pandas DataFrames and return a single Pandas DataFrame. Here is an example of usage, using the as-of join use case described earlier and the first option for the API syntax.
{code:python}
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
# df1, df2 and the function's return value are all pandas.DataFrames
def asof_join(df1, df2):
    return pd.merge_asof(df1, df2, on='time')

df1.cogroup(df2, on='product_id', apply=asof_join)
{code}

h2. *Target Personas*

Data scientists, data engineers, library developers.

h2. *Scope*

* The initial implementation will only consider the case of cogrouping exactly two DataFrames. Further work may extend this to the case of multiple DataFrames.
* The API call is to be made available via PySpark only. No equivalent R/Java/Scala functionality will be offered.

h2. *Design*

* New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark.
* New public method to be added to either GroupedData or DataFrame to expose cogroup in PySpark.
* New package-private method to be added to RelationGroupedDataset to allow cogroup in Scala.
* New logical node to be added representing cogroup.
* New physical node to be added to implement cogroup. This node will ensure correct partitioning of the input DataFrames and create two grouped iterators which will be piped into the Python process for UDF execution.
* Extend ArrowPythonRunner such that it can
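The pairing of per-key groups that the physical node would perform can be sketched in plain Pandas. This is only an illustration of the intended semantics, not the proposed implementation; the helper name {{cogroup_apply}} and the example frames are invented for this sketch.

```python
import pandas as pd

def cogroup_apply(left, right, key, func):
    # Split each DataFrame into per-key groups, analogous to the two
    # grouped iterators the physical node would produce.
    left_groups = {k: g for k, g in left.groupby(key)}
    right_groups = {k: g for k, g in right.groupby(key)}
    results = []
    # Visit every key present on either side; a side with no rows for a
    # key contributes an empty DataFrame with the same columns.
    for k in sorted(set(left_groups) | set(right_groups)):
        lg = left_groups.get(k, left.iloc[0:0])
        rg = right_groups.get(k, right.iloc[0:0])
        results.append(func(lg, rg))
    # The UDF may return an arbitrary-length DataFrame per key pair;
    # the overall result is their concatenation.
    return pd.concat(results, ignore_index=True)

# Example: count how many rows each key has across both frames.
left = pd.DataFrame({"id": [1, 1, 2], "x": [10, 11, 20]})
right = pd.DataFrame({"id": [2, 3], "y": [200, 300]})
counts = cogroup_apply(
    left, right, "id",
    lambda lg, rg: pd.DataFrame({"rows": [len(lg) + len(rg)]}),
)
```

In the real proposal the grouping, partition co-location and serialisation to the Python worker would of course be handled by Spark and Arrow rather than by in-memory dictionaries.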