[jira] [Commented] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511876#comment-16511876 ]

Hyukjin Kwon commented on SPARK-22239:
--------------------------------------

Sure, please go ahead.

> User-defined window functions with pandas udf
> ---------------------------------------------
>
>                 Key: SPARK-22239
>                 URL: https://issues.apache.org/jira/browse/SPARK-22239
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.2.0
>            Reporter: Li Jin
>            Assignee: Li Jin
>            Priority: Major
>             Fix For: 2.4.0
>
> Window functions are another place we can benefit from vectorized UDFs and
> add another useful function to the pandas_udf suite.
> Example usage (preliminary):
> {code:python}
> w = Window.partitionBy('id').orderBy('time').rangeBetween(-200, 0)
>
> @pandas_udf(DoubleType())
> def ema(v1):
>     return v1.ewm(alpha=0.5).mean().iloc[-1]
>
> df.withColumn('v1_ema', ema(df.v1).over(w))
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
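As a side note on the example above: the body of the {{ema}} UDF is plain pandas, so what it computes for one window frame can be checked without Spark at all. A minimal sketch (the helper name `ema_last` is made up for illustration; only the expression inside comes from the issue description):

```python
import pandas as pd

def ema_last(v: pd.Series) -> float:
    # Exponentially weighted moving average over the rows Spark would feed
    # the UDF for one window frame; only the most recent value is returned,
    # matching the UDF body in the issue description.
    return v.ewm(alpha=0.5).mean().iloc[-1]

result = ema_last(pd.Series([1.0, 2.0, 3.0]))
```

With pandas' default `adjust=True`, this is the weight-normalized average with weights `(1 - alpha) ** i` over the frame, evaluated at the last row.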
[jira] [Commented] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16511495#comment-16511495 ]

Li Jin commented on SPARK-22239:
--------------------------------

[~hyukjin.kwon] I actually don't think this Jira is done. The PR only resolves the unbounded window case. Do you mind if I edit this Jira to reflect what's done and create a new Jira for the rolling window case?
[jira] [Commented] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16452977#comment-16452977 ]

Li Jin commented on SPARK-22239:
--------------------------------

[~hvanhovell], I have done some further research on UDFs over rolling windows and posted my results here: [https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit?usp=sharing]

TL;DR: I think we can implement this efficiently by computing the window indices in the JVM, passing the indices along with the window data to Python, and doing the rolling over those indices in Python. I have not addressed splitting the window partition into smaller batches, but I think that is doable as well. Would you be interested in taking a look and letting me know what you think?
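The index-based approach described above can be sketched as follows. Assume the JVM has already computed, for each row, the begin/end offsets of that row's frame within the partition; the Python worker then only slices and applies the UDF. All names here are illustrative assumptions, not Spark APIs:

```python
import numpy as np
import pandas as pd

def apply_over_frames(values: pd.Series, begin: np.ndarray,
                      end: np.ndarray, func):
    # begin[i]:end[i] delimits row i's window frame within the partition,
    # as it would be computed on the JVM side; the Python worker slices
    # the column and applies the scalar UDF once per row.
    return pd.Series([func(values.iloc[b:e]) for b, e in zip(begin, end)])

# Example: a trailing frame of at most 2 rows over v = [1, 2, 3, 4].
v = pd.Series([1.0, 2.0, 3.0, 4.0])
out = apply_over_frames(v,
                        np.array([0, 0, 1, 2]),
                        np.array([1, 2, 3, 4]),
                        lambda s: s.sum())
```

This keeps the per-row frame computation (the hard, ordering-dependent part) in the JVM while the Python side stays a simple slice-and-apply loop.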
[jira] [Commented] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16440094#comment-16440094 ]

Apache Spark commented on SPARK-22239:
--------------------------------------

User 'icexelloss' has created a pull request for this issue:
https://github.com/apache/spark/pull/21082
[jira] [Commented] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411465#comment-16411465 ]

Li Jin commented on SPARK-22239:
--------------------------------

Yeah, unbounded windows are really just a "groupby" in this case. I need to think more about bounded windows; I will send out some docs/ideas before implementing them. For now I will just do unbounded windows.
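The "unbounded windows are really just groupby" observation can be illustrated in plain pandas (a sketch of the semantics, not Spark code): with an unbounded frame, every row's window is its whole partition, so the per-group result is simply broadcast back to each row, which is exactly a groupby transform.

```python
import pandas as pd

df = pd.DataFrame({"id": [1, 1, 2], "v": [10.0, 20.0, 3.0]})

# Unbounded frame: each row's window covers its entire partition, so a
# window aggregation is equivalent to a groupby followed by broadcasting
# the group result back to every row of the group.
df["v_mean"] = df.groupby("id")["v"].transform("mean")
```

This is why the unbounded case could be implemented by reusing the existing grouped pandas UDF machinery, while bounded frames need per-row frame bookkeeping.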
[jira] [Commented] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16410521#comment-16410521 ]

Herman van Hovell commented on SPARK-22239:
-------------------------------------------

Unbounded windows should be relatively straightforward. We buffer all the rows in a window before we apply the window functions. We could turn that buffer into vectors and send it to Python. There are two key issues here:
* Very large windows: we shouldn't pass these to Python in one go.
* Nice Python integration: we might need to create a Python window frame processor.
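The first concern (very large windows) amounts to streaming a buffered partition to the Python worker in bounded chunks rather than as one block. A minimal sketch of that chunking logic, under the assumption that the buffer can be addressed by row offset (names are illustrative, not Spark internals):

```python
def frame_batches(n_rows, max_batch):
    # Split a buffered window partition of n_rows rows into (start, end)
    # half-open slices of at most max_batch rows each, so the buffer never
    # has to be shipped to the Python worker in one go.
    start = 0
    while start < n_rows:
        end = min(start + max_batch, n_rows)
        yield (start, end)
        start = end

batches = list(frame_batches(10, 4))
```

A real implementation would additionally have to keep each row's frame aligned across batch boundaries, which is the harder part of the problem.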
[jira] [Commented] (SPARK-22239) User-defined window functions with pandas udf
[ https://issues.apache.org/jira/browse/SPARK-22239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16408639#comment-16408639 ]

Li Jin commented on SPARK-22239:
--------------------------------

I am looking into this. I will start with the unbounded window first, i.e., Window.partitionBy(df.id). Growing/shrinking/moving windows are much more complicated because we don't want to send each window to the Python worker. I will try to solve the simple case (unbounded window) first.