[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs

2019-05-18 Thread Chris Martin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Martin updated SPARK-27463:
-
Labels:   (was: SPIP)

> SPIP: Support Dataframe Cogroup via Pandas UDFs 
> 
>
> Key: SPARK-27463
> URL: https://issues.apache.org/jira/browse/SPARK-27463
> Project: Spark
> Issue Type: Improvement
> Components: PySpark, SQL
> Affects Versions: 3.0.0
> Reporter: Chris Martin
> Priority: Major
>
> Recent work on Pandas UDFs in Spark has improved interoperability between 
> Pandas and Spark. This proposal aims to extend this work by introducing a new 
> Pandas UDF type that would allow a cogroup operation to be applied to two 
> PySpark DataFrames.
> Full details are in the Google document linked below.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs

2019-04-15 Thread Chris Martin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Martin updated SPARK-27463:
-
Description: 
Recent work on Pandas UDFs in Spark has improved interoperability between 
Pandas and Spark. This proposal aims to extend this work by introducing a new 
Pandas UDF type that would allow a cogroup operation to be applied to two 
PySpark DataFrames.

Full details are in the Google document linked below.

 

  was:
h2. *Background and Motivation*

Recently there has been a great deal of work in PySpark to improve 
interoperability with the Pandas library. This work has allowed users to write 
User Defined Functions (UDFs) in Pandas which can then be applied to a Spark 
DataFrame. The benefit here is that it allows users to combine the 
functionality of Pandas with the parallelisation abilities of Spark. In 
addition, these new Pandas UDFs have significantly lower overhead than 
traditional UDFs as they operate on a batch of data at a time (i.e. they are 
vectorised) and they use Apache Arrow for serialisation between the JVM and 
Python processes.

As of Spark 2.3, two types of Pandas UDF are offered. Scalar UDFs effectively 
offer a map operation at the row level, while Grouped Map UDFs allow a map 
operation on a group of data. This functionality has proved successful in 
allowing users to integrate Spark with existing Pandas workflows; however, 
there are situations where the existing functionality is not sufficient. One 
such case is analogous to the existing cogroup functionality available on RDDs 
and Datasets and was proposed by Li Jin on the Spark dev mailing list[1]. In 
this case, the user would like to group two Spark DataFrames by a common key 
and then apply a Python function to each pair of groups that share a key. This 
Python function would take two Pandas DataFrames as its arguments and would 
return an arbitrary-length Pandas DataFrame.

To give a concrete example of the usefulness of this functionality, consider 
the use case of performing an as-of join between two distinct DataFrames. This 
is something that has traditionally been very difficult to do in Spark (and 
indeed in SQL in general)[2] but which has good support in Pandas[3]. If 
cogroup-like functionality were available in PySpark, then one could simply 
write a Pandas function to perform the as-of join, which could then be applied 
to two (appropriately grouped) DataFrames.

This proposal therefore advocates introducing a new API call which would allow 
for a Cogrouped Pandas UDF.

[1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e]

[2] See https://issues.apache.org/jira/browse/SPARK-22947 for a SPIP that aims 
to add as-of join functionality to Spark.

[3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html]
h2. *API Changes*

The public API changes would all be on the PySpark side. In terms of the API 
itself, there are a couple of options, depending on whether the goal is 
syntactic brevity or consistency with the Dataset version of cogroup. If 
brevity is the aim, then a new method can be added to the DataFrame class:

 
{code:python}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf): ...{code}
 

Alternatively, to be consistent with the Dataset version of cogroup, a new 
method could be added to the GroupedData class.

 
{code:python}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf): ...{code}
 

The exact API can be worked out as part of this SPIP and the document will be 
updated once a decision has been reached.

In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify 
this new type of UDF. Functions annotated with this decorator should take two 
Pandas DataFrames and return a single Pandas DataFrame. Here is an example of 
usage, using the as-of join use case described earlier and the first option for 
the API syntax.

 
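{code:python}
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# df1, df2 and the return value are all pandas.DataFrames.
# pd.merge_asof expects both inputs to be sorted by the 'time' column.
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
def asof_join(df1, df2):
    return pd.merge_asof(df1, df2, on='time')

# Cogroup the two Spark DataFrames on product_id and apply the UDF to each pair of groups.
df1.cogroup(df2, on='product_id', udf=asof_join){code}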
 
h2. *Target Personas*

Data scientists, data engineers, library developers.
h2. *Scope*
 * Initial implementation will only consider the case of cogrouping exactly two 
DataFrames. Further work may extend this to the case of multiple DataFrames.
 * API call is to be made available via PySpark only. No equivalent 
R/Java/Scala functionality will be offered.

h2. *Design*
 * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark
 * New public method to be added to either GroupedData or DataFrame to expose 
cogroup in PySpark
 * New package-private method to be added to RelationalGroupedDataset to allow 
co

[jira] [Updated] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs

2019-04-15 Thread Chris Martin (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Martin updated SPARK-27463:
-
Description: 
h2. *Background and Motivation*

Recently there has been a great deal of work in PySpark to improve 
interoperability with the Pandas library. This work has allowed users to write 
User Defined Functions (UDFs) in Pandas which can then be applied to a Spark 
DataFrame. The benefit here is that it allows users to combine the 
functionality of Pandas with the parallelisation abilities of Spark. In 
addition, these new Pandas UDFs have significantly lower overhead than 
traditional UDFs as they operate on a batch of data at a time (i.e. they are 
vectorised) and they use Apache Arrow for serialisation between the JVM and 
Python processes.

As of Spark 2.3, two types of Pandas UDF are offered. Scalar UDFs effectively 
offer a map operation at the row level, while Grouped Map UDFs allow a map 
operation on a group of data. This functionality has proved successful in 
allowing users to integrate Spark with existing Pandas workflows; however, 
there are situations where the existing functionality is not sufficient. One 
such case is analogous to the existing cogroup functionality available on RDDs 
and Datasets and was proposed by Li Jin on the Spark dev mailing list[1]. In 
this case, the user would like to group two Spark DataFrames by a common key 
and then apply a Python function to each pair of groups that share a key. This 
Python function would take two Pandas DataFrames as its arguments and would 
return an arbitrary-length Pandas DataFrame.
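
The cogroup semantics described here can be sketched in plain Python, with 
lists of dicts standing in for DataFrames. This is only an illustration: the 
names cogroup, summarise, trades and quotes are hypothetical and not part of 
any Spark API, and the sketch assumes (as with cogroup on RDDs) that a key 
present on only one side is paired with an empty group on the other.

```python
from collections import defaultdict


def cogroup(left, right, key, func):
    """Group two lists of dict records by `key` and apply `func` to each pair of groups."""
    left_groups = defaultdict(list)
    right_groups = defaultdict(list)
    for row in left:
        left_groups[row[key]].append(row)
    for row in right:
        right_groups[row[key]].append(row)

    result = []
    # Visit every key seen on either side; a missing side becomes an empty list.
    for k in sorted(left_groups.keys() | right_groups.keys()):
        # func may return any number of rows for each pair of groups.
        result.extend(func(left_groups.get(k, []), right_groups.get(k, [])))
    return result


def summarise(ts, qs):
    """Toy per-group function: count the rows on each side."""
    key = (ts or qs)[0]["id"]
    return [{"id": key, "n_trades": len(ts), "n_quotes": len(qs)}]


trades = [{"id": "a", "qty": 1}, {"id": "b", "qty": 2}, {"id": "a", "qty": 3}]
quotes = [{"id": "a", "px": 10.0}, {"id": "c", "px": 30.0}]

summary = cogroup(trades, quotes, "id", summarise)
```

In the proposal the per-group function would receive two Pandas DataFrames 
rather than two lists, but the grouping-then-apply shape is the same.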

To give a concrete example of the usefulness of this functionality, consider 
the use case of performing an as-of join between two distinct DataFrames. This 
is something that has traditionally been very difficult to do in Spark (and 
indeed in SQL in general)[2] but which has good support in Pandas[3]. If 
cogroup-like functionality were available in PySpark, then one could simply 
write a Pandas function to perform the as-of join, which could then be applied 
to two (appropriately grouped) DataFrames.
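
For readers unfamiliar with as-of joins, the following plain-Python sketch 
shows the idea: each left row is matched with the most recent right row at or 
before it. It assumes the backward-looking behaviour that pandas.merge_asof 
uses by default; the function and the sample data are illustrative only and 
are not taken from the proposal.

```python
from bisect import bisect_right


def asof_join(left, right, on):
    """Attach to each left row the latest right row whose `on` value is <= the left row's."""
    right_sorted = sorted(right, key=lambda r: r[on])
    right_keys = [r[on] for r in right_sorted]

    joined = []
    for row in sorted(left, key=lambda r: r[on]):
        # Index of the last right row at or before this left row, or -1 if none.
        idx = bisect_right(right_keys, row[on]) - 1
        match = right_sorted[idx] if idx >= 0 else {}
        # Left values take precedence, so the left row keeps its own `on` value.
        joined.append({**match, **row})
    return joined


trades = [{"time": 5, "qty": 100}, {"time": 12, "qty": 50}, {"time": 1, "qty": 7}]
quotes = [{"time": 2, "px": 9.5}, {"time": 10, "px": 9.8}]

joined = asof_join(trades, quotes, "time")
```

The trade at time 1 precedes every quote, so it comes back without a price; 
the other two trades pick up the quotes from times 2 and 10 respectively.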

This proposal therefore advocates introducing a new API call which would allow 
for a Cogrouped Pandas UDF.

[1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e]

[2] See https://issues.apache.org/jira/browse/SPARK-22947 for a SPIP that aims 
to add as-of join functionality to Spark.

[3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html]
h2. *API Changes*

The public API changes would all be on the PySpark side. In terms of the API 
itself, there are a couple of options, depending on whether the goal is 
syntactic brevity or consistency with the Dataset version of cogroup. If 
brevity is the aim, then a new method can be added to the DataFrame class:

 
{code:python}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf): ...{code}
 

Alternatively, to be consistent with the Dataset version of cogroup, a new 
method could be added to the GroupedData class.

 
{code:python}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf): ...{code}
 

The exact API can be worked out as part of this SPIP and the document will be 
updated once a decision has been reached.
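
To make the difference between the two call shapes concrete, here is a toy 
sketch in plain Python. ToyDataFrame and ToyGroupedData are hypothetical 
stand-ins invented for this illustration; they are not PySpark classes, and 
the sketch assumes the brief form would simply delegate to the grouped form.

```python
from collections import defaultdict


class ToyGroupedData:
    """Stand-in for GroupedData: rows already grouped by a key column."""

    def __init__(self, rows, key):
        self.groups = defaultdict(list)
        for row in rows:
            self.groups[row[key]].append(row)

    # Second option: other is another grouped object, udf is the function to apply.
    def cogroup(self, other, udf):
        out = []
        for k in sorted(self.groups.keys() | other.groups.keys()):
            out.extend(udf(self.groups.get(k, []), other.groups.get(k, [])))
        return out


class ToyDataFrame:
    """Stand-in for DataFrame: a list of dict rows."""

    def __init__(self, rows):
        self.rows = rows

    def groupby(self, key):
        return ToyGroupedData(self.rows, key)

    # First option: other is another frame, on is the cogroup key.
    def cogroup(self, other, on, udf):
        return self.groupby(on).cogroup(other.groupby(on), udf)


def count_pair(left, right):
    """Toy per-group function: count the rows on each side."""
    key = (left or right)[0]["k"]
    return [{"k": key, "left": len(left), "right": len(right)}]


df1 = ToyDataFrame([{"k": 1}, {"k": 1}, {"k": 2}])
df2 = ToyDataFrame([{"k": 2}, {"k": 3}])

brief = df1.cogroup(df2, on="k", udf=count_pair)
consistent = df1.groupby("k").cogroup(df2.groupby("k"), udf=count_pair)
```

Both calls produce the same rows here. The brief form saves two groupby 
calls, while the grouped form mirrors how cogroup is reached in the Dataset 
API and leaves room for each side to be grouped differently.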

In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify 
this new type of UDF. Functions annotated with this decorator should take two 
Pandas DataFrames and return a single Pandas DataFrame. Here is an example of 
usage, using the as-of join use case described earlier and the first option for 
the API syntax.

 
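{code:python}
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# df1, df2 and the return value are all pandas.DataFrames.
# pd.merge_asof expects both inputs to be sorted by the 'time' column.
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
def asof_join(df1, df2):
    return pd.merge_asof(df1, df2, on='time')

# Cogroup the two Spark DataFrames on product_id and apply the UDF to each pair of groups.
df1.cogroup(df2, on='product_id', udf=asof_join){code}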
 
h2. *Target Personas*

Data scientists, data engineers, library developers.
h2. *Scope*
 * Initial implementation will only consider the case of cogrouping exactly two 
DataFrames. Further work may extend this to the case of multiple DataFrames.
 * API call is to be made available via PySpark only. No equivalent 
R/Java/Scala functionality will be offered.

h2. *Design*
 * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark
 * New public method to be added to either GroupedData or DataFrame to expose 
cogroup in PySpark
 * New package-private method to be added to RelationalGroupedDataset to allow 
cogroup in Scala
 * New logical node to be added representing cogroup.
 * New physical node to be added to implement cogroup. This node will ensure 
correct partitioning of input DataFrames and create two groupedIterators which 
will be piped into the Python process for UDF execution.
 * Extend ArrowPythonRunner such that it can