Chris Martin created SPARK-27463:
------------------------------------

             Summary: SPIP: Support Dataframe Cogroup via Pandas UDFs 
                 Key: SPARK-27463
                 URL: https://issues.apache.org/jira/browse/SPARK-27463
             Project: Spark
          Issue Type: Improvement
          Components: PySpark, SQL
    Affects Versions: 3.0.0
            Reporter: Chris Martin


h2. *Background and Motivation*

Recently there has been a great deal of work in PySpark to improve 
interoperability with the Pandas library. This work has allowed users to write 
User Defined Functions (UDFs) in Pandas which can then be applied to a Spark 
DataFrame. The benefit here is that it allows users to combine the 
functionality of Pandas with the parallelisation abilities of Spark. In 
addition, these new Pandas UDFs have significantly lower overhead than 
traditional UDFs as they operate on a batch of data at a time (i.e. they are 
vectorised) and they use Apache Arrow for serialisation between the JVM and 
Python processes.

As of Spark 2.3, two types of Pandas UDF are offered. Scalar UDFs effectively 
offer a map operation at the row level, while Grouped Map UDFs allow a map 
operation over a group of data. This functionality has proved successful in 
allowing users to integrate Spark with existing Pandas workflows; however, there 
are situations where it is not sufficient. One such case is analogous to the 
existing cogroup functionality available on RDDs and Datasets, and was proposed 
by Li Jin on the Spark dev mailing list[1]. In this case, the user would like to 
group two Spark DataFrames by a common key and then apply a Python function to 
each pair of groups. This Python function would take two Pandas DataFrames as 
its arguments and return a Pandas DataFrame of arbitrary length.
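
For reference, the two existing Pandas UDF types look roughly as follows. This 
is a minimal sketch against the Spark 2.3/2.4 API; the schema and column names 
are illustrative only.

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ('id', 'v'))

# Scalar Pandas UDF: vectorised map over a column (pandas.Series in, pandas.Series out).
@pandas_udf('double', PandasUDFType.SCALAR)
def plus_one(v):
    return v + 1

# Grouped Map Pandas UDF: map over each group (pandas.DataFrame in, pandas.DataFrame out).
@pandas_udf('id long, v double', PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.withColumn('v_plus_one', plus_one(df.v)).show()
df.groupBy('id').apply(subtract_mean).show()
{code}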

To give a concrete example of the usefulness of this functionality, consider 
the use case of performing an as-of join between two distinct DataFrames. This 
is something that has traditionally been very difficult to do in Spark (and 
indeed in SQL in general)[2], but which has good support in Pandas[3]. If 
cogroup-like functionality were available in PySpark then one could simply write 
a Pandas function to perform the as-of join, which could then be applied to 
two (appropriately grouped) DataFrames.
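
For illustration, this is what such an as-of join looks like in plain Pandas 
using merge_asof; the table and column names here are made up for the example.

{code:python}
import pandas as pd

trades = pd.DataFrame({'time': pd.to_datetime(['10:00:01', '10:00:03']),
                       'product_id': ['a', 'a'],
                       'price': [100.0, 101.0]})
quotes = pd.DataFrame({'time': pd.to_datetime(['10:00:00', '10:00:02']),
                       'product_id': ['a', 'a'],
                       'bid': [99.5, 100.5]})

# For each trade, pick the most recent quote at or before the trade time.
joined = pd.merge_asof(trades, quotes, on='time', by='product_id')
{code}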

This proposal therefore advocates introducing a new API call which would allow 
for a Cogrouped Pandas UDF.

[1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3ccagy9duxt569bpgp0wsc2esjgcoo5+hbfihfbkofcocclmjh...@mail.gmail.com%3e]

[2] See [https://issues.apache.org/jira/browse/SPARK-22947], a SPIP that aims 
to add as-of join functionality to Spark.

[3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html]
h2. *API Changes*

The public API changes would all be on the PySpark side. In terms of the API 
itself there are a couple of options, depending on whether the goal is syntactic 
brevity or consistency with the Dataset version of cogroup. If brevity is 
the aim then a new method can be added to the DataFrame class:

 
{code:python}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf){code}
 

Alternatively, to be consistent with the Dataset version of cogroup, a new 
method could be added to the GroupedData class:

 
{code:python}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf){code}
 
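Under this second option, usage might look something like the following. This is 
a hypothetical sketch only: the groupBy-then-cogroup shape and the udf keyword 
are assumptions based on the signature above, and asof_join is the cogrouped UDF 
defined in the example further down.

{code:python}
# Hypothetical sketch: each side is grouped on the common key and the cogrouped
# UDF is applied to each pair of groups.
df1.groupBy('product_id').cogroup(df2.groupBy('product_id'), udf=asof_join)
{code}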

The exact API can be worked out as part of this SPIP and the document will be 
updated once a decision has been reached.

In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify 
this new type of UDF. Functions declared with this UDF type should take two 
Pandas DataFrames and return a single Pandas DataFrame. Here is an example of 
usage, based on the as-of join use case described earlier and the first option 
for the API syntax.

 
{code:python}
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# df1, df2 and the function's return value are all pandas.DataFrames.
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
def asof_join(df1, df2):
    return pd.merge_asof(df1, df2, on='time')

df1.cogroup(df2, on='product_id', udf=asof_join){code}
 
h2. *Target Personas*

Data scientists, data engineers, library developers.
h2. *Scope*
 * The initial implementation will only consider the case of cogrouping exactly 
two DataFrames. Further work may extend this to the case of multiple DataFrames.
 * The API call is to be made available via PySpark only. No equivalent 
R/Java/Scala functionality will be offered.

h2. *Design*
 * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark.
 * New public method to be added to either GroupedData or DataFrame to expose 
cogroup in PySpark.
 * New package-private method to be added to RelationalGroupedDataset to allow 
cogroup in Scala.
 * New logical node to be added representing cogroup.
 * New physical node to be added to implement cogroup. This node will ensure 
correct partitioning of the input DataFrames and create two grouped iterators 
which will be piped into the Python process for UDF execution (see the sketch 
after this list).
 * Extend ArrowPythonRunner such that it can serialise two separate DataFrames 
for input into the Python process.
 * Extend worker.py such that it can accept the two serialised DataFrames 
produced by the Scala process and pass them to the UDF function.
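
To make the intended semantics concrete, here is a minimal local 
(non-distributed) sketch of what the physical node and the Python worker achieve 
together: group both inputs by the key, pair up the groups, apply the UDF to 
each pair and concatenate the results. The helper name and its implementation 
are illustrative only.

{code:python}
import pandas as pd

def cogroup_apply_local(df1, df2, key, func):
    # Local model of cogrouped apply: for each key value, func receives one
    # pandas.DataFrame per side and returns a pandas.DataFrame of arbitrary length.
    results = []
    for k in sorted(set(df1[key]).union(df2[key])):
        left = df1[df1[key] == k]
        right = df2[df2[key] == k]
        results.append(func(left, right))
    return pd.concat(results, ignore_index=True)

# e.g. cogroup_apply_local(trades, quotes, 'product_id',
#                          lambda l, r: pd.merge_asof(l, r, on='time'))
{code}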

Inherent in the design is that both the Java executor and the Python worker 
must have enough memory available to hold the largest group. Furthermore, no 
partial aggregation is possible, which implies that a shuffle must be 
performed to colocate the data. Note that both of these properties are shared 
with the existing groupby().apply() functionality, and the latter with the 
existing Dataset cogroup.

The aim is to leverage as much of the current Spark functionality as possible. 
Data locality can be handled by the existing Spark functionality, while 
serialisation between the JVM and Python processes will use the existing Arrow 
mechanism. The main complexity lies in extending the serialisation mechanism so 
that two separate DataFrames can be passed from the JVM to the Python process. 
At present the mechanism is tied to the idea of serialising rows from a single 
Spark DataFrame into a single Arrow table and then into a single Pandas 
DataFrame. In order to extend this, the code on both the JVM and Python sides 
must be refactored.
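
To illustrate the Arrow building blocks involved (this is not the actual Spark 
code path, just a sketch of the mechanism), two Pandas DataFrames can be 
round-tripped through separate Arrow IPC streams, which is essentially what the 
extended ArrowPythonRunner and worker.py would need to agree on for each 
cogrouped pair:

{code:python}
import pandas as pd
import pyarrow as pa

def to_arrow_stream(pdf):
    # What the JVM side would do: serialise one DataFrame as an Arrow IPC stream.
    sink = pa.BufferOutputStream()
    table = pa.Table.from_pandas(pdf)
    writer = pa.ipc.new_stream(sink, table.schema)
    writer.write_table(table)
    writer.close()
    return sink.getvalue()

def from_arrow_stream(buf):
    # What worker.py would do: read the stream back into a pandas.DataFrame.
    return pa.ipc.open_stream(buf).read_all().to_pandas()

# One stream per cogrouped side; the UDF would then receive both frames for a key.
left = from_arrow_stream(to_arrow_stream(pd.DataFrame({'time': [1, 2], 'price': [10.0, 11.0]})))
right = from_arrow_stream(to_arrow_stream(pd.DataFrame({'time': [1, 2], 'bid': [9.5, 10.5]})))
{code}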
h2. *Risks*

The majority of the work required is low risk, as it involves adding new 
functionality that is relatively isolated from existing code. The exception, 
however, is the extension of the serialisation code, as this could affect the 
operation of existing UDFs (both Pandas and non-Pandas).
h2. *Timescales*

Most of the changes are fairly straightforward; however, the complexity around 
the serialisation refactoring does lead to some uncertainty here. With that in 
mind I would estimate 4-8 weeks of work for the implementation and another 2-3 
weeks for testing.


