[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations
[ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863186#comment-16863186 ]

Li Jin commented on SPARK-28006:

Hi [~viirya], good questions:

>> Can we use pandas agg udfs as window function?

Yes, pandas agg UDFs are supported as window functions, with both unbounded and bounded windows.

>> Because the proposed GROUPED_XFORM udf calculates output values for all rows
>> in the group, looks like the proposed GROUPED_XFORM udf can only use window
>> frame (UnboundedPreceding, UnboundedFollowing)

This is correct. It is really using the unbounded window as the group here (because there is no groupby transform API in Spark SQL).

> User-defined grouped transform pandas_udf for window operations
> ---
>
> Key: SPARK-28006
> URL: https://issues.apache.org/jira/browse/SPARK-28006
> Project: Spark
> Issue Type: New Feature
> Components: PySpark
> Affects Versions: 2.4.3
> Reporter: Li Jin
> Priority: Major
>
> Currently, pandas_udf supports a "grouped aggregate" type that can be used with
> bounded and unbounded windows. There is another set of use cases that can
> benefit from a "grouped transform" type pandas_udf.
>
> Grouped transform is defined as an N -> N mapping over a group. For example,
> "compute the z-score for values in the group using the grouped mean and grouped
> stdev", or "rank the values in the group".
>
> Currently, to do this, the user needs to use "grouped apply", for example:
> {code:python}
> @pandas_udf(schema, GROUPED_MAP)
> def subtract_mean(pdf):
>     v = pdf['v']
>     pdf['v'] = v - v.mean()
>     return pdf
>
> df.groupby('id').apply(subtract_mean)
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+
> {code}
> This approach has a few downsides:
> * Specifying the full return schema is complicated for the user, although the
> function only changes one column.
> * Hard-coding the column name 'v' inside the UDF makes the UDF less reusable.
> * The entire dataframe is serialized and passed to Python, although only one
> column is needed.
>
> Here we propose a new type of pandas_udf for these use cases:
> {code:python}
> df = spark.createDataFrame(
>     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
>     ("id", "v"))
>
> @pandas_udf('double', GROUPED_XFORM)
> def subtract_mean(v):
>     return v - v.mean()
>
> w = Window.partitionBy('id')
>
> df = df.withColumn('v', subtract_mean(df['v']).over(w))
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+
> {code}
> This addresses the above downsides:
> * The user only needs to specify the output type of a single column.
> * The column being transformed is decoupled from the UDF implementation.
> * We only need to send one column to the Python worker and concatenate the
> result with the original dataframe (this is what grouped aggregate already does).

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
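The N -> N "grouped transform" semantics in the description (each group's values go in, one output per row comes out, aligned back to the original rows) can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark's implementation; `grouped_transform`, `zscore`, and `rank` are hypothetical helpers, and breaking rank ties by row position is an assumption.

```python
from collections import defaultdict
from statistics import mean, stdev


def grouped_transform(keys, values, fn):
    """Apply an N -> N function per group; return results in original row order.

    Hypothetical helper illustrating the semantics only; not a Spark API.
    """
    positions = defaultdict(list)  # group key -> indices of the rows in that group
    for i, k in enumerate(keys):
        positions[k].append(i)

    out = [None] * len(values)
    for idxs in positions.values():
        group = [values[i] for i in idxs]
        result = fn(group)
        if len(result) != len(group):  # enforce the N -> N contract
            raise ValueError("transform must return one value per input row")
        for i, r in zip(idxs, result):
            out[i] = r  # scatter each result back to its original row
    return out


def zscore(v):
    # Parenthesized on purpose: (x - mean) / stdev, not x - (mean / stdev).
    # stdev is the sample standard deviation and needs at least two values.
    m, s = mean(v), stdev(v)
    return [(x - m) / s for x in v]


def rank(v):
    # 1-based rank within the group; sorted() is stable, so ties keep row order.
    order = sorted(range(len(v)), key=lambda i: v[i])
    ranks = [0] * len(v)
    for r, i in enumerate(order, start=1):
        ranks[i] = r
    return ranks


ids = [1, 1, 2, 2, 2]
v = [1.0, 2.0, 3.0, 5.0, 10.0]
print(grouped_transform(ids, v, rank))  # [1, 2, 1, 2, 3]
print(grouped_transform(ids, v, zscore))
```

The length check is what distinguishes a transform from an aggregate: a function that collapses a group to a single value is rejected here.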
[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations
[ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863180#comment-16863180 ]

Liang-Chi Hsieh commented on SPARK-28006:

I'm curious about two questions:

Can we use pandas agg UDFs as window functions?

Because the proposed GROUPED_XFORM UDF calculates output values for all rows in the group, it looks like the proposed GROUPED_XFORM UDF can only use the window frame (UnboundedPreceding, UnboundedFollowing)?
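To make the frame question concrete, here is a plain-Python sketch of evaluating an aggregate over a per-row window frame, loosely analogous to Spark's `Window.rowsBetween` with `Window.unboundedPreceding` / `Window.unboundedFollowing`. `window_agg` is a hypothetical helper, not Spark code, and it assumes rows are already in the desired order within each partition. With an unbounded frame every row sees its whole partition, which is why an N -> N transform, needing all rows of the group at once, only fits that frame.

```python
from statistics import mean


def window_agg(keys, values, agg, preceding=None, following=None):
    """Evaluate `agg` over a per-row window frame within each partition.

    preceding/following are row offsets; None means unbounded in that direction.
    Input order within a partition stands in for an ORDER BY (assumption).
    Hypothetical helper for illustration only; not a Spark API.
    """
    partitions = {}
    for i, k in enumerate(keys):
        partitions.setdefault(k, []).append(i)

    out = [None] * len(values)
    for idxs in partitions.values():
        n = len(idxs)
        for pos, row in enumerate(idxs):
            lo = 0 if preceding is None else max(0, pos - preceding)
            hi = n if following is None else min(n, pos + following + 1)
            out[row] = agg([values[j] for j in idxs[lo:hi]])
    return out


ids = [1, 1, 2, 2, 2]
v = [1.0, 2.0, 3.0, 5.0, 10.0]

# Unbounded frame: each row sees its entire partition, so this is the
# group mean broadcast to every row.
print(window_agg(ids, v, mean))  # [1.5, 1.5, 6.0, 6.0, 6.0]

# Bounded frame (1 preceding row to the current row): a rolling mean.
print(window_agg(ids, v, mean, preceding=1, following=0))  # [1.0, 1.5, 3.0, 4.0, 7.5]

# Subtracting the unbounded-window mean reproduces the demeaned column.
demeaned = [x - m for x, m in zip(v, window_agg(ids, v, mean))]
print(demeaned)  # [-0.5, 0.5, -3.0, -1.0, 4.0]
```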
[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations
[ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16862505#comment-16862505 ]

Li Jin commented on SPARK-28006:

Thanks [~hyukjin.kwon] for the comments! I updated the description to include inputs/outputs.

Yeah, I don't think the Scala window functions currently have such a type. The analogue of this in pandas is groupby transform (hence the name "grouped transform" UDF):
{code:python}
>>> import pandas as pd
>>> df = pd.DataFrame({'id': [1, 1, 2, 2, 2], 'value': [1., 2., 3., 5., 10.]})
>>> df
   id  value
0   1    1.0
1   1    2.0
2   2    3.0
3   2    5.0
4   2   10.0
>>> df['value_demean'] = df.groupby('id')['value'].transform(lambda x: x - x.mean())
>>> df
   id  value  value_demean
0   1    1.0          -0.5
1   1    2.0           0.5
2   2    3.0          -3.0
3   2    5.0          -1.0
4   2   10.0           4.0
{code}
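One detail of the pandas analogy worth keeping in mind: pandas' `GroupBy.transform` also accepts a function that returns a single scalar per group and broadcasts it to every row, which is the N -> 1 case that a grouped aggregate over an unbounded window already covers. The sketch below mimics only that shape rule in plain Python; `pandas_like_transform` is a hypothetical helper, not the pandas implementation.

```python
from numbers import Number
from statistics import mean


def pandas_like_transform(keys, values, fn):
    """Roughly mimic the shape rule of pandas' GroupBy.transform.

    fn may return one value per input row (N -> N), or a single scalar that
    is broadcast to every row of its group (N -> 1).
    Hypothetical helper for illustration only.
    """
    groups = {}
    for i, k in enumerate(keys):
        groups.setdefault(k, []).append(i)

    out = [None] * len(values)
    for idxs in groups.values():
        result = fn([values[i] for i in idxs])
        if isinstance(result, Number):
            result = [result] * len(idxs)  # broadcast the scalar to the group
        elif len(result) != len(idxs):
            raise ValueError("result must be a scalar or have one value per row")
        for i, r in zip(idxs, result):
            out[i] = r
    return out


ids = [1, 1, 2, 2, 2]
value = [1.0, 2.0, 3.0, 5.0, 10.0]

# N -> N, like the value_demean example.
print(pandas_like_transform(ids, value, lambda x: [xi - mean(x) for xi in x]))
# [-0.5, 0.5, -3.0, -1.0, 4.0]

# N -> 1, broadcast back to each row.
print(pandas_like_transform(ids, value, mean))  # [1.5, 1.5, 6.0, 6.0, 6.0]
```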
[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations
[ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861646#comment-16861646 ]

Hyukjin Kwon commented on SPARK-28006:

From a cursory look, the proposal itself makes sense to me. One concern is that I don't think Spark has this type of window function. cc [~hvanhovell] as well.

If I understood correctly, I suspect the output is the same as our grouped map Pandas UDF? It might be helpful to show the output so that non-Python guys can understand how it works as well :-).
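On whether the output matches grouped map: in the issue's `subtract_mean` example both approaches produce the same `v` values; the difference is what the function receives and must return. The plain-Python sketch below contrasts the two shapes, modeling rows as `(id, v)` tuples from the issue's sample data. `grouped_map`, `transform_column`, and the helper functions are hypothetical illustrations, not Spark code.

```python
from statistics import mean

rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]  # (id, v)


def grouped_map(rows, fn):
    """Whole-group rows in, whole-group rows out (the GROUPED_MAP shape)."""
    groups = {}
    for row in rows:
        groups.setdefault(row[0], []).append(row)
    out = []
    for group in groups.values():
        out.extend(fn(group))
    return out


def subtract_mean_map(group):
    # Must know the row layout (column 1 is 'v') and rebuild every full row.
    m = mean(v for _, v in group)
    return [(i, v - m) for i, v in group]


def transform_column(rows, key, col, fn):
    """One column in, one column out; other columns untouched (proposed shape)."""
    idx_by_group = {}
    for i, row in enumerate(rows):
        idx_by_group.setdefault(row[key], []).append(i)
    new_col = [None] * len(rows)
    for idxs in idx_by_group.values():
        for i, r in zip(idxs, fn([rows[i][col] for i in idxs])):
            new_col[i] = r
    # Re-attach the new column to the untouched rows, preserving input order.
    return [row[:col] + (new_col[i],) + row[col + 1:] for i, row in enumerate(rows)]


def subtract_mean(v):
    # Sees only the column values: no schema or column name needed.
    m = mean(v)
    return [x - m for x in v]


print(sorted(grouped_map(rows, subtract_mean_map)))
print(transform_column(rows, 0, 1, subtract_mean))
# both: [(1, -0.5), (1, 0.5), (2, -3.0), (2, -1.0), (2, 4.0)]
```

The grouped-map result is sorted before comparing because that style of API makes no promise about output row order, whereas the column transform writes results back to the rows they came from.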
[jira] [Commented] (SPARK-28006) User-defined grouped transform pandas_udf for window operations
[ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861559#comment-16861559 ]

Li Jin commented on SPARK-28006:

cc [~hyukjin.kwon] [~LI,Xiao] [~ueshin] [~bryanc]

I think code-wise this is pretty simple, but since this adds a new pandas UDF type, I'd like to get some feedback on it.