[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723552#comment-16723552 ]

ASF GitHub Bot commented on SPARK-24561:

asfgit closed pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305

As this is a pull request merged from a forked repository, GitHub hides the original diff on merge; it is reproduced below for the sake of provenance:

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f98e550e39da8..d188de39e21c7 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2982,8 +2982,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
         | 2|6.0|
         +---+---+

-    This example shows using grouped aggregated UDFs as window functions. Note that only
-    unbounded window frame is supported at the moment:
+    This example shows using grouped aggregated UDFs as window functions.

     >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
     >>> from pyspark.sql import Window
@@ -2993,20 +2992,24 @@ def pandas_udf(f=None, returnType=None, functionType=None):
     >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
     ... def mean_udf(v):
     ...     return v.mean()
-    >>> w = Window \\
-    ...     .partitionBy('id') \\
-    ...     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+    >>> w = (Window.partitionBy('id')
+    ...      .orderBy('v')
+    ...      .rowsBetween(-1, 0))
     >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # doctest: +SKIP
     +---+----+------+
     | id|   v|mean_v|
     +---+----+------+
-    | 1| 1.0|   1.5|
+    | 1| 1.0|   1.0|
     | 1| 2.0|   1.5|
-    | 2| 3.0|   6.0|
-    | 2| 5.0|   6.0|
-    | 2|10.0|   6.0|
+    | 2| 3.0|   3.0|
+    | 2| 5.0|   4.0|
+    | 2|10.0|   7.5|
     +---+----+------+

+    .. note:: For performance reasons, the input series to window functions are not copied.
+        Therefore, mutating the input series is not allowed and will cause incorrect results.
+        For the same reason, users should also not rely on the index of the input series.
+
     .. seealso:: :meth:`pyspark.sql.GroupedData.agg` and :class:`pyspark.sql.Window`

     .. note:: The user-defined functions are considered deterministic by default. Due to
diff --git a/python/pyspark/sql/tests/test_pandas_udf_window.py b/python/pyspark/sql/tests/test_pandas_udf_window.py
index f0e6d2696df62..1b7df6797e9e6 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_window.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_window.py
@@ -47,6 +47,15 @@ def pandas_scalar_time_two(self):
         from pyspark.sql.functions import pandas_udf
         return pandas_udf(lambda v: v * 2, 'double')

+    @property
+    def pandas_agg_count_udf(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+        @pandas_udf('long', PandasUDFType.GROUPED_AGG)
+        def count(v):
+            return len(v)
+        return count
+
     @property
     def pandas_agg_mean_udf(self):
         from pyspark.sql.functions import pandas_udf, PandasUDFType
@@ -77,7 +86,7 @@ def min(v):
     @property
     def unbounded_window(self):
         return Window.partitionBy('id') \
-            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing).orderBy('v')

     @property
     def ordered_window(self):
@@ -87,6 +96,32 @@ def ordered_window(self):
     def unpartitioned_window(self):
         return Window.partitionBy()

+    @property
+    def sliding_row_window(self):
+        return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+    @property
+    def sliding_range_window(self):
+        return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4)
+
+    @property
+    def growing_row_window(self):
+        return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)
+
+    @property
+    def growing_range_window(self):
+        return Window.partitionBy('id').orderBy('v') \
+            .rangeBetween(Window.unboundedPreceding, 4)
+
+    @property
+    def shrinking_row_window(self):
+        return Window.partitionBy('id').orderBy('v').rowsBetween(-2, Window.unboundedFollowing)
+
+    @property
+    def shrinking_range_window(self):
+        return Window.partitionBy('id').orderBy('v') \
+            .rangeBetween(-3, Window.unboundedFollowing)
+
     def test_simple(self):
         from
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718246#comment-16718246 ]

ASF GitHub Bot commented on SPARK-24561:

SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412545

**[Test build #99989 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99989/testReport)** for PR 22305 at commit [`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> User-defined window functions with pandas udf (bounded window)
> --------------------------------------------------------------
>
>                 Key: SPARK-24561
>                 URL: https://issues.apache.org/jira/browse/SPARK-24561
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Li Jin
>            Priority: Major
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718250#comment-16718250 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412883

Merged build finished. Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718253#comment-16718253 ]

ASF GitHub Bot commented on SPARK-24561:

HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240841612

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala

@@ -113,7 +113,7 @@ private[window] object AggregateProcessor {
  * This class manages the processing of a number of aggregate functions. See the documentation of
  * the object for more information.
  */
-private[window] final class AggregateProcessor(
+private[sql] final class AggregateProcessor(

Review comment: Yea, see also some JIRAs like SPARK-16964.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718251#comment-16718251 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412887

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99989/
Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718248#comment-16718248 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412883

Merged build finished. Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718249#comment-16718249 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412887

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99989/
Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16718247#comment-16718247 ]

ASF GitHub Bot commented on SPARK-24561:

SparkQA removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348650

**[Test build #99989 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99989/testReport)** for PR 22305 at commit [`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717940#comment-16717940 ]

ASF GitHub Bot commented on SPARK-24561:

SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348650

**[Test build #99989 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99989/testReport)** for PR 22305 at commit [`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717942#comment-16717942 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348628

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5987/
Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717941#comment-16717941 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348617

Merged build finished. Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717913#comment-16717913 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446345248

Test FAILed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99983/
Test FAILed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717938#comment-16717938 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348617

Merged build finished. Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717939#comment-16717939 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348628

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5987/
Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717922#comment-16717922 ]

ASF GitHub Bot commented on SPARK-24561:

icexelloss commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446346819

retest this please
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717911#comment-16717911 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446345241

Merged build finished. Test FAILed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717908#comment-16717908 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446345241

Merged build finished. Test FAILed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717909#comment-16717909 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446345248

Test FAILed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99983/
Test FAILed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717907#comment-16717907 ]

ASF GitHub Bot commented on SPARK-24561:

SparkQA removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446266832

**[Test build #99983 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99983/testReport)** for PR 22305 at commit [`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717906#comment-16717906 ]

ASF GitHub Bot commented on SPARK-24561:

SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446344713

**[Test build #99983 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99983/testReport)** for PR 22305 at commit [`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).
 * This patch **fails SparkR unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717502#comment-16717502 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446269160

Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5982/
Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717501#comment-16717501 ]

ASF GitHub Bot commented on SPARK-24561:

AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446269145

Merged build finished. Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717490#comment-16717490 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240686604 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and passes the data and indices to the Python worker to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to the + * Python worker. This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing the data into overlapping chunks and stitching them together). Review comment: Fixed
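The review thread above touches the docstring's note that materializing a whole partition is not strictly necessary for sliding windows. As a hedged illustration of the chunking idea it alludes to (plain Python, not Spark's implementation; `overlapping_chunks` is a hypothetical helper), each chunk can be padded by the window radius so every row still sees its complete frame:

```python
# Hypothetical sketch (not Spark code): split a partition into chunks padded
# by the sliding-window radius, so each chunk can be processed independently
# and every row still sees its full frame.
def overlapping_chunks(values, chunk_size, radius):
    chunks = []
    for start in range(0, len(values), chunk_size):
        lo = max(0, start - radius)                          # pad on the left
        hi = min(len(values), start + chunk_size + radius)   # pad on the right
        # (offset of the chunk's first "own" row inside the padded slice, data)
        chunks.append((start - lo, values[lo:hi]))
    return chunks

print(overlapping_chunks(list(range(10)), chunk_size=4, radius=1))
```

Each tuple pairs the offset of the chunk's first owned row with the padded data, so per-chunk results could be stitched back together without recomputing boundary rows.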
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717489#comment-16717489 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240686572 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and passes the data and indices to the Python worker to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to the + * Python worker. This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing the data into overlapping chunks and stitching them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the Python worker. + * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The Python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the Python input because it is an unbounded window Review comment: Fixed
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717487#comment-16717487 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240685803 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and passes the data and indices to the Python worker to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to the + * Python worker. This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing the data into overlapping chunks and stitching them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the Python worker. + * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The Python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the Python input because it is an unbounded window, + * so its bound indices will always be the same. + * + * Unbounded windows also have a different eval type, because: + * (1) They don't have bound indices as input + * (2) The udf only needs to be evaluated once in the Python worker (because the udf is Review comment: Fixed
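The bounded/unbounded split described in the quoted docstring can be sketched in plain Python (illustrative only; these helper names are not part of Spark): a bounded window evaluates the UDF once per row over that row's [lower, upper) slice, while an unbounded window evaluates it once over the whole partition and broadcasts the result.

```python
# Illustrative sketch of the two eval strategies (not Spark's worker code).
def bounded_window_agg(values, lowers, uppers, udf):
    # One UDF call per row, over that row's window slice [lower, upper).
    return [udf(values[lo:hi]) for lo, hi in zip(lowers, uppers)]

def unbounded_window_agg(values, udf):
    # Single UDF call; the scalar result is broadcast to every row.
    result = udf(values)
    return [result] * len(values)

mean = lambda xs: sum(xs) / len(xs)

# rowsBetween(-1, 0): lower = max(0, i - 1), upper = i + 1 (exclusive)
vals = [1.0, 2.0, 4.0]
lowers = [max(0, i - 1) for i in range(len(vals))]
uppers = [i + 1 for i in range(len(vals))]
print(bounded_window_agg(vals, lowers, uppers, mean))  # [1.0, 1.5, 3.0]
```

This also shows why unbounded frames need neither bound columns nor per-row evaluation, which is exactly the distinction the docstring draws.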
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717497#comment-16717497 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240688367 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -73,68 +138,141 @@ case class WindowInPandasExec( } /** - * Create the resulting projection. - * - * This method uses Code Generation. It can only be used on the executor side. - * - * @param expressions unbound ordered function expressions. - * @return the final resulting projection. + * See [[WindowBoundHelpers]] for details. */ - private[this] def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { -val references = expressions.zipWithIndex.map { case (e, i) => - // Results of window expressions will be on the right side of child's output - BoundReference(child.output.size + i, e.dataType, e.nullable) + private def computeWindowBoundHelpers( + factories: Seq[InternalRow => WindowFunctionFrame] + ): WindowBoundHelpers = { +val dummyRow = new SpecificInternalRow() Review comment: Ha neat trick.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717500#comment-16717500 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240688489 ## File path: python/pyspark/sql/functions.py ## @@ -2993,20 +2992,25 @@ def pandas_udf(f=None, returnType=None, functionType=None): >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP ... def mean_udf(v): ... return v.mean() - >>> w = Window \\ - ... .partitionBy('id') \\ - ... .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) + >>> w = (Window.partitionBy('id') + ... .orderBy('v') + ... .rowsBetween(-1, 0)) >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() # doctest: +SKIP +---+----+------+ | id| v|mean_v| +---+----+------+ - | 1| 1.0| 1.5| + | 1| 1.0| 1.0| | 1| 2.0| 1.5| - | 2| 3.0| 6.0| - | 2| 5.0| 6.0| - | 2|10.0| 6.0| + | 2| 3.0| 3.0| + | 2| 5.0| 4.0| + | 2|10.0| 7.5| +---+----+------+ + .. warning:: For performance reasons, the input series to window functions are not copied. Review comment: Fixed
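The doctest's expected values follow directly from the frame definition. A minimal pure-Python model of `rowsBetween(start, end)` semantics (an illustration, not PySpark code; `rows_between_mean` is a made-up helper) reproduces the `mean_v` column for partition id=2:

```python
# Pure-Python model (illustrative, not PySpark) of a row-based frame
# rowsBetween(start, end): for each row i, average the rows in
# [i + start, i + end], clipped to the partition boundaries.
def rows_between_mean(values, start, end):
    out = []
    n = len(values)
    for i in range(n):
        lo = max(0, i + start)
        hi = min(n - 1, i + end)
        window = values[lo:hi + 1]
        out.append(sum(window) / len(window))
    return out

# Partition id=2 from the doctest above: v = [3.0, 5.0, 10.0], frame (-1, 0)
print(rows_between_mean([3.0, 5.0, 10.0], -1, 0))  # [3.0, 4.0, 7.5]
```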
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717498#comment-16717498 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240688453 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -60,6 +105,26 @@ case class WindowInPandasExec( override def outputPartitioning: Partitioning = child.outputPartitioning + /** + * Helper functions and data structures for window bounds + * + * It contains: + * (1) Total number of window bound indices in the python input row + * (2) Function from frame index to its lower bound column index in the python input row + * (3) Function from frame index to its upper bound column index in the python input row + * (4) Seq from frame index to its window bound type + */ + private type WindowBoundHelpers = (Int, Int => Int, Int => Int, Seq[WindowType]) + + /** + * Enum for window bound types. Used only inside this class. + */ + private sealed case class WindowType(value: String) + private object UnboundedWindow extends WindowType("unbounded") + private object BoundedWindow extends WindowType("bounded") + + private val window_bound_type_conf = "pandas_window_bound_types" Review comment: Fixed
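The `WindowBoundHelpers` tuple quoted above maps each frame index to the positions of its lower and upper bound columns in the Python input row, and only bounded frames contribute columns. A small Python sketch of that bookkeeping (hypothetical helper, mirroring the Scala logic only loosely):

```python
# Illustrative sketch of the WindowBoundHelpers idea: map each frame index to
# its (lower, upper) bound column positions in the Python input row, skipping
# unbounded frames, which carry no bound columns.
def window_bound_helpers(frame_is_bounded):
    lower_idx, upper_idx = {}, {}
    next_col = 0
    for i, bounded in enumerate(frame_is_bounded):
        if bounded:
            lower_idx[i] = next_col
            upper_idx[i] = next_col + 1
            next_col += 2
    # (total bound columns, frame -> lower col, frame -> upper col)
    return next_col, lower_idx, upper_idx

# Frames as in the docstring example: w1 bounded, w2 unbounded, w3 bounded
total, lo, hi = window_bound_helpers([True, False, True])
print(total, lo, hi)  # 4 {0: 0, 2: 2} {0: 1, 2: 3}
```

This matches the docstring's layout: four bound columns (two per bounded frame) prepended before the data column `v`.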
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717496#comment-16717496 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240687003 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.window + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType} + +private[sql] abstract class WindowExecBase( +windowExpression: Seq[NamedExpression], +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +child: SparkPlan) extends UnaryExecNode { + + /** + * Create the resulting projection. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param expressions unbound ordered function expressions. + * @return the final resulting projection. + */ + protected def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { +val references = expressions.zipWithIndex.map { case (e, i) => + // Results of window expressions will be on the right side of child's output + BoundReference(child.output.size + i, e.dataType, e.nullable) +} +val unboundToRefMap = expressions.zip(references).toMap +val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) +UnsafeProjection.create( + child.output ++ patchedWindowExpression, + child.output) + } + + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param frame to evaluate. This can either be a Row or Range frame. + * @param bound with respect to the row. + * @param timeZone the session local timezone for time related calculations. + * @return a bound ordering object. 
+ */ + protected def createBoundOrdering( + frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = { +(frame, bound) match { + case (RowFrame, CurrentRow) => +RowBoundOrdering(0) + + case (RowFrame, IntegerLiteral(offset)) => +RowBoundOrdering(offset) + + case (RangeFrame, CurrentRow) => +val ordering = newOrdering(orderSpec, child.output) +RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection) + + case (RangeFrame, offset: Expression) if orderSpec.size == 1 => +// Use only the first order expression when the offset is non-null. +val sortExpr = orderSpec.head +val expr = sortExpr.child + +// Create the projection which returns the current 'value'. +val current = newMutableProjection(expr :: Nil, child.output) + +// Flip the sign of the offset when processing the order is descending +val boundOffset = sortExpr.direction match { + case Descending => UnaryMinus(offset) + case Ascending => offset +} + +// Create the projection which returns the current 'value' modified by adding the offset. +val boundExpr = (expr.dataType, boundOffset.dataType) match { + case (DateType, IntegerType) => DateAdd(expr, boundOffset) + case (TimestampType, CalendarIntervalType) => +TimeAdd(expr, boundOffset, Some(timeZone)) + case (a, b) if a== b => Add(expr, boundOffset) +} +
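In the `createBoundOrdering` code above, a range-frame offset has its sign flipped when the ordering is descending, so that a "preceding" bound still points toward earlier rows in sort order. A minimal sketch of that rule (plain Python; `range_bound_value` is a hypothetical helper, not Spark's API):

```python
# Sketch of the descending-order handling: flip the offset's sign when the
# order is descending, then add it to the current row's ordering value to get
# the frame bound.
def range_bound_value(order_value, offset, descending):
    signed = -offset if descending else offset
    return order_value + signed

print(range_bound_value(10, 3, descending=False))  # 13
print(range_bound_value(10, 3, descending=True))   # 7
```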
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717495#comment-16717495 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240686860 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/BoundOrdering.scala ## @@ -24,14 +24,14 @@ import org.apache.spark.sql.catalyst.expressions.Projection /** * Function for comparing boundary values. */ -private[window] abstract class BoundOrdering { +private[sql] abstract class BoundOrdering { Review comment: Yeah good catch. Reverted. I guess I was accessing this in WindowInPandasExec before
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717492#comment-16717492 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240686671 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.window + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType} + +private[sql] abstract class WindowExecBase( +windowExpression: Seq[NamedExpression], +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +child: SparkPlan) extends UnaryExecNode { + + /** + * Create the resulting projection. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param expressions unbound ordered function expressions. + * @return the final resulting projection. + */ + protected def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { +val references = expressions.zipWithIndex.map { case (e, i) => + // Results of window expressions will be on the right side of child's output + BoundReference(child.output.size + i, e.dataType, e.nullable) +} +val unboundToRefMap = expressions.zip(references).toMap +val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) +UnsafeProjection.create( + child.output ++ patchedWindowExpression, + child.output) + } + + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param frame to evaluate. This can either be a Row or Range frame. + * @param bound with respect to the row. + * @param timeZone the session local timezone for time related calculations. + * @return a bound ordering object. 
+ */ + protected def createBoundOrdering( + frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = { +(frame, bound) match { + case (RowFrame, CurrentRow) => +RowBoundOrdering(0) + + case (RowFrame, IntegerLiteral(offset)) => +RowBoundOrdering(offset) + + case (RangeFrame, CurrentRow) => +val ordering = newOrdering(orderSpec, child.output) +RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection) + + case (RangeFrame, offset: Expression) if orderSpec.size == 1 => +// Use only the first order expression when the offset is non-null. +val sortExpr = orderSpec.head +val expr = sortExpr.child + +// Create the projection which returns the current 'value'. +val current = newMutableProjection(expr :: Nil, child.output) + +// Flip the sign of the offset when processing the order is descending +val boundOffset = sortExpr.direction match { + case Descending => UnaryMinus(offset) + case Ascending => offset +} + +// Create the projection which returns the current 'value' modified by adding the offset. +val boundExpr = (expr.dataType, boundOffset.dataType) match { + case (DateType, IntegerType) => DateAdd(expr, boundOffset) + case (TimestampType, CalendarIntervalType) => +TimeAdd(expr, boundOffset, Some(timeZone)) + case (a, b) if a== b => Add(expr, boundOffset) Review
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717491#comment-16717491 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240686637 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and passes the data and indices to the Python worker to do the actual window Review comment: Fixed
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717488#comment-16717488 ] ASF GitHub Bot commented on SPARK-24561: SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#issuecomment-446266832 **[Test build #99983 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99983/testReport)** for PR 22305 at commit [`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717486#comment-16717486 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240685718 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and pass the data and indices to python work to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to + * Python worker. 
This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing data into overlapping chunks and stitch them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the python worker. + * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the python input because its unbounded window + * so it's bound indices will always be the same. + * + * Unbounded window also have a different eval type, because: Review comment: Fixed
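The bound computation the quoted doc comment describes can be sketched in plain Python, without Spark. This is an illustrative stand-in, not the actual Scala implementation: `row_frame_bounds`, the clamping logic, and the sample values are assumptions made for the sketch. It shows how, for each ROWS frame, a per-row lower/upper index pair is produced and prepended to the data sent to the Python worker.

```python
# Hypothetical sketch of per-row window bound computation for ROWS frames.
# `None` stands in for UnboundedPreceding / UnboundedFollowing.

def row_frame_bounds(n, lower, upper):
    """Return per-row [lower, upper) slice indices for a ROWS frame,
    clamped to a partition of size n."""
    lo = [0 if lower is None else max(0, i + lower) for i in range(n)]
    hi = [n if upper is None else min(n, i + upper + 1) for i in range(n)]
    return lo, hi

v = [1.0, 2.0, 3.0, 5.0, 10.0]
# w1 = specifiedwindowframe(RowFrame, -5, 5)
lo1, hi1 = row_frame_bounds(len(v), -5, 5)
# w3 = specifiedwindowframe(RowFrame, -3, 3), shared by avg(v) and max(v)
lo3, hi3 = row_frame_bounds(len(v), -3, 3)
# Worker input per row, as in the doc comment:
# (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
rows = list(zip(lo1, hi1, lo3, hi3, v))
```

Note that the unbounded frame w2 gets no columns here, matching the doc comment: its bounds are the same `[0, n)` for every row, so shipping them per row would be redundant.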
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717452#comment-16717452 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240678517 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala ## @@ -113,7 +113,7 @@ private[window] object AggregateProcessor { * This class manages the processing of a number of aggregate functions. See the documentation of * the object for more information. */ -private[window] final class AggregateProcessor( +private[sql] final class AggregateProcessor( Review comment: Do you mean we can remove the modifier here altogether because the fact that the class lives under namespace "execution" implies it's private already?
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717418#comment-16717418 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240667535 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and pass the data and indices to python work to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to + * Python worker. 
This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing data into overlapping chunks and stitch them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the python worker. + * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the python input because its unbounded window + * so it's bound indices will always be the same. + * + * Unbounded window also have a different eval type, because: + * (1) It doesn't have bound indices as input + * (2) The udf only needs to be evaluated once the in python worker (because the udf is + * deterministic and window bounds are the same for all windows) Review comment: Oh this relates to the statement "The udf only needs to be evaluated once". If the udf is not deterministic, even the window bounds are the same, we still need to evaluate it once per window This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
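The evaluation difference discussed in this thread — the UDF is evaluated once per partition for unbounded frames, but once per row slice for bounded frames — can be illustrated with a plain-Python stand-in for the worker. Function names and sample data here are hypothetical; the real worker operates on Arrow batches, and (as the reviewer notes) the once-per-partition reuse is only valid when the UDF is deterministic:

```python
# Hypothetical worker-side evaluation sketch.

def eval_unbounded(udf, v):
    # Unbounded frame: every row's window is the whole partition,
    # so a deterministic udf is evaluated once and the result reused.
    out = udf(v)
    return [out] * len(v)

def eval_bounded(udf, v, lo, hi):
    # Bounded frame: lo/hi are the per-row bound indices shipped
    # alongside the data; the udf runs once per row's slice.
    return [udf(v[l:h]) for l, h in zip(lo, hi)]

mean = lambda xs: sum(xs) / len(xs)
v = [1.0, 2.0, 3.0, 5.0, 10.0]
```

A non-deterministic UDF would have to fall through to per-row evaluation even when all bounds coincide, which is exactly the point made in the review comment above.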
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717360#comment-16717360 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240654467 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -144,24 +282,107 @@ case class WindowInPandasExec( queue.close() } - val inputProj = UnsafeProjection.create(allInputs, child.output) - val pythonInput = grouped.map { case (_, rows) => -rows.map { row => - queue.add(row.asInstanceOf[UnsafeRow]) - inputProj(row) + val stream = iter.map { row => +queue.add(row.asInstanceOf[UnsafeRow]) +row + } + + val pythonInput = new Iterator[Iterator[UnsafeRow]] { + +// Manage the stream and the grouping. +var nextRow: UnsafeRow = null +var nextGroup: UnsafeRow = null +var nextRowAvailable: Boolean = false +private[this] def fetchNextRow() { + nextRowAvailable = stream.hasNext + if (nextRowAvailable) { +nextRow = stream.next().asInstanceOf[UnsafeRow] +nextGroup = grouping(nextRow) + } else { +nextRow = null +nextGroup = null + } +} +fetchNextRow() + +// Manage the current partition. +val buffer: ExternalAppendOnlyUnsafeRowArray = + new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) +var bufferIterator: Iterator[UnsafeRow] = _ + +val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType)) + +val frames = factories.map(_(indexRow)) + +private[this] def fetchNextPartition() { + // Collect all the rows in the current partition. + // Before we start to fetch new input rows, make a copy of nextGroup. + val currentGroup = nextGroup.copy() + + // clear last partition + buffer.clear() + + while (nextRowAvailable && nextGroup == currentGroup) { Review comment: @ueshin No worries! 
I closed #23279 and thank you @hvanhovell
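The `fetchNextPartition` loop quoted above buffers consecutive rows that share a grouping key before handing the partition to the worker. A rough plain-Python stand-in using `itertools.groupby` (the tuple layout and key extraction are illustrative; the real node buffers `UnsafeRow`s in an `ExternalAppendOnlyUnsafeRowArray` that can spill to disk):

```python
from itertools import groupby

def partitions(rows, key):
    """Yield (group_key, buffered_rows) for runs of consecutive rows with
    the same key. The input must already be clustered by key, as the
    node's required ClusteredDistribution guarantees."""
    for k, grp in groupby(rows, key=key):
        # Materialize the whole partition before evaluation,
        # mirroring the buffer.clear() / buffer-fill loop above.
        yield k, list(grp)

data = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
parts = dict(partitions(data, key=lambda r: r[0]))
```

This also makes the doc comment's caveat concrete: all rows of a partition are materialized at once, which is more than strictly necessary for sliding windows.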
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716957#comment-16716957 ] ASF GitHub Bot commented on SPARK-24561: AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#issuecomment-446179456 Merged build finished. Test FAILed.
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716958#comment-16716958 ] ASF GitHub Bot commented on SPARK-24561: AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#issuecomment-446179463 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99962/ Test FAILed.
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716951#comment-16716951 ] ASF GitHub Bot commented on SPARK-24561: SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#issuecomment-446178752 **[Test build #99962 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99962/testReport)** for PR 22305 at commit [`8505418`](https://github.com/apache/spark/commit/850541873df1101485a85632b85ba474dac67d56). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716732#comment-16716732 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#issuecomment-446143339 Looks good to me too - mostly just nits. If you guys wouldn't mind, I will take another final look within a few days. Don't be blocked by me.
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716708#comment-16716708 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240538539 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and pass the data and indices to python work to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to + * Python worker. 
This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing data into overlapping chunks and stitch them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the python worker. + * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the python input because its unbounded window + * so it's bound indices will always be the same. + * + * Unbounded window also have a different eval type, because: + * (1) It doesn't have bound indices as input + * (2) The udf only needs to be evaluated once the in python worker (because the udf is Review comment: nit: `the in` -> `in the` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716713#comment-16716713 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240538889 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and pass the data and indices to python work to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to + * Python worker. 
This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing data into overlapping chunks and stitch them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the python worker. + * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the python input because its unbounded window + * so it's bound indices will always be the same. + * + * Unbounded window also have a different eval type, because: + * (1) It doesn't have bound indices as input + * (2) The udf only needs to be evaluated once the in python worker (because the udf is + * deterministic and window bounds are the same for all windows) Review comment: Sorry if I missed something but what does exactly mean "the udf is deterministic" in this context? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
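The bound indices described in the quoted comment above (the `lower_bound_wN` / `upper_bound_wN` columns prepended to the data) can be sketched in plain Python. This is only an illustration of the idea; `row_frame_bounds` is a hypothetical helper, not Spark's actual worker protocol:

```python
# Hypothetical sketch: compute the (lower, upper) index pair that a bounded
# row frame such as specifiedwindowframe(RowFrame, -3, 3) yields for each
# row of a partition. Spark prepends indices like these to the data sent to
# the Python worker; this is an illustration only, not Spark's code.
def row_frame_bounds(n_rows, lower_offset, upper_offset):
    """Return [lower, upper) index pairs per row, clamped to the partition."""
    bounds = []
    for i in range(n_rows):
        lower = max(0, i + lower_offset)
        upper = min(n_rows, i + upper_offset + 1)  # exclusive upper bound
        bounds.append((lower, upper))
    return bounds

# For a 5-row partition and frame (RowFrame, -3, 3):
print(row_frame_bounds(5, -3, 3))
```

An unbounded frame would always yield `(0, n_rows)` for every row, which is why no bound indices need to be shipped for `w2` above.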
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716710#comment-16716710 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240538676 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and pass the data and indices to python work to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to + * Python worker. 
This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing data into overlapping chunks and stitch them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the python worker. + * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the python input because its unbounded window + * so it's bound indices will always be the same. + * + * Unbounded window also have a different eval type, because: Review comment: nit: `have` -> `has`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716691#comment-16716691 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240537517 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and pass the data and indices to python work to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to + * Python worker. 
This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing data into overlapping chunks and stitch them together). + * + * This class groups window expressions by their window boundaries so that window expressions + * with the same window boundaries can share the same window bounds. The window bounds are + * prepended to the data passed to the python worker. + * + * For example, if we have: + * avg(v) over specifiedwindowframe(RowFrame, -5, 5), + * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing), + * avg(v) over specifiedwindowframe(RowFrame, -3, 3), + * max(v) over specifiedwindowframe(RowFrame, -3, 3) + * + * The python input will look like: + * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v) + * + * where w1 is specifiedwindowframe(RowFrame, -5, 5) + * w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, UnboundedFollowing) + * w3 is specifiedwindowframe(RowFrame, -3, 3) + * + * Note that w2 doesn't have bound indices in the python input because its unbounded window Review comment: `its` -> `it is` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716663#comment-16716663 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240533847 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.window + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType} + +private[sql] abstract class WindowExecBase( +windowExpression: Seq[NamedExpression], +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +child: SparkPlan) extends UnaryExecNode { + + /** + * Create the resulting projection. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param expressions unbound ordered function expressions. + * @return the final resulting projection. + */ + protected def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { +val references = expressions.zipWithIndex.map { case (e, i) => + // Results of window expressions will be on the right side of child's output + BoundReference(child.output.size + i, e.dataType, e.nullable) +} +val unboundToRefMap = expressions.zip(references).toMap +val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) +UnsafeProjection.create( + child.output ++ patchedWindowExpression, + child.output) + } + + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param frame to evaluate. This can either be a Row or Range frame. + * @param bound with respect to the row. + * @param timeZone the session local timezone for time related calculations. + * @return a bound ordering object. 
+ */ + protected def createBoundOrdering( + frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = { +(frame, bound) match { + case (RowFrame, CurrentRow) => +RowBoundOrdering(0) + + case (RowFrame, IntegerLiteral(offset)) => +RowBoundOrdering(offset) + + case (RangeFrame, CurrentRow) => +val ordering = newOrdering(orderSpec, child.output) +RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection) + + case (RangeFrame, offset: Expression) if orderSpec.size == 1 => +// Use only the first order expression when the offset is non-null. +val sortExpr = orderSpec.head +val expr = sortExpr.child + +// Create the projection which returns the current 'value'. +val current = newMutableProjection(expr :: Nil, child.output) + +// Flip the sign of the offset when processing the order is descending +val boundOffset = sortExpr.direction match { + case Descending => UnaryMinus(offset) + case Ascending => offset +} + +// Create the projection which returns the current 'value' modified by adding the offset. +val boundExpr = (expr.dataType, boundOffset.dataType) match { + case (DateType, IntegerType) => DateAdd(expr, boundOffset) + case (TimestampType, CalendarIntervalType) => +TimeAdd(expr, boundOffset, Some(timeZone)) + case (a, b) if a== b => Add(expr, boundOffset) +} +
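The descending-order handling in the quoted `createBoundOrdering` (the `UnaryMinus(offset)` case) mirrors the offset when the sort direction is descending. A tiny plain-Python sketch of that rule; `bound_value` is a hypothetical stand-in, not Spark code:

```python
# Hypothetical sketch of the range-frame bound logic quoted above: the
# frame edge for a row is its ordering value plus the offset, and the
# offset's sign is flipped when the ordering is descending (the
# UnaryMinus(offset) case in the Scala code). Illustration only.
def bound_value(order_value, offset, direction):
    """Value a candidate row's ordering expression is compared against."""
    if direction == "descending":
        offset = -offset  # flip the sign, as UnaryMinus(offset) does
    return order_value + offset
```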
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716688#comment-16716688 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240536682 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and pass the data and indices to python work to do the actual window + * aggregation. + * + * It currently materializes all data associated with the same partition key and passes them to + * Python worker. 
This is not strictly necessary for sliding windows and can be improved (by + * possibly slicing data into overlapping chunks and stitch them together). Review comment: nit `stitch` -> `stitching` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
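The improvement hinted at in the quoted comment (slicing a partition into overlapping chunks and stitching the results) can be demonstrated in plain Python. A hypothetical sketch, not Spark's implementation: for a bounded row frame, each chunk only needs `before` rows of left context and `after` rows of right context to reproduce the full-partition result.

```python
# Hypothetical sketch: a sliding row frame [i - before, i + after] can be
# evaluated chunk by chunk if each chunk is padded with the frame width on
# both sides, then only the middle rows are kept. Illustration only.

def sliding_means(values, before, after):
    """Mean over the bounded row frame for each row of the full partition."""
    n = len(values)
    out = []
    for i in range(n):
        frame = values[max(0, i - before):min(n, i + after + 1)]
        out.append(sum(frame) / len(frame))
    return out

def chunked_sliding_means(values, before, after, chunk_size):
    """Same result, computed over overlapping chunks and stitched together."""
    n = len(values)
    out = []
    for start in range(0, n, chunk_size):
        end = min(n, start + chunk_size)
        lo = max(0, start - before)             # pad the chunk on the left...
        chunk = values[lo:min(n, end + after)]  # ...and on the right
        for i in range(start, end):             # keep only the middle rows
            j = i - lo  # row position inside the padded chunk
            frame = chunk[max(0, j - before):j + after + 1]
            out.append(sum(frame) / len(frame))
    return out
```

Because every frame that touches a kept row lies entirely inside the padded chunk, the chunked computation matches the full-partition one exactly.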
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716667#comment-16716667 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240534340 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.window + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType} + +private[sql] abstract class WindowExecBase( +windowExpression: Seq[NamedExpression], +partitionSpec: Seq[Expression], +orderSpec: Seq[SortOrder], +child: SparkPlan) extends UnaryExecNode { + + /** + * Create the resulting projection. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param expressions unbound ordered function expressions. + * @return the final resulting projection. + */ + protected def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = { +val references = expressions.zipWithIndex.map { case (e, i) => + // Results of window expressions will be on the right side of child's output + BoundReference(child.output.size + i, e.dataType, e.nullable) +} +val unboundToRefMap = expressions.zip(references).toMap +val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap)) +UnsafeProjection.create( + child.output ++ patchedWindowExpression, + child.output) + } + + /** + * Create a bound ordering object for a given frame type and offset. A bound ordering object is + * used to determine which input row lies within the frame boundaries of an output row. + * + * This method uses Code Generation. It can only be used on the executor side. + * + * @param frame to evaluate. This can either be a Row or Range frame. + * @param bound with respect to the row. + * @param timeZone the session local timezone for time related calculations. + * @return a bound ordering object. 
+ */ + protected def createBoundOrdering( + frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = { +(frame, bound) match { + case (RowFrame, CurrentRow) => +RowBoundOrdering(0) + + case (RowFrame, IntegerLiteral(offset)) => +RowBoundOrdering(offset) + + case (RangeFrame, CurrentRow) => +val ordering = newOrdering(orderSpec, child.output) +RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection) + + case (RangeFrame, offset: Expression) if orderSpec.size == 1 => +// Use only the first order expression when the offset is non-null. +val sortExpr = orderSpec.head +val expr = sortExpr.child + +// Create the projection which returns the current 'value'. +val current = newMutableProjection(expr :: Nil, child.output) + +// Flip the sign of the offset when processing the order is descending +val boundOffset = sortExpr.direction match { + case Descending => UnaryMinus(offset) + case Ascending => offset +} + +// Create the projection which returns the current 'value' modified by adding the offset. +val boundExpr = (expr.dataType, boundOffset.dataType) match { + case (DateType, IntegerType) => DateAdd(expr, boundOffset) + case (TimestampType, CalendarIntervalType) => +TimeAdd(expr, boundOffset, Some(timeZone)) + case (a, b) if a== b => Add(expr, boundOffset) Review
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716686#comment-16716686 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240536250 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -27,17 +27,62 @@ import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan} import org.apache.spark.sql.execution.arrow.ArrowUtils -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.execution.window._ +import org.apache.spark.sql.types._ import org.apache.spark.util.Utils +/** + * This class calculates and outputs windowed aggregates over the rows in a single partition. + * + * This is similar to [[WindowExec]]. The main difference is that this node doesn't not compute + * any window aggregation values. Instead, it computes the lower and upper bound for each window + * (i.e. window bounds) and pass the data and indices to python work to do the actual window Review comment: nit `python work` -> `Python worker` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716670#comment-16716670 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240534842 ## File path: python/pyspark/sql/functions.py ## @@ -2993,20 +2992,25 @@ def pandas_udf(f=None, returnType=None, functionType=None): >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP ... def mean_udf(v): ... return v.mean() - >>> w = Window \\ - ... .partitionBy('id') \\ - ... .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) + >>> w = (Window.partitionBy('id') + ... .orderBy('v') + ... .rowsBetween(-1, 0)) >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() # doctest: +SKIP +---+----+------+ | id| v|mean_v| +---+----+------+ - | 1| 1.0| 1.5| + | 1| 1.0| 1.0| | 1| 2.0| 1.5| - | 2| 3.0| 6.0| - | 2| 5.0| 6.0| - | 2|10.0| 6.0| + | 2| 3.0| 3.0| + | 2| 5.0| 4.0| + | 2|10.0| 7.5| +---+----+------+ + .. warning:: For performance reasons, the input series to window functions are not copied. Review comment: Let's keep the indentation same with other `.. note::` as well .. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
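The expected values in the patched doctest above can be re-derived without running Spark: over `Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)`, `mean_udf` sees each value and its immediate predecessor within the partition. A hypothetical plain-Python check (not PySpark, and `mean_over_trailing_pair` is an invented helper name):

```python
# Hypothetical re-derivation of the doctest's mean_v column: for each row,
# the frame rowsBetween(-1, 0) holds the current value and, when present,
# the previous one within the same (sorted) partition. Illustration only.
def mean_over_trailing_pair(partition):
    """Mean of the frame [current - 1, current] for each row of a partition."""
    out = []
    for i in range(len(partition)):
        frame = partition[max(0, i - 1):i + 1]
        out.append(sum(frame) / len(frame))
    return out

rows = {1: [1.0, 2.0], 2: [3.0, 5.0, 10.0]}  # id -> v, already sorted by v
mean_v = {k: mean_over_trailing_pair(v) for k, v in rows.items()}
```

This reproduces the column shown in the patch: 1.0 and 1.5 for id 1, and 3.0, 4.0, 7.5 for id 2.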
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716665#comment-16716665 ] ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240534200 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/BoundOrdering.scala ## @@ -24,14 +24,14 @@ import org.apache.spark.sql.catalyst.expressions.Projection /** * Function for comparing boundary values. */ -private[window] abstract class BoundOrdering { +private[sql] abstract class BoundOrdering { Review comment: BTW, why is this `[sql]`? Looks it can be `[window]`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716668#comment-16716668 ]

ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240534340

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala

@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+    windowExpression: Seq[NamedExpression],
+    partitionSpec: Seq[Expression],
+    orderSpec: Seq[SortOrder],
+    child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  protected def createResultProjection(expressions: Seq[Expression]): UnsafeProjection = {
+    val references = expressions.zipWithIndex.map { case (e, i) =>
+      // Results of window expressions will be on the right side of child's output
+      BoundReference(child.output.size + i, e.dataType, e.nullable)
+    }
+    val unboundToRefMap = expressions.zip(references).toMap
+    val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
+    UnsafeProjection.create(
+      child.output ++ patchedWindowExpression,
+      child.output)
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound ordering object is
+   * used to determine which input row lies within the frame boundaries of an output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor side.
+   *
+   * @param frame to evaluate. This can either be a Row or Range frame.
+   * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
+   * @return a bound ordering object.
+   */
+  protected def createBoundOrdering(
+      frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
+    (frame, bound) match {
+      case (RowFrame, CurrentRow) =>
+        RowBoundOrdering(0)
+
+      case (RowFrame, IntegerLiteral(offset)) =>
+        RowBoundOrdering(offset)
+
+      case (RangeFrame, CurrentRow) =>
+        val ordering = newOrdering(orderSpec, child.output)
+        RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
+
+      case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+        // Use only the first order expression when the offset is non-null.
+        val sortExpr = orderSpec.head
+        val expr = sortExpr.child
+
+        // Create the projection which returns the current 'value'.
+        val current = newMutableProjection(expr :: Nil, child.output)
+
+        // Flip the sign of the offset when processing the order is descending
+        val boundOffset = sortExpr.direction match {
+          case Descending => UnaryMinus(offset)
+          case Ascending => offset
+        }
+
+        // Create the projection which returns the current 'value' modified by adding the offset.
+        val boundExpr = (expr.dataType, boundOffset.dataType) match {
+          case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+          case (TimestampType, CalendarIntervalType) =>
+            TimeAdd(expr, boundOffset, Some(timeZone))
+          case (a, b) if a == b => Add(expr, boundOffset)

Review
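The `createBoundOrdering` cases quoted above come down to simple index arithmetic: a row frame clamps relative offsets to the partition, and a range frame negates the offset when the order spec is descending. A minimal Python sketch of that arithmetic (helper names are illustrative, not Spark's implementation):

```python
def row_frame_bounds(i, n, lower, upper):
    """Clamp a row frame [lower, upper] (offsets relative to row i) to
    absolute indices over a partition of n rows. None models an
    unbounded boundary; CurrentRow is offset 0. Returns a half-open
    [start, end) slice."""
    start = 0 if lower is None else max(0, i + lower)
    end = n if upper is None else min(n, i + upper + 1)
    return start, end

def range_bound_offset(offset, descending):
    """Mirror the sign flip in the Descending/Ascending match above:
    a range offset is negated under a descending order spec."""
    return -offset if descending else offset
```

For example, with `rowsBetween(-1, 0)` on a 5-row partition, row 3's frame covers rows 2 and 3.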
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716649#comment-16716649 ]

ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240529454

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala

@@ -113,7 +113,7 @@ private[window] object AggregateProcessor {
  * This class manages the processing of a number of aggregate functions. See the documentation of
  * the object for more information.
  */
-private[window] final class AggregateProcessor(
+private[sql] final class AggregateProcessor(

Review comment: nit: `private[sql]` can be removed because `execution` is already a private package. For other places as well.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716606#comment-16716606 ]

ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240523454

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala

@@ -60,6 +105,26 @@ case class WindowInPandasExec(
   override def outputPartitioning: Partitioning = child.outputPartitioning

+  /**
+   * Helper functions and data structures for window bounds
+   *
+   * It contains:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the python input row
+   * (3) Function from frame index to its upper bound column index in the python input row
+   * (4) Seq from frame index to its window bound type
+   */
+  private type WindowBoundHelpers = (Int, Int => Int, Int => Int, Seq[WindowType])
+
+  /**
+   * Enum for window bound types. Used only inside this class.
+   */
+  private sealed case class WindowType(value: String)
+  private object UnboundedWindow extends WindowType("unbounded")
+  private object BoundedWindow extends WindowType("bounded")
+
+  private val window_bound_type_conf = "pandas_window_bound_types"

Review comment: nit: `window_bound_type_conf` -> `windowBoundTypeConf`
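The `WindowBoundHelpers` tuple described in the quoted scaladoc bundles the bound-column layout of the Python input row. A hypothetical Python sketch of how such a layout could be derived, assuming each bounded frame contributes one lower-bound and one upper-bound column and unbounded frames contribute none (names and layout are illustrative, not the actual Spark code):

```python
def window_bound_helpers(window_types):
    """Given per-frame window types ('bounded' or 'unbounded'), return
    (total bound columns, frame index -> lower-bound column,
     frame index -> upper-bound column, the window types themselves)."""
    lower, upper = {}, {}
    col = 0
    for frame_idx, wtype in enumerate(window_types):
        if wtype == "bounded":
            lower[frame_idx] = col      # lower-bound index column
            upper[frame_idx] = col + 1  # upper-bound index column
            col += 2                    # bounded frames take two columns
    # dict.get returns None for unbounded frames (no bound columns)
    return col, lower.get, upper.get, list(window_types)
```

With frames `["bounded", "unbounded", "bounded"]` this yields four bound columns, with frame 2's bounds at columns 2 and 3.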
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716598#comment-16716598 ]

ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240522289

File path: python/pyspark/sql/functions.py

@@ -2993,20 +2992,25 @@ def pandas_udf(f=None, returnType=None, functionType=None):
     >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
     ... def mean_udf(v):
     ...     return v.mean()
-    >>> w = Window \\
-    ...     .partitionBy('id') \\
-    ...     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+    >>> w = (Window.partitionBy('id')
+    ...            .orderBy('v')
+    ...            .rowsBetween(-1, 0))
     >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # doctest: +SKIP
     +---+----+------+
     | id|   v|mean_v|
     +---+----+------+
-    |  1| 1.0|   1.5|
+    |  1| 1.0|   1.0|
     |  1| 2.0|   1.5|
-    |  2| 3.0|   6.0|
-    |  2| 5.0|   6.0|
-    |  2|10.0|   6.0|
+    |  2| 3.0|   3.0|
+    |  2| 5.0|   4.0|
+    |  2|10.0|   7.5|
     +---+----+------+
+
+    .. warning:: For performance reasons, the input series to window functions are not copied.

Review comment: @icexelloss, it's a nit but let's leave this as a note (just for consistency)
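The new doctest's expected values can be cross-checked with plain pandas: over `partitionBy('id').orderBy('v')`, the frame `rowsBetween(-1, 0)` is a per-group rolling window of size two (the current row plus at most one preceding row). A sketch, assuming the rows arrive already sorted by `id` and `v`:

```python
import pandas as pd

df = pd.DataFrame({"id": [1, 1, 2, 2, 2],
                   "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

# rowsBetween(-1, 0) per partition == rolling mean over the current row
# and the one preceding it within each group.
df["mean_v"] = (df.sort_values(["id", "v"])
                  .groupby("id")["v"]
                  .transform(lambda s: s.rolling(2, min_periods=1).mean()))
print(df["mean_v"].tolist())  # -> [1.0, 1.5, 3.0, 4.0, 7.5]
```

The result matches the doctest table above, which is a useful sanity check that the bounded-window semantics line up with pandas' own rolling aggregation.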
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716578#comment-16716578 ]

ASF GitHub Bot commented on SPARK-24561: AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118973

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5964/ Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716577#comment-16716577 ]

ASF GitHub Bot commented on SPARK-24561: AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118969

Merged build finished. Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716574#comment-16716574 ]

ASF GitHub Bot commented on SPARK-24561: SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118921

**[Test build #99962 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99962/testReport)** for PR 22305 at commit [`8505418`](https://github.com/apache/spark/commit/850541873df1101485a85632b85ba474dac67d56).
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716576#comment-16716576 ]

ASF GitHub Bot commented on SPARK-24561: AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118973

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5964/ Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716575#comment-16716575 ]

ASF GitHub Bot commented on SPARK-24561: AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118969

Merged build finished. Test PASSed.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716570#comment-16716570 ]

ASF GitHub Bot commented on SPARK-24561: HyukjinKwon commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446117830

retest this please
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716107#comment-16716107 ]

ASF GitHub Bot commented on SPARK-24561: ueshin edited a comment on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446062061

LGTM. @felixcheung Could you help us review please? Thanks!
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716096#comment-16716096 ]

ASF GitHub Bot commented on SPARK-24561: ueshin commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240464740

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala

@@ -144,24 +282,107 @@ case class WindowInPandasExec(
       queue.close()
     }

-    val inputProj = UnsafeProjection.create(allInputs, child.output)
-    val pythonInput = grouped.map { case (_, rows) =>
-      rows.map { row =>
-        queue.add(row.asInstanceOf[UnsafeRow])
-        inputProj(row)
+    val stream = iter.map { row =>
+      queue.add(row.asInstanceOf[UnsafeRow])
+      row
+    }
+
+    val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+      // Manage the stream and the grouping.
+      var nextRow: UnsafeRow = null
+      var nextGroup: UnsafeRow = null
+      var nextRowAvailable: Boolean = false
+      private[this] def fetchNextRow() {
+        nextRowAvailable = stream.hasNext
+        if (nextRowAvailable) {
+          nextRow = stream.next().asInstanceOf[UnsafeRow]
+          nextGroup = grouping(nextRow)
+        } else {
+          nextRow = null
+          nextGroup = null
+        }
+      }
+      fetchNextRow()
+
+      // Manage the current partition.
+      val buffer: ExternalAppendOnlyUnsafeRowArray =
+        new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+      var bufferIterator: Iterator[UnsafeRow] = _
+
+      val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+      val frames = factories.map(_(indexRow))
+
+      private[this] def fetchNextPartition() {
+        // Collect all the rows in the current partition.
+        // Before we start to fetch new input rows, make a copy of nextGroup.
+        val currentGroup = nextGroup.copy()
+
+        // clear last partition
+        buffer.clear()
+
+        while (nextRowAvailable && nextGroup == currentGroup) {

Review comment: Oh, I see. Yeah, it should work. I missed that they are `UnsafeRow`s. Thanks! @icexelloss Could you close #23279 and leave it as is? I am sorry for taking your time.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716098#comment-16716098 ]

ASF GitHub Bot commented on SPARK-24561: ueshin commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446062061

LGTM.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715848#comment-16715848 ]

ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240434370

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala (same hunk quoted earlier, at the line `while (nextRowAvailable && nextGroup == currentGroup) {`)

Review comment: @hvanhovell Thanks for chiming in. I am not very familiar with unsafe row comparison and was just following @ueshin's suggestion. If this is not needed, I can close #23279 and leave it as is.
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715701#comment-16715701 ]

ASF GitHub Bot commented on SPARK-24561: hvanhovell commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240409746

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala (same hunk quoted earlier, at the line `while (nextRowAvailable && nextGroup == currentGroup) {`)

Review comment: Why is this needed? You are comparing unsafe rows, so equality should work.
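The loop under discussion consumes sorted input and starts a new partition whenever the projected grouping key stops comparing equal, so plain equality on the key is all the cut condition needs. A pure-Python sketch of that streaming pattern (illustrative, not the Spark code):

```python
def iterate_partitions(rows, key):
    """Yield (group_key, rows_in_group) from an iterator whose rows are
    already clustered by key -- the same contract the window exec relies
    on after its partition-local sort."""
    it = iter(rows)
    try:
        row = next(it)
    except StopIteration:
        return
    current, buf = key(row), [row]
    for row in it:
        k = key(row)
        if k == current:          # same partition: keep buffering
            buf.append(row)
        else:                     # key changed: emit and start a new buffer
            yield current, buf
            current, buf = k, [row]
    yield current, buf            # flush the final partition
```

Usage: `list(iterate_partitions([(1, "a"), (1, "b"), (2, "c")], lambda r: r[0]))` groups the first two rows under key 1 and the last under key 2.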
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715337#comment-16715337 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on a change in pull request #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#discussion_r240331939 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ## @@ -144,24 +282,107 @@ case class WindowInPandasExec( queue.close() } - val inputProj = UnsafeProjection.create(allInputs, child.output) - val pythonInput = grouped.map { case (_, rows) => -rows.map { row => - queue.add(row.asInstanceOf[UnsafeRow]) - inputProj(row) + val stream = iter.map { row => +queue.add(row.asInstanceOf[UnsafeRow]) +row + } + + val pythonInput = new Iterator[Iterator[UnsafeRow]] { + +// Manage the stream and the grouping. +var nextRow: UnsafeRow = null +var nextGroup: UnsafeRow = null +var nextRowAvailable: Boolean = false +private[this] def fetchNextRow() { + nextRowAvailable = stream.hasNext + if (nextRowAvailable) { +nextRow = stream.next().asInstanceOf[UnsafeRow] +nextGroup = grouping(nextRow) + } else { +nextRow = null +nextGroup = null + } +} +fetchNextRow() + +// Manage the current partition. +val buffer: ExternalAppendOnlyUnsafeRowArray = + new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold) +var bufferIterator: Iterator[UnsafeRow] = _ + +val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType)) + +val frames = factories.map(_(indexRow)) + +private[this] def fetchNextPartition() { + // Collect all the rows in the current partition. + // Before we start to fetch new input rows, make a copy of nextGroup. + val currentGroup = nextGroup.copy() + + // clear last partition + buffer.clear() + + while (nextRowAvailable && nextGroup == currentGroup) { Review comment: Sure. 
Here https://github.com/apache/spark/pull/23279 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> User-defined window functions with pandas udf (bounded window)
> --
>
> Key: SPARK-24561
> URL: https://issues.apache.org/jira/browse/SPARK-24561
> Project: Spark
> Issue Type: Sub-task
> Components: PySpark
> Affects Versions: 2.3.1
> Reporter: Li Jin
> Priority: Major
>
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
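The pattern in the Scala diff above (fetchNextRow / fetchNextPartition) buffers each run of consecutive rows that share a grouping key into a per-partition buffer before handing the partition to the Python worker. A minimal pure-Python sketch of that consecutive-grouping idea, with hypothetical names and no Spark internals, might look like:

```python
from itertools import groupby

def partition_stream(rows, grouping_key):
    """Yield (key, buffered_rows) for each run of consecutive rows
    sharing the same grouping key. Each partition is fully buffered
    before it is yielded, mirroring how the Scala iterator collects
    a whole window partition before invoking the UDF."""
    for key, group in groupby(rows, key=grouping_key):
        yield key, list(group)

# Rows are assumed pre-sorted by the partition key, as Spark's
# window exec guarantees for its input iterator.
rows = [("a", 1.0), ("a", 2.0), ("b", 3.0), ("b", 5.0), ("b", 10.0)]
partitions = list(partition_stream(rows, grouping_key=lambda r: r[0]))
# partitions == [("a", [("a", 1.0), ("a", 2.0)]),
#                ("b", [("b", 3.0), ("b", 5.0), ("b", 10.0)])]
```

Note that, like the Scala code, this only works because rows arrive grouped by key; `itertools.groupby` (and the Scala iterator) would emit duplicate keys if the input were unsorted.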
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715340#comment-16715340 ] ASF GitHub Bot commented on SPARK-24561: icexelloss commented on issue #22305: [SPARK-24561][SQL][Python] User-defined window aggregation functions with Pandas UDF (bounded window) URL: https://github.com/apache/spark/pull/22305#issuecomment-445925430 @ueshin Do you have any more comments other than https://github.com/apache/spark/pull/22305#discussion_r239312302?
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598885#comment-16598885 ] Apache Spark commented on SPARK-24561: User 'icexelloss' has created a pull request for this issue: https://github.com/apache/spark/pull/22305
[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)
[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579965#comment-16579965 ] Li Jin commented on SPARK-24561: I am looking into this. Early investigation: https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit
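The bounded row-frame semantics this ticket adds, e.g. `Window.rowsBetween(-1, 0)`, mean each row's UDF sees the slice of its partition from one row before itself up to and including itself, clamped at the partition boundaries. A minimal pure-Python sketch of that frame evaluation (hypothetical helper names, not the Spark API) is:

```python
def bounded_window_agg(values, lower, upper, agg):
    """Apply agg over the bounded row frame [i + lower, i + upper]
    around each row index i, clamped to the partition boundaries."""
    out = []
    n = len(values)
    for i in range(n):
        lo = max(0, i + lower)          # clamp frame start at partition start
        hi = min(n - 1, i + upper)      # clamp frame end at partition end
        out.append(agg(values[lo:hi + 1]))
    return out

def mean(xs):
    return sum(xs) / len(xs)

# Rolling mean with rowsBetween(-1, 0) over one partition:
print(bounded_window_agg([3.0, 5.0, 10.0], -1, 0, mean))  # [3.0, 4.0, 7.5]
```

This reproduces by hand the `mean_v` values the PR's doctest shows for the `id=2` partition (3.0, 4.0, 7.5), since each frame holds at most the previous row and the current row.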