[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

2019-06-13 Thread Li Jin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863186#comment-16863186
 ] 

Li Jin commented on SPARK-28006:


Hi [~viirya], good questions:

>> Can we use pandas agg udfs as window functions?

Pandas agg udfs are supported as window functions, with both unbounded and 
bounded windows.
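
For concreteness, a minimal sketch of both cases, assuming a dataframe df with 
columns id and v (the udf and window specs here are illustrative):

{code:python}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

# Unbounded window: one aggregate per partition, broadcast to every row.
w_unbounded = Window.partitionBy('id')

# Bounded window: a rolling aggregate over the two preceding rows and the
# current row; a bounded frame requires an ordered window.
w_bounded = Window.partitionBy('id').orderBy('v').rowsBetween(-2, 0)

df = df.withColumn('mean_v', mean_udf(df['v']).over(w_unbounded))
df = df.withColumn('rolling_mean_v', mean_udf(df['v']).over(w_bounded))
{code}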

>> Because the proposed GROUPED_XFORM udf calculates output values for all rows 
>> in the group, it looks like it can only use the window frame 
>> (UnboundedPreceding, UnboundedFollowing)?

This is correct. It really uses the unbounded window as a group here (because 
there is no groupby-transform API in Spark SQL).
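
To make the equivalence concrete, a small illustrative sketch: a window spec 
with only a partitioning already frames the whole partition, so partitionBy 
alone behaves like a group.

{code:python}
from pyspark.sql.window import Window

# With no orderBy, the frame defaults to the entire partition,
# i.e. (UnboundedPreceding, UnboundedFollowing).
w = Window.partitionBy('id')

# The same window with the frame spelled out explicitly.
w_explicit = Window.partitionBy('id').rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing)
{code}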

> User-defined grouped transform pandas_udf for window operations
> ---
>
> Key: SPARK-28006
> URL: https://issues.apache.org/jira/browse/SPARK-28006
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Li Jin
>Priority: Major
>
> Currently, pandas_udf supports a "grouped aggregate" type that can be used with 
> unbounded and bounded windows. There is another set of use cases that can 
> benefit from a "grouped transform" type pandas_udf.
> Grouped transform is defined as an N -> N mapping over a group. For example, 
> "compute the z-score of the values in the group using the group mean and group 
> stdev", or "rank the values in the group".
> Currently, in order to do this, the user needs to use "grouped apply", for 
> example:
> {code:python}
> @pandas_udf(schema, GROUPED_MAP)
> def subtract_mean(pdf):
>     v = pdf['v']
>     pdf['v'] = v - v.mean()
>     return pdf
>
> df.groupby('id').apply(subtract_mean)
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+{code}
> This approach has a few downsides:
>  * Specifying the full return schema is complicated for the user, although the 
> function only changes one column.
>  * Hard-coding the column name 'v' inside the udf makes the udf less reusable.
>  * The entire dataframe is serialized and passed to Python, although only one 
> column is needed.
> Here we propose a new type of pandas_udf to work with these types of use 
> cases:
> {code:python}
> df = spark.createDataFrame(
>     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
>     ("id", "v"))
>
> @pandas_udf('double', GROUPED_XFORM)
> def subtract_mean(v):
>     return v - v.mean()
>
> w = Window.partitionBy('id')
> df = df.withColumn('v', subtract_mean(df['v']).over(w))
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+{code}
> This addresses the above downsides:
>  * The user only needs to specify the output type of a single column.
>  * The column being transformed is decoupled from the udf implementation.
>  * We only need to send one column to the Python worker and concat the result 
> with the original dataframe (this is what grouped aggregate does already).






[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

2019-06-13 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863180#comment-16863180
 ] 

Liang-Chi Hsieh commented on SPARK-28006:

I'm curious about two questions:

Can we use pandas agg udfs as window functions?

Because the proposed GROUPED_XFORM udf calculates output values for all rows in 
the group, it looks like it can only use the window frame (UnboundedPreceding, 
UnboundedFollowing)?







[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

2019-06-12 Thread Li Jin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862505#comment-16862505
 ] 

Li Jin commented on SPARK-28006:


Thanks [~hyukjin.kwon] for the comments! I updated the description to include 
inputs/outputs.

Yeah, I don't think the Scala window functions currently have such a type. The 
analogue of this in pandas is groupby transform (hence the name grouped 
transform udf):

 
{code:python}
>>> df = pd.DataFrame({'id': [1, 1, 2, 2, 2], 'value': [1., 2., 3., 5., 10.]})
>>> df
   id  value
0   1    1.0
1   1    2.0
2   2    3.0
3   2    5.0
4   2   10.0
>>> df['value_demean'] = df.groupby('id')['value'].transform(lambda x: x - x.mean())
>>> df
   id  value  value_demean
0   1    1.0          -0.5
1   1    2.0           0.5
2   2    3.0          -3.0
3   2    5.0          -1.0
4   2   10.0           4.0
{code}
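
The "rank the values in the group" use case mentioned in the description 
follows the same transform pattern (a sketch continuing the session above):

{code:python}
>>> df['value_rank'] = df.groupby('id')['value'].rank()
>>> df
   id  value  value_demean  value_rank
0   1    1.0          -0.5         1.0
1   1    2.0           0.5         2.0
2   2    3.0          -3.0         1.0
3   2    5.0          -1.0         2.0
4   2   10.0           4.0         3.0
{code}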
 







[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

2019-06-11 Thread Hyukjin Kwon (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861646#comment-16861646
 ] 

Hyukjin Kwon commented on SPARK-28006:

The proposal itself makes sense to me from a cursory look. One concern, though: 
I don't think Spark has such a type of window function. cc [~hvanhovell] as 
well.
I suspect the output is the same as our grouped map Pandas UDF, if I understood 
correctly? It might be helpful to show the output so that non-Python folks can 
understand how it works as well :-).
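
For what it's worth, a pandas-only sketch of that parity (hypothetical data; 
grouped map returns the full frame while grouped transform returns a single 
column, but the computed values agree):

{code:python}
import pandas as pd

df = pd.DataFrame({'id': [1, 1, 2, 2, 2], 'v': [1., 2., 3., 5., 10.]})

# Grouped map: the function receives and returns a whole dataframe per group.
out_map = df.groupby('id', group_keys=False).apply(
    lambda pdf: pdf.assign(v=(pdf['v'] - pdf['v'].mean()) / pdf['v'].std()))

# Grouped transform: the function receives and returns a single column per group.
out_xform = df.groupby('id')['v'].transform(lambda v: (v - v.mean()) / v.std())

# The transformed 'v' values agree row for row.
assert (out_map['v'].sort_index() == out_xform.sort_index()).all()
{code}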



> User-defined grouped transform pandas_udf for window operations
> ---
>
> Key: SPARK-28006
> URL: https://issues.apache.org/jira/browse/SPARK-28006
> Project: Spark
>  Issue Type: New Feature
>  Components: PySpark
>Affects Versions: 2.4.3
>Reporter: Li Jin
>Priority: Major
>
> Currently, pandas_udf supports a "grouped aggregate" type that can be used with 
> unbounded and bounded windows. There is another set of use cases that can 
> benefit from a "grouped transform" type pandas_udf.
> Grouped transform is defined as an N -> N mapping over a group. For example, 
> "compute the z-score of the values in the group using the group mean and group 
> stdev", or "rank the values in the group".
> Currently, in order to do this, the user needs to use "grouped apply", for 
> example:
> {code:python}
> @pandas_udf(schema, GROUPED_MAP)
> def zscore(pdf):
>     v = pdf['v']
>     pdf['v'] = (v - v.mean()) / v.std()
>     return pdf
>
> df.groupby('id').apply(zscore){code}
> This approach has a few downsides:
>  * Specifying the full return schema is complicated for the user, although the 
> function only changes one column.
>  * Hard-coding the column name 'v' inside the udf makes the udf less reusable.
>  * The entire dataframe is serialized and passed to Python, although only one 
> column is needed.
> Here we propose a new type of pandas_udf to work with these types of use 
> cases:
> {code:python}
> @pandas_udf('double', GROUPED_XFORM)
> def zscore(v):
>     return (v - v.mean()) / v.std()
>
> w = Window.partitionBy('id')
> df = df.withColumn('v_zscore', zscore(df['v']).over(w)){code}
> This addresses the above downsides:
>  * The user only needs to specify the output type of a single column.
>  * The column being z-scored is decoupled from the udf implementation.
>  * We only need to send one column to the Python worker and concat the result 
> with the original dataframe (this is what grouped aggregate does already).






[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

2019-06-11 Thread Li Jin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861559#comment-16861559
 ] 

Li Jin commented on SPARK-28006:


cc [~hyukjin.kwon] [~LI,Xiao] [~ueshin] [~bryanc]

I think code-wise this is pretty simple, but since this adds a new pandas udf 
type I'd like to get some feedback on it.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org