[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-17 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723552#comment-16723552 ]

ASF GitHub Bot commented on SPARK-24561:


asfgit closed pull request #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index f98e550e39da8..d188de39e21c7 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2982,8 +2982,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
   |  2|        6.0|
   +---+-----------+
 
-   This example shows using grouped aggregated UDFs as window functions. Note that only
-   unbounded window frame is supported at the moment:
+   This example shows using grouped aggregated UDFs as window functions.
 
   >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
   >>> from pyspark.sql import Window
@@ -2993,20 +2992,24 @@ def pandas_udf(f=None, returnType=None, functionType=None):
   >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
   ... def mean_udf(v):
   ...     return v.mean()
-   >>> w = Window \\
-   ...     .partitionBy('id') \\
-   ...     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+   >>> w = (Window.partitionBy('id')
+   ...      .orderBy('v')
+   ...      .rowsBetween(-1, 0))
   >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # doctest: +SKIP
   +---+----+------+
   | id|   v|mean_v|
   +---+----+------+
-   |  1| 1.0|   1.5|
+   |  1| 1.0|   1.0|
   |  1| 2.0|   1.5|
-   |  2| 3.0|   6.0|
-   |  2| 5.0|   6.0|
-   |  2|10.0|   6.0|
+   |  2| 3.0|   3.0|
+   |  2| 5.0|   4.0|
+   |  2|10.0|   7.5|
   +---+----+------+
 
+   .. note:: For performance reasons, the input series to window functions are not copied.
+       Therefore, mutating the input series is not allowed and will cause incorrect results.
+       For the same reason, users should also not rely on the index of the input series.
+
   .. seealso:: :meth:`pyspark.sql.GroupedData.agg` and :class:`pyspark.sql.Window`
 
 .. note:: The user-defined functions are considered deterministic by default. Due to
diff --git a/python/pyspark/sql/tests/test_pandas_udf_window.py b/python/pyspark/sql/tests/test_pandas_udf_window.py
index f0e6d2696df62..1b7df6797e9e6 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_window.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_window.py
@@ -47,6 +47,15 @@ def pandas_scalar_time_two(self):
         from pyspark.sql.functions import pandas_udf
         return pandas_udf(lambda v: v * 2, 'double')
 
+    @property
+    def pandas_agg_count_udf(self):
+        from pyspark.sql.functions import pandas_udf, PandasUDFType
+
+        @pandas_udf('long', PandasUDFType.GROUPED_AGG)
+        def count(v):
+            return len(v)
+        return count
+
     @property
     def pandas_agg_mean_udf(self):
         from pyspark.sql.functions import pandas_udf, PandasUDFType
@@ -77,7 +86,7 @@ def min(v):
     @property
     def unbounded_window(self):
         return Window.partitionBy('id') \
-            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing).orderBy('v')
 
     @property
     def ordered_window(self):
@@ -87,6 +96,32 @@ def ordered_window(self):
     def unpartitioned_window(self):
         return Window.partitionBy()
 
+    @property
+    def sliding_row_window(self):
+        return Window.partitionBy('id').orderBy('v').rowsBetween(-2, 1)
+
+    @property
+    def sliding_range_window(self):
+        return Window.partitionBy('id').orderBy('v').rangeBetween(-2, 4)
+
+    @property
+    def growing_row_window(self):
+        return Window.partitionBy('id').orderBy('v').rowsBetween(Window.unboundedPreceding, 3)
+
+    @property
+    def growing_range_window(self):
+        return Window.partitionBy('id').orderBy('v') \
+            .rangeBetween(Window.unboundedPreceding, 4)
+
+    @property
+    def shrinking_row_window(self):
+        return Window.partitionBy('id').orderBy('v').rowsBetween(-2, Window.unboundedFollowing)
+
+    @property
+    def shrinking_range_window(self):
+        return Window.partitionBy('id').orderBy('v') \
+            .rangeBetween(-3, Window.unboundedFollowing)
+
     def test_simple(self):
         from 
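The bounded-window semantics exercised above can be checked without a Spark cluster: plain pandas reproduces the expected `mean_v` column from the docstring example. This is a hedged sketch, not Spark code; it assumes only the toy `id`/`v` data from the doctest, and `range_mean` is a hypothetical helper written for illustration (it is not part of any pandas or PySpark API).

```python
import pandas as pd

# Same toy data as the docstring example: (id, v) pairs.
df = pd.DataFrame({'id': [1, 1, 2, 2, 2], 'v': [1.0, 2.0, 3.0, 5.0, 10.0]})

# rowsBetween(-1, 0): the current row plus one preceding row per partition,
# ordered by 'v' -- equivalent here to a rolling window of size 2.
mean_v = (df.sort_values(['id', 'v'])
            .groupby('id')['v']
            .rolling(window=2, min_periods=1)
            .mean()
            .reset_index(drop=True))
print(list(mean_v))  # -> [1.0, 1.5, 3.0, 4.0, 7.5], matching the doctest table

# rangeBetween(lo, hi): all rows in the partition whose ORDER BY value lies
# in [v + lo, v + hi] relative to the current row's value v.
def range_mean(part, lo, hi):
    return part.apply(lambda v: part[part.between(v + lo, v + hi)].mean())

# Mirrors sliding_range_window = ...rangeBetween(-2, 4) from the test fixtures.
range_v = df.groupby('id')['v'].transform(lambda s: range_mean(s, -2, 4))
print(list(range_v))  # -> [1.5, 1.5, 4.0, 4.0, 10.0]
```

The key contrast shown: `rowsBetween` counts physical rows around the current row, while `rangeBetween` selects rows by the distance of their ordering value, so ties and gaps in `v` change which rows fall inside the frame.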

[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718246#comment-16718246 ]

ASF GitHub Bot commented on SPARK-24561:


SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412545
 
 
   **[Test build #99989 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99989/testReport)**
 for PR 22305 at commit 
[`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> User-defined window functions with pandas udf (bounded window)
> --
>
> Key: SPARK-24561
> URL: https://issues.apache.org/jira/browse/SPARK-24561
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 2.3.1
>Reporter: Li Jin
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718250#comment-16718250 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412883
 
 
   Merged build finished. Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718253#comment-16718253 ]

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240841612
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
 ##
 @@ -113,7 +113,7 @@ private[window] object AggregateProcessor {
  * This class manages the processing of a number of aggregate functions. See the documentation of
  * the object for more information.
  */
-private[window] final class AggregateProcessor(
+private[sql] final class AggregateProcessor(
 
 Review comment:
   Yea, see also some JIRAs like SPARK-16964.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718251#comment-16718251 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412887
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99989/
   Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718248#comment-16718248 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412883
 
 
   Merged build finished. Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718249#comment-16718249 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446412887
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99989/
   Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718247#comment-16718247 ]

ASF GitHub Bot commented on SPARK-24561:


SparkQA removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348650
 
 
   **[Test build #99989 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99989/testReport)**
 for PR 22305 at commit 
[`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717940#comment-16717940 ]

ASF GitHub Bot commented on SPARK-24561:


SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348650
 
 
   **[Test build #99989 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99989/testReport)**
 for PR 22305 at commit 
[`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717942#comment-16717942 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348628
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5987/
   Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717941#comment-16717941 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348617
 
 
   Merged build finished. Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717913#comment-16717913 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446345248
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99983/
   Test FAILed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717938#comment-16717938 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348617
 
 
   Merged build finished. Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717939#comment-16717939 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446348628
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5987/
   Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717922#comment-16717922 ]

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446346819
 
 
   retest this please





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717911#comment-16717911 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446345241
 
 
   Merged build finished. Test FAILed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717908#comment-16717908 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446345241
 
 
   Merged build finished. Test FAILed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717909#comment-16717909 ]

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446345248
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99983/
   Test FAILed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717907#comment-16717907 ]

ASF GitHub Bot commented on SPARK-24561:


SparkQA removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446266832
 
 
   **[Test build #99983 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99983/testReport)**
 for PR 22305 at commit 
[`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717906#comment-16717906 ]

ASF GitHub Bot commented on SPARK-24561:


SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446344713
 
 
   **[Test build #99983 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99983/testReport)**
 for PR 22305 at commit 
[`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).
* This patch **fails SparkR unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717502#comment-16717502
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446269160
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5982/
   Test PASSed.






[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717501#comment-16717501
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446269145
 
 
   Merged build finished. Test PASSed.






[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717490#comment-16717490
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240686604
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node 
doesn't compute
+ * any window aggregation values. Instead, it computes the lower and upper 
bound for each window
+ * (i.e. window bounds) and passes the data and indices to a Python worker to 
do the actual
+ * window aggregation.
+ *
+ * It currently materializes all data associated with the same partition key 
and passes it to
+ * the Python worker. This is not strictly necessary for sliding windows and 
can be improved (by
+ * possibly slicing data into overlapping chunks and stitching them together).
 
 Review comment:
   Fixed






[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717489#comment-16717489
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240686572
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node 
doesn't compute
+ * any window aggregation values. Instead, it computes the lower and upper 
bound for each window
+ * (i.e. window bounds) and passes the data and indices to a Python worker to 
do the actual
+ * window aggregation.
+ *
+ * It currently materializes all data associated with the same partition key 
and passes it to
+ * the Python worker. This is not strictly necessary for sliding windows and 
can be improved (by
+ * possibly slicing data into overlapping chunks and stitching them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the Python input because it's an 
unbounded window
 
 Review comment:
   Fixed






[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717487#comment-16717487
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240685803
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node 
doesn't compute
+ * any window aggregation values. Instead, it computes the lower and upper 
bound for each window
+ * (i.e. window bounds) and passes the data and indices to a Python worker to 
do the actual
+ * window aggregation.
+ *
+ * It currently materializes all data associated with the same partition key 
and passes it to
+ * the Python worker. This is not strictly necessary for sliding windows and 
can be improved (by
+ * possibly slicing data into overlapping chunks and stitching them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the Python input because it's an 
unbounded window,
+ * so its bound indices will always be the same.
+ *
+ * Unbounded windows also have a different eval type, because:
+ * (1) They don't have bound indices as input
+ * (2) The UDF only needs to be evaluated once in the Python worker (because 
the UDF is
 
 Review comment:
   Fixed
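The bound-column layout described in the excerpt above (each bounded frame contributes a lower-bound and an upper-bound column to the Python input row; unbounded frames contribute none) can be sketched roughly as follows. The function name and the frame encoding here are assumptions for illustration, not the PR's actual API:

```python
def window_bound_helpers(frames):
    """Map each window frame to the positions of its bound columns in the
    Python input row. `frames` is a list of ("bounded", lo, hi) or
    ("unbounded",) tuples (an assumed encoding, for illustration only).
    Returns (total bound columns, frame index -> lower bound column,
    frame index -> upper bound column, per-frame window kinds)."""
    lower, upper, kinds = {}, {}, []
    col = 0
    for i, frame in enumerate(frames):
        if frame[0] == "bounded":
            # Bounded frames get a pair of bound-index columns.
            lower[i], upper[i] = col, col + 1
            col += 2
        kinds.append(frame[0])
    return col, lower, upper, kinds

# The w1/w2/w3 example from the excerpt: (-5, 5), unbounded, (-3, 3).
frames = [("bounded", -5, 5), ("unbounded",), ("bounded", -3, 3)]
total, lower, upper, kinds = window_bound_helpers(frames)
# Input row layout: (lower_bound_w1, upper_bound_w1,
#                    lower_bound_w3, upper_bound_w3, v)
print(total, lower, upper, kinds)
```

With these four bound columns prepended, the unbounded frame w2 needs no indices at all, matching the layout shown in the quoted Scaladoc.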






[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717497#comment-16717497
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240688367
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -73,68 +138,141 @@ case class WindowInPandasExec(
   }
 
   /**
-   * Create the resulting projection.
-   *
-   * This method uses Code Generation. It can only be used on the executor 
side.
-   *
-   * @param expressions unbound ordered function expressions.
-   * @return the final resulting projection.
+   * See [[WindowBoundHelpers]] for details.
*/
-  private[this] def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
-val references = expressions.zipWithIndex.map { case (e, i) =>
-  // Results of window expressions will be on the right side of child's 
output
-  BoundReference(child.output.size + i, e.dataType, e.nullable)
+  private def computeWindowBoundHelpers(
+  factories: Seq[InternalRow => WindowFunctionFrame]
+  ): WindowBoundHelpers = {
+val dummyRow = new SpecificInternalRow()
 
 Review comment:
Ha neat trick. 






[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717500#comment-16717500
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240688489
 
 

 ##
 File path: python/pyspark/sql/functions.py
 ##
 @@ -2993,20 +2992,25 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
... def mean_udf(v):
... return v.mean()
-   >>> w = Window \\
-   ... .partitionBy('id') \\
-   ... .rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)
+   >>> w = (Window.partitionBy('id')
+   ....orderBy('v')
+   ....rowsBetween(-1, 0))
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # 
doctest: +SKIP
+---++--+
| id|   v|mean_v|
+---++--+
-   |  1| 1.0|   1.5|
+   |  1| 1.0|   1.0|
|  1| 2.0|   1.5|
-   |  2| 3.0|   6.0|
-   |  2| 5.0|   6.0|
-   |  2|10.0|   6.0|
+   |  2| 3.0|   3.0|
+   |  2| 5.0|   4.0|
+   |  2|10.0|   7.5|
+---++--+
 
+   .. warning:: For performance reasons, the input series to window 
functions are not copied.
 
 Review comment:
   Fixed
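The expected `mean_v` values in the doctest above can be checked with a plain-Python sketch of a bounded row frame; this is a hedged illustration of the `rowsBetween(-1, 0)` semantics, not Spark's implementation:

```python
def bounded_window_agg(values, lower, upper, agg):
    """Apply `agg` over the row frame [i + lower, i + upper] for each row i
    of a single, already ordered partition, clipping at partition edges."""
    n = len(values)
    out = []
    for i in range(n):
        lo = max(0, i + lower)
        hi = min(n, i + upper + 1)  # Python slices are half-open
        out.append(agg(values[lo:hi]))
    return out

def mean(xs):
    return sum(xs) / len(xs)

# The two partitions from the doctest, ordered by v; frame is rowsBetween(-1, 0).
print(bounded_window_agg([1.0, 2.0], -1, 0, mean))        # [1.0, 1.5]
print(bounded_window_agg([3.0, 5.0, 10.0], -1, 0, mean))  # [3.0, 4.0, 7.5]
```

Concatenated, these per-partition results reproduce the `mean_v` column `[1.0, 1.5, 3.0, 4.0, 7.5]` shown in the updated doctest.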






[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717498#comment-16717498
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240688453
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -60,6 +105,26 @@ case class WindowInPandasExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  /**
+   * Helper functions and data structures for window bounds
+   *
+   * It contains:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the 
python input row
+   * (3) Function from frame index to its upper bound column index in the 
python input row
+   * (4) Seq from frame index to its window bound type
+   */
+  private type WindowBoundHelpers = (Int, Int => Int, Int => Int, 
Seq[WindowType])
+
+  /**
+   * Enum for window bound types. Used only inside this class.
+   */
+  private sealed case class WindowType(value: String)
+  private object UnboundedWindow extends WindowType("unbounded")
+  private object BoundedWindow extends WindowType("bounded")
+
+  private val window_bound_type_conf = "pandas_window_bound_types"
 
 Review comment:
   Fixed






[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717496#comment-16717496
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240687003
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
 ##
 @@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, 
IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  protected def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map { case (e, i) =>
+  // Results of window expressions will be on the right side of child's 
output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound 
ordering object is
+   * used to determine which input row lies within the frame boundaries of an 
output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frame to evaluate. This can either be a Row or Range frame.
+   * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
+   * @return a bound ordering object.
+   */
+  protected def createBoundOrdering(
+  frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
+(frame, bound) match {
+  case (RowFrame, CurrentRow) =>
+RowBoundOrdering(0)
+
+  case (RowFrame, IntegerLiteral(offset)) =>
+RowBoundOrdering(offset)
+
+  case (RangeFrame, CurrentRow) =>
+val ordering = newOrdering(orderSpec, child.output)
+RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
+
+  case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+// Use only the first order expression when the offset is non-null.
+val sortExpr = orderSpec.head
+val expr = sortExpr.child
+
+// Create the projection which returns the current 'value'.
+val current = newMutableProjection(expr :: Nil, child.output)
+
+// Flip the sign of the offset when processing the order is descending
+val boundOffset = sortExpr.direction match {
+  case Descending => UnaryMinus(offset)
+  case Ascending => offset
+}
+
+// Create the projection which returns the current 'value' modified by 
adding the offset.
+val boundExpr = (expr.dataType, boundOffset.dataType) match {
+  case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+  case (TimestampType, CalendarIntervalType) =>
+TimeAdd(expr, boundOffset, Some(timeZone))
+  case (a, b) if a == b => Add(expr, boundOffset)
+}
+   
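The `createBoundOrdering` logic quoted above flips the offset's sign for descending order and picks a type-appropriate addition for the range-frame boundary. A rough Python sketch of that decision (names and the string-based direction encoding are assumptions, not the PR's API):

```python
from datetime import date, timedelta

def bound_value(current, offset, direction="ascending"):
    """Compute a range-frame boundary from the current order-by value:
    flip the offset's sign for descending order, use date arithmetic for a
    date column with an integer offset (the DateAdd case in the excerpt),
    and plain addition when both sides share one type (the Add case)."""
    if direction == "descending":
        offset = -offset
    if isinstance(current, date) and isinstance(offset, int):
        return current + timedelta(days=offset)  # DateAdd analogue
    return current + offset  # Add analogue

print(bound_value(10, 5))                 # 15
print(bound_value(10, 5, "descending"))   # 5
print(bound_value(date(2018, 12, 1), 3))  # 2018-12-04
```

The TimestampType/CalendarIntervalType branch (`TimeAdd`) is omitted here since interval arithmetic has no direct stdlib analogue.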

[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717495#comment-16717495
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240686860
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/BoundOrdering.scala
 ##
 @@ -24,14 +24,14 @@ import org.apache.spark.sql.catalyst.expressions.Projection
 /**
  * Function for comparing boundary values.
  */
-private[window] abstract class BoundOrdering {
+private[sql] abstract class BoundOrdering {
 
 Review comment:
   Yeah good catch. Reverted. I guess I was accessing this in 
WindowInPandasExec before






[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717492#comment-16717492
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240686671
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
 ##
 @@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, 
IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  protected def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map { case (e, i) =>
+  // Results of window expressions will be on the right side of child's 
output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound 
ordering object is
+   * used to determine which input row lies within the frame boundaries of an 
output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frame to evaluate. This can either be a Row or Range frame.
+   * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
+   * @return a bound ordering object.
+   */
+  protected def createBoundOrdering(
+  frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
+(frame, bound) match {
+  case (RowFrame, CurrentRow) =>
+RowBoundOrdering(0)
+
+  case (RowFrame, IntegerLiteral(offset)) =>
+RowBoundOrdering(offset)
+
+  case (RangeFrame, CurrentRow) =>
+val ordering = newOrdering(orderSpec, child.output)
+RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
+
+  case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+// Use only the first order expression when the offset is non-null.
+val sortExpr = orderSpec.head
+val expr = sortExpr.child
+
+// Create the projection which returns the current 'value'.
+val current = newMutableProjection(expr :: Nil, child.output)
+
+// Flip the sign of the offset when processing the order is descending
+val boundOffset = sortExpr.direction match {
+  case Descending => UnaryMinus(offset)
+  case Ascending => offset
+}
+
+// Create the projection which returns the current 'value' modified by 
adding the offset.
+val boundExpr = (expr.dataType, boundOffset.dataType) match {
+  case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+  case (TimestampType, CalendarIntervalType) =>
+TimeAdd(expr, boundOffset, Some(timeZone))
+  case (a, b) if a == b => Add(expr, boundOffset)
 
 Review 
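The descending-order sign flip in the `createBoundOrdering` snippet above can be illustrated with a small Python sketch. This is only an illustration of the idea, not Spark's generated code; the function name and string-valued direction are hypothetical:

```python
def range_frame_boundary(current_value, offset, direction):
    # For a RANGE frame, the frame boundary is the current row's order-by
    # value plus the frame offset. When the sort order is descending, the
    # sign of the offset is flipped first, mirroring the Scala match above.
    if direction == "descending":
        offset = -offset
    return current_value + offset

print(range_frame_boundary(10, 3, "ascending"))   # 13
print(range_frame_boundary(10, 3, "descending"))  # 7
```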

[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717491#comment-16717491
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240686637
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node doesn't compute
+ * any window aggregation values. Instead, it computes the lower and upper bound for each window
+ * (i.e. window bounds) and passes the data and indices to the Python worker to do the actual window
 
 Review comment:
   Fixed


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> User-defined window functions with pandas udf (bounded window)
> --
>
> Key: SPARK-24561
> URL: https://issues.apache.org/jira/browse/SPARK-24561
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 2.3.1
>Reporter: Li Jin
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717488#comment-16717488
 ] 

ASF GitHub Bot commented on SPARK-24561:


SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446266832
 
 
   **[Test build #99983 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99983/testReport)**
 for PR 22305 at commit 
[`5d3bbd6`](https://github.com/apache/spark/commit/5d3bbd6bfbdd6b332c65f9748ab066d9eb6a480d).





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717486#comment-16717486
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240685718
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node doesn't compute
+ * any window aggregation values. Instead, it computes the lower and upper bound for each window
+ * (i.e. window bounds) and passes the data and indices to the Python worker to do the actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key and passes it to
+ * the Python worker. This is not strictly necessary for sliding windows and can be improved (by
+ * possibly slicing data into overlapping chunks and stitching them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the Python input because it is an unbounded
+ * window, so its bound indices would always be the same.
+ *
+ * Unbounded windows also have a different eval type, because:
 
 Review comment:
   Fixed
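The (lower, upper) window-bound indices described in the quoted class doc can be sketched in plain Python. This is a hedged illustration of the layout only; `row_frame_bounds` is a hypothetical helper, not part of Spark:

```python
def row_frame_bounds(n_rows, lower, upper):
    """Compute per-row (lower, upper) bound indices for a
    specifiedwindowframe(RowFrame, lower, upper), as an illustration of
    what gets prepended to the data sent to the Python worker. Bounds are
    clipped to the partition; the upper bound is exclusive, following
    Python slice conventions (an assumption of this sketch)."""
    bounds = []
    for i in range(n_rows):
        lo = max(0, i + lower)          # clip at partition start
        hi = min(n_rows, i + upper + 1) # clip at partition end, exclusive
        bounds.append((lo, hi))
    return bounds

# For specifiedwindowframe(RowFrame, -3, 3) over a 5-row partition:
print(row_frame_bounds(5, -3, 3))
# [(0, 4), (0, 5), (0, 5), (0, 5), (1, 5)]
```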





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717452#comment-16717452
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240678517
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
 ##
 @@ -113,7 +113,7 @@ private[window] object AggregateProcessor {
  * This class manages the processing of a number of aggregate functions. See 
the documentation of
  * the object for more information.
  */
-private[window] final class AggregateProcessor(
+private[sql] final class AggregateProcessor(
 
 Review comment:
  Do you mean we can remove the modifier here altogether, because the class lives under the
"execution" namespace and is therefore implicitly private already?





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717418#comment-16717418
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240667535
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node doesn't compute
+ * any window aggregation values. Instead, it computes the lower and upper bound for each window
+ * (i.e. window bounds) and passes the data and indices to the Python worker to do the actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key and passes it to
+ * the Python worker. This is not strictly necessary for sliding windows and can be improved (by
+ * possibly slicing data into overlapping chunks and stitching them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the Python input because it is an unbounded
+ * window, so its bound indices would always be the same.
+ *
+ * Unbounded windows also have a different eval type, because:
+ * (1) It doesn't have bound indices as input
+ * (2) The udf only needs to be evaluated once in the Python worker (because the udf is
+ * deterministic and the window bounds are the same for all windows)
 
 Review comment:
  Oh, this relates to the statement "The udf only needs to be evaluated once".
If the udf is not deterministic, then even if the window bounds are the same, we
still need to evaluate it once per window.
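The once-per-window evaluation for a bounded frame can be sketched in plain Python. The bound indices below are hypothetical (hand-written for a `rowsBetween(-1, 0)` frame over one 4-row partition), not values produced by Spark here:

```python
from statistics import mean

v = [1.0, 2.0, 3.0, 4.0]
# Hand-written (lower, upper) slice indices for each output row, upper
# bound exclusive, as a rowsBetween(-1, 0) frame would imply.
bounds = [(0, 1), (0, 2), (1, 3), (2, 4)]

# Bounded window: the udf (here, mean as a stand-in) runs once per window
# slice, i.e. once per output row.
result = [mean(v[lo:hi]) for lo, hi in bounds]
print(result)  # [1.0, 1.5, 2.5, 3.5]
```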





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717417#comment-16717417
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240667535
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node doesn't compute
+ * any window aggregation values. Instead, it computes the lower and upper bound for each window
+ * (i.e. window bounds) and passes the data and indices to the Python worker to do the actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key and passes it to
+ * the Python worker. This is not strictly necessary for sliding windows and can be improved (by
+ * possibly slicing data into overlapping chunks and stitching them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the Python input because it is an unbounded
+ * window, so its bound indices would always be the same.
+ *
+ * Unbounded windows also have a different eval type, because:
+ * (1) It doesn't have bound indices as input
+ * (2) The udf only needs to be evaluated once in the Python worker (because the udf is
+ * deterministic and the window bounds are the same for all windows)
 
 Review comment:
  Oh, this relates to the statement "The udf only needs to be evaluated once".
If the udf is not deterministic, then even if the window bounds are the same, we
still need to evaluate it once per window.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16717360#comment-16717360
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240654467
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -144,24 +282,107 @@ case class WindowInPandasExec(
 queue.close()
   }
 
-  val inputProj = UnsafeProjection.create(allInputs, child.output)
-  val pythonInput = grouped.map { case (_, rows) =>
-rows.map { row =>
-  queue.add(row.asInstanceOf[UnsafeRow])
-  inputProj(row)
+  val stream = iter.map { row =>
+queue.add(row.asInstanceOf[UnsafeRow])
+row
+  }
+
+  val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+// Manage the stream and the grouping.
+var nextRow: UnsafeRow = null
+var nextGroup: UnsafeRow = null
+var nextRowAvailable: Boolean = false
+private[this] def fetchNextRow() {
+  nextRowAvailable = stream.hasNext
+  if (nextRowAvailable) {
+nextRow = stream.next().asInstanceOf[UnsafeRow]
+nextGroup = grouping(nextRow)
+  } else {
+nextRow = null
+nextGroup = null
+  }
+}
+fetchNextRow()
+
+// Manage the current partition.
+val buffer: ExternalAppendOnlyUnsafeRowArray =
+  new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, 
spillThreshold)
+var bufferIterator: Iterator[UnsafeRow] = _
+
+val indexRow = new 
SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+val frames = factories.map(_(indexRow))
+
+private[this] def fetchNextPartition() {
+  // Collect all the rows in the current partition.
+  // Before we start to fetch new input rows, make a copy of nextGroup.
+  val currentGroup = nextGroup.copy()
+
+  // clear last partition
+  buffer.clear()
+
+  while (nextRowAvailable && nextGroup == currentGroup) {
 
 Review comment:
   @ueshin No worries! I closed #23279 and thank you @hvanhovell 
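The partition-buffering loop in the quoted diff (collect consecutive rows with the same grouping key into a buffer, then hand the buffer off) follows a common streaming-grouping pattern. A plain-Python sketch of that pattern, not Spark's implementation:

```python
def group_consecutive(rows, key):
    """Buffer consecutive rows that share a grouping key and yield one
    (key, buffer) pair per group, mirroring the fetchNextPartition loop
    above (an illustration only; names are hypothetical)."""
    buffer, current = [], object()  # sentinel: matches no real key
    for row in rows:
        k = key(row)
        if buffer and k != current:
            yield current, buffer   # the previous partition is complete
            buffer = []
        current = k
        buffer.append(row)
    if buffer:
        yield current, buffer       # flush the final partition

pairs = [("a", 1), ("a", 2), ("b", 3)]
print(list(group_consecutive(pairs, key=lambda r: r[0])))
# [('a', [('a', 1), ('a', 2)]), ('b', [('b', 3)])]
```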





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716957#comment-16716957
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446179456
 
 
   Merged build finished. Test FAILed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716961#comment-16716961
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446179463
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99962/
   Test FAILed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716959#comment-16716959
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446179456
 
 
   Merged build finished. Test FAILed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716958#comment-16716958
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446179463
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/99962/
   Test FAILed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716952#comment-16716952
 ] 

ASF GitHub Bot commented on SPARK-24561:


SparkQA removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118921
 
 
   **[Test build #99962 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99962/testReport)**
 for PR 22305 at commit 
[`8505418`](https://github.com/apache/spark/commit/850541873df1101485a85632b85ba474dac67d56).





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716951#comment-16716951
 ] 

ASF GitHub Bot commented on SPARK-24561:


SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446178752
 
 
   **[Test build #99962 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99962/testReport)**
 for PR 22305 at commit 
[`8505418`](https://github.com/apache/spark/commit/850541873df1101485a85632b85ba474dac67d56).
* This patch **fails PySpark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716732#comment-16716732
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446143339
 
 
   Looks good to me too. If you guys wouldn't mind, I will take another final look within a few days. Don't be blocked by me.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716738#comment-16716738
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon edited a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446143339
 
 
   Looks good to me too - mostly just nits. If you guys wouldn't mind, I will 
take another final look within a few days. Don't be blocked by me.







[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716708#comment-16716708
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240538539
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node 
doesn't not compute
+ * any window aggregation values. Instead, it computes the lower and upper 
bound for each window
+ * (i.e. window bounds) and pass the data and indices to python work to do the 
actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key 
and passes them to
+ * Python worker. This is not strictly necessary for sliding windows and can 
be improved (by
+ * possibly slicing data into overlapping chunks and stitch them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the python input because its 
unbounded window
+ * so it's bound indices will always be the same.
+ *
+ * Unbounded window also have a different eval type, because:
+ * (1) It doesn't have bound indices as input
+ * (2) The udf only needs to be evaluated once the in python worker (because 
the udf is
 
 Review comment:
   nit: `the in` -> `in the`
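The scaladoc quoted above describes prepending per-window bound indices to the rows sent to the Python worker, which then slices the data and applies the UDF. A minimal, hypothetical sketch of how such bound indices could be consumed (an illustration only, not Spark's actual worker code; `eval_bounded_window` is an invented helper):

```python
import pandas as pd

def eval_bounded_window(v, lower, upper, udf):
    # For each output row i, apply the UDF to the slice of v delimited
    # by that row's bound indices [lower[i], upper[i]).
    return pd.Series([udf(v.iloc[lo:hi]) for lo, hi in zip(lower, upper)])

v = pd.Series([1.0, 2.0, 3.0, 4.0])
# Bounds corresponding to a rowsBetween(-1, 0) frame:
# the current row plus at most one preceding row.
lower = [0, 0, 1, 2]
upper = [1, 2, 3, 4]
result = eval_bounded_window(v, lower, upper, lambda s: s.mean())
```

With a mean UDF this yields 1.0, 1.5, 2.5, 3.5, matching the `rowsBetween(-1, 0)` doctest shown in the merged diff.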







[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716713#comment-16716713
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240538889
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node 
doesn't not compute
+ * any window aggregation values. Instead, it computes the lower and upper 
bound for each window
+ * (i.e. window bounds) and pass the data and indices to python work to do the 
actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key 
and passes them to
+ * Python worker. This is not strictly necessary for sliding windows and can 
be improved (by
+ * possibly slicing data into overlapping chunks and stitch them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the python input because its 
unbounded window
+ * so it's bound indices will always be the same.
+ *
+ * Unbounded window also have a different eval type, because:
+ * (1) It doesn't have bound indices as input
+ * (2) The udf only needs to be evaluated once the in python worker (because 
the udf is
+ * deterministic and window bounds are the same for all windows)
 
 Review comment:
   Sorry if I missed something but what does exactly mean "the udf is 
deterministic" in this context?
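For context on the question above: per the quoted scaladoc, an unbounded frame covers the whole partition for every row, so if the UDF is deterministic (same input, same output) its result can be computed once and broadcast to all rows. A hedged sketch of that idea, assuming a mean aggregate:

```python
import pandas as pd

v = pd.Series([1.0, 2.0, 3.0])
# Unbounded frame: every row's window is the entire partition, so a
# deterministic UDF needs to run only once; the single result is then
# repeated for each row of the partition.
agg = v.mean()
result = pd.Series([agg] * len(v))
```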







[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716710#comment-16716710
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240538676
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node 
doesn't not compute
+ * any window aggregation values. Instead, it computes the lower and upper 
bound for each window
+ * (i.e. window bounds) and pass the data and indices to python work to do the 
actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key 
and passes them to
+ * Python worker. This is not strictly necessary for sliding windows and can 
be improved (by
+ * possibly slicing data into overlapping chunks and stitch them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the python input because its 
unbounded window
+ * so it's bound indices will always be the same.
+ *
+ * Unbounded window also have a different eval type, because:
 
 Review comment:
   nit: `have` -> `has`.







[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716691#comment-16716691
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240537517
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node 
doesn't not compute
+ * any window aggregation values. Instead, it computes the lower and upper 
bound for each window
+ * (i.e. window bounds) and pass the data and indices to python work to do the 
actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key 
and passes them to
+ * Python worker. This is not strictly necessary for sliding windows and can 
be improved (by
+ * possibly slicing data into overlapping chunks and stitch them together).
+ *
+ * This class groups window expressions by their window boundaries so that 
window expressions
+ * with the same window boundaries can share the same window bounds. The 
window bounds are
+ * prepended to the data passed to the python worker.
+ *
+ * For example, if we have:
+ * avg(v) over specifiedwindowframe(RowFrame, -5, 5),
+ * avg(v) over specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing),
+ * avg(v) over specifiedwindowframe(RowFrame, -3, 3),
+ * max(v) over specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * The python input will look like:
+ * (lower_bound_w1, upper_bound_w1, lower_bound_w3, upper_bound_w3, v)
+ *
+ * where w1 is specifiedwindowframe(RowFrame, -5, 5)
+ *   w2 is specifiedwindowframe(RowFrame, UnboundedPreceding, 
UnboundedFollowing)
+ *   w3 is specifiedwindowframe(RowFrame, -3, 3)
+ *
+ * Note that w2 doesn't have bound indices in the python input because its 
unbounded window
 
 Review comment:
   `its` -> `it is`







[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716663#comment-16716663
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240533847
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
 ##
 @@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, 
IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  protected def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map { case (e, i) =>
+  // Results of window expressions will be on the right side of child's 
output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound 
ordering object is
+   * used to determine which input row lies within the frame boundaries of an 
output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frame to evaluate. This can either be a Row or Range frame.
+   * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
+   * @return a bound ordering object.
+   */
+  protected def createBoundOrdering(
+  frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
+(frame, bound) match {
+  case (RowFrame, CurrentRow) =>
+RowBoundOrdering(0)
+
+  case (RowFrame, IntegerLiteral(offset)) =>
+RowBoundOrdering(offset)
+
+  case (RangeFrame, CurrentRow) =>
+val ordering = newOrdering(orderSpec, child.output)
+RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
+
+  case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+// Use only the first order expression when the offset is non-null.
+val sortExpr = orderSpec.head
+val expr = sortExpr.child
+
+// Create the projection which returns the current 'value'.
+val current = newMutableProjection(expr :: Nil, child.output)
+
+// Flip the sign of the offset when processing the order is descending
+val boundOffset = sortExpr.direction match {
+  case Descending => UnaryMinus(offset)
+  case Ascending => offset
+}
+
+// Create the projection which returns the current 'value' modified by 
adding the offset.
+val boundExpr = (expr.dataType, boundOffset.dataType) match {
+  case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+  case (TimestampType, CalendarIntervalType) =>
+TimeAdd(expr, boundOffset, Some(timeZone))
+  case (a, b) if a== b => Add(expr, boundOffset)
+}
+  
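The `createBoundOrdering` logic quoted above flips the sign of a range-frame offset when the ordering is descending, then adds it to the current row's order-by value. A simplified Python sketch of that sign flip (an illustration under simplified numeric types, not the Scala implementation):

```python
def range_frame_bound(current_value, offset, descending=False):
    # Mirror of the quoted Scala match: negate the offset under a
    # descending sort order, then add it to the order-by value.
    bound_offset = -offset if descending else offset
    return current_value + bound_offset
```

For example, a bound of `current + 3` becomes `current - 3` when the order is descending, so the frame still extends in the logically "following" direction.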

[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716688#comment-16716688
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240536682
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node 
doesn't not compute
+ * any window aggregation values. Instead, it computes the lower and upper 
bound for each window
+ * (i.e. window bounds) and pass the data and indices to python work to do the 
actual window
+ * aggregation.
+ *
+ * It currently materializes all data associated with the same partition key 
and passes them to
+ * Python worker. This is not strictly necessary for sliding windows and can 
be improved (by
+ * possibly slicing data into overlapping chunks and stitch them together).
 
 Review comment:
   nit `stitch` -> `stitching`







[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716667#comment-16716667
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240534340
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
 ##
 @@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, 
IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  protected def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map { case (e, i) =>
+  // Results of window expressions will be on the right side of child's 
output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound 
ordering object is
+   * used to determine which input row lies within the frame boundaries of an 
output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frame to evaluate. This can either be a Row or Range frame.
+   * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
+   * @return a bound ordering object.
+   */
+  protected def createBoundOrdering(
+  frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
+(frame, bound) match {
+  case (RowFrame, CurrentRow) =>
+RowBoundOrdering(0)
+
+  case (RowFrame, IntegerLiteral(offset)) =>
+RowBoundOrdering(offset)
+
+  case (RangeFrame, CurrentRow) =>
+val ordering = newOrdering(orderSpec, child.output)
+RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
+
+  case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+// Use only the first order expression when the offset is non-null.
+val sortExpr = orderSpec.head
+val expr = sortExpr.child
+
+// Create the projection which returns the current 'value'.
+val current = newMutableProjection(expr :: Nil, child.output)
+
+// Flip the sign of the offset when processing the order is descending
+val boundOffset = sortExpr.direction match {
+  case Descending => UnaryMinus(offset)
+  case Ascending => offset
+}
+
+// Create the projection which returns the current 'value' modified by 
adding the offset.
+val boundExpr = (expr.dataType, boundOffset.dataType) match {
+  case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+  case (TimestampType, CalendarIntervalType) =>
+TimeAdd(expr, boundOffset, Some(timeZone))
+  case (a, b) if a== b => Add(expr, boundOffset)
 

[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716686#comment-16716686
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240536250
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node 
doesn't not compute
+ * any window aggregation values. Instead, it computes the lower and upper 
bound for each window
+ * (i.e. window bounds) and pass the data and indices to python work to do the 
actual window
 
 Review comment:
   nit `python work` -> `Python worker`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> User-defined window functions with pandas udf (bounded window)
> --
>
> Key: SPARK-24561
> URL: https://issues.apache.org/jira/browse/SPARK-24561
> Project: Spark
>  Issue Type: Sub-task
>  Components: PySpark
>Affects Versions: 2.3.1
>Reporter: Li Jin
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716685#comment-16716685
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240536250
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -27,17 +27,62 @@ import 
org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, 
SparkPlan}
 import org.apache.spark.sql.execution.arrow.ArrowUtils
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.execution.window._
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+/**
+ * This class calculates and outputs windowed aggregates over the rows in a 
single partition.
+ *
+ * This is similar to [[WindowExec]]. The main difference is that this node doesn't
+ * compute any window aggregation values. Instead, it computes the lower and upper
+ * bound for each window (i.e. window bounds) and passes the data and indices to
+ * python work to do the actual window
 
 Review comment:
   nit `python work` -> `python worker`





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716670#comment-16716670
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240534842
 
 

 ##
 File path: python/pyspark/sql/functions.py
 ##
 @@ -2993,20 +2992,25 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
... def mean_udf(v):
... return v.mean()
-   >>> w = Window \\
-   ... .partitionBy('id') \\
-   ... .rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)
+   >>> w = (Window.partitionBy('id')
+   ....orderBy('v')
+   ....rowsBetween(-1, 0))
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # 
doctest: +SKIP
+---++--+
| id|   v|mean_v|
+---++--+
-   |  1| 1.0|   1.5|
+   |  1| 1.0|   1.0|
|  1| 2.0|   1.5|
-   |  2| 3.0|   6.0|
-   |  2| 5.0|   6.0|
-   |  2|10.0|   6.0|
+   |  2| 3.0|   3.0|
+   |  2| 5.0|   4.0|
+   |  2|10.0|   7.5|
+---++--+
 
+   .. warning:: For performance reasons, the input series to window 
functions are not copied.
 
 Review comment:
   Let's keep the indentation same with other `.. note::` as well ..
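The doctest above computes a mean over a bounded row frame: `rowsBetween(-1, 0)` covers the previous row and the current row within each ordered partition. As a sanity check on the expected `mean_v` column, here is a minimal pure-Python sketch of that computation (the helper name `bounded_mean` is mine, not Spark's):

```python
def bounded_mean(values, lower=-1, upper=0):
    # Mean over a ROWS BETWEEN lower AND upper frame, clipped to the
    # partition boundaries; mirrors rowsBetween(-1, 0) from the doctest.
    n = len(values)
    out = []
    for i in range(n):
        frame = values[max(0, i + lower):min(n, i + upper + 1)]
        out.append(sum(frame) / len(frame))
    return out

# Partitions from the doctest: id=1 -> [1.0, 2.0], id=2 -> [3.0, 5.0, 10.0]
mean_v = bounded_mean([1.0, 2.0]) + bounded_mean([3.0, 5.0, 10.0])
# [1.0, 1.5, 3.0, 4.0, 7.5]
```

This reproduces the `mean_v` column shown in the expected doctest output.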





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716665#comment-16716665
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240534200
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/BoundOrdering.scala
 ##
 @@ -24,14 +24,14 @@ import org.apache.spark.sql.catalyst.expressions.Projection
 /**
  * Function for comparing boundary values.
  */
-private[window] abstract class BoundOrdering {
+private[sql] abstract class BoundOrdering {
 
 Review comment:
   BTW, why is this `[sql]`? Looks it can be `[window]`.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716668#comment-16716668
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240534340
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExecBase.scala
 ##
 @@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.window
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.types.{CalendarIntervalType, DateType, 
IntegerType, TimestampType}
+
+private[sql] abstract class WindowExecBase(
+windowExpression: Seq[NamedExpression],
+partitionSpec: Seq[Expression],
+orderSpec: Seq[SortOrder],
+child: SparkPlan) extends UnaryExecNode {
+
+  /**
+   * Create the resulting projection.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param expressions unbound ordered function expressions.
+   * @return the final resulting projection.
+   */
+  protected def createResultProjection(expressions: Seq[Expression]): 
UnsafeProjection = {
+val references = expressions.zipWithIndex.map { case (e, i) =>
+  // Results of window expressions will be on the right side of child's 
output
+  BoundReference(child.output.size + i, e.dataType, e.nullable)
+}
+val unboundToRefMap = expressions.zip(references).toMap
+val patchedWindowExpression = 
windowExpression.map(_.transform(unboundToRefMap))
+UnsafeProjection.create(
+  child.output ++ patchedWindowExpression,
+  child.output)
+  }
+
+  /**
+   * Create a bound ordering object for a given frame type and offset. A bound 
ordering object is
+   * used to determine which input row lies within the frame boundaries of an 
output row.
+   *
+   * This method uses Code Generation. It can only be used on the executor 
side.
+   *
+   * @param frame to evaluate. This can either be a Row or Range frame.
+   * @param bound with respect to the row.
+   * @param timeZone the session local timezone for time related calculations.
+   * @return a bound ordering object.
+   */
+  protected def createBoundOrdering(
+  frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
+(frame, bound) match {
+  case (RowFrame, CurrentRow) =>
+RowBoundOrdering(0)
+
+  case (RowFrame, IntegerLiteral(offset)) =>
+RowBoundOrdering(offset)
+
+  case (RangeFrame, CurrentRow) =>
+val ordering = newOrdering(orderSpec, child.output)
+RangeBoundOrdering(ordering, IdentityProjection, IdentityProjection)
+
+  case (RangeFrame, offset: Expression) if orderSpec.size == 1 =>
+// Use only the first order expression when the offset is non-null.
+val sortExpr = orderSpec.head
+val expr = sortExpr.child
+
+// Create the projection which returns the current 'value'.
+val current = newMutableProjection(expr :: Nil, child.output)
+
+// Flip the sign of the offset when processing the order is descending
+val boundOffset = sortExpr.direction match {
+  case Descending => UnaryMinus(offset)
+  case Ascending => offset
+}
+
+// Create the projection which returns the current 'value' modified by 
adding the offset.
+val boundExpr = (expr.dataType, boundOffset.dataType) match {
+  case (DateType, IntegerType) => DateAdd(expr, boundOffset)
+  case (TimestampType, CalendarIntervalType) =>
+TimeAdd(expr, boundOffset, Some(timeZone))
+  case (a, b) if a == b => Add(expr, boundOffset)
 
 Review 
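The `createBoundOrdering` match above distinguishes row frames from range frames and flips the offset sign when the ordering is descending. A simplified Python sketch of that dispatch (function name, string tags, and return shape are illustrative, not Spark's):

```python
def create_bound(frame, bound, direction="asc"):
    # Hypothetical mirror of WindowExecBase.createBoundOrdering:
    # ROW frames resolve to an integer row offset; RANGE frames shift the
    # ordering value by a sign-adjusted offset (flipped for descending).
    if frame == "row":
        return ("row", 0 if bound == "current_row" else bound)
    if frame == "range":
        if bound == "current_row":
            return ("range", 0)
        offset = -bound if direction == "desc" else bound
        return ("range", offset)
    raise ValueError(f"unknown frame type: {frame}")
```

The descending case corresponds to the `UnaryMinus(offset)` branch in the Scala code.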

[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716649#comment-16716649
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240529454
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/window/AggregateProcessor.scala
 ##
 @@ -113,7 +113,7 @@ private[window] object AggregateProcessor {
  * This class manages the processing of a number of aggregate functions. See 
the documentation of
  * the object for more information.
  */
-private[window] final class AggregateProcessor(
+private[sql] final class AggregateProcessor(
 
 Review comment:
  nit: `private[sql]` can be removed because `execution` is already a private 
package. For other places as well.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716606#comment-16716606
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240523454
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -60,6 +105,26 @@ case class WindowInPandasExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
+  /**
+   * Helper functions and data structures for window bounds
+   *
+   * It contains:
+   * (1) Total number of window bound indices in the python input row
+   * (2) Function from frame index to its lower bound column index in the 
python input row
+   * (3) Function from frame index to its upper bound column index in the 
python input row
+   * (4) Seq from frame index to its window bound type
+   */
+  private type WindowBoundHelpers = (Int, Int => Int, Int => Int, 
Seq[WindowType])
+
+  /**
+   * Enum for window bound types. Used only inside this class.
+   */
+  private sealed case class WindowType(value: String)
+  private object UnboundedWindow extends WindowType("unbounded")
+  private object BoundedWindow extends WindowType("bounded")
+
+  private val window_bound_type_conf = "pandas_window_bound_types"
 
 Review comment:
   nit: `window_bound_type_conf` -> `windowBoundTypeConf`
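The `WindowBoundHelpers` described above carry, per frame, functions mapping a frame index to the lower- and upper-bound column indices appended to each Python input row. A hedged sketch (names are mine, not Spark's) of how per-row bound indices could be derived for a bounded row frame:

```python
def row_frame_bounds(n, lower, upper):
    # For each of the n rows in a partition, the [begin, end) slice of a
    # ROWS BETWEEN lower AND upper frame, clipped to the partition bounds.
    return [(max(0, i + lower), min(n, i + upper + 1)) for i in range(n)]

# A 3-row partition with rowsBetween(-1, 0):
bounds = row_frame_bounds(3, -1, 0)
# [(0, 1), (0, 2), (1, 3)]
```

Unbounded frames need no such indices, which is why the enum above distinguishes `UnboundedWindow` from `BoundedWindow`.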





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716598#comment-16716598
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240522289
 
 

 ##
 File path: python/pyspark/sql/functions.py
 ##
 @@ -2993,20 +2992,25 @@ def pandas_udf(f=None, returnType=None, 
functionType=None):
>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
... def mean_udf(v):
... return v.mean()
-   >>> w = Window \\
-   ... .partitionBy('id') \\
-   ... .rowsBetween(Window.unboundedPreceding, 
Window.unboundedFollowing)
+   >>> w = (Window.partitionBy('id')
+   ....orderBy('v')
+   ....rowsBetween(-1, 0))
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # 
doctest: +SKIP
+---++--+
| id|   v|mean_v|
+---++--+
-   |  1| 1.0|   1.5|
+   |  1| 1.0|   1.0|
|  1| 2.0|   1.5|
-   |  2| 3.0|   6.0|
-   |  2| 5.0|   6.0|
-   |  2|10.0|   6.0|
+   |  2| 3.0|   3.0|
+   |  2| 5.0|   4.0|
+   |  2|10.0|   7.5|
+---++--+
 
+   .. warning:: For performance reasons, the input series to window 
functions are not copied.
 
 Review comment:
   @icexelloss, it's a nit but let's leave this as a note (just for consistency)





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716578#comment-16716578
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118973
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5964/
   Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716577#comment-16716577
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins removed a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118969
 
 
   Merged build finished. Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716574#comment-16716574
 ] 

ASF GitHub Bot commented on SPARK-24561:


SparkQA commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118921
 
 
   **[Test build #99962 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/99962/testReport)**
 for PR 22305 at commit 
[`8505418`](https://github.com/apache/spark/commit/850541873df1101485a85632b85ba474dac67d56).





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716576#comment-16716576
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118973
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/5964/
   Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716575#comment-16716575
 ] 

ASF GitHub Bot commented on SPARK-24561:


AmplabJenkins commented on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446118969
 
 
   Merged build finished. Test PASSed.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716570#comment-16716570
 ] 

ASF GitHub Bot commented on SPARK-24561:


HyukjinKwon commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446117830
 
 
   retest this please





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716107#comment-16716107
 ] 

ASF GitHub Bot commented on SPARK-24561:


ueshin edited a comment on issue #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446062061
 
 
   LGTM.
   @felixcheung Could you help us review please? Thanks!





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716096#comment-16716096
 ] 

ASF GitHub Bot commented on SPARK-24561:


ueshin commented on a change in pull request #22305: [SPARK-24561][SQL][Python] 
User-defined window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240464740
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -144,24 +282,107 @@ case class WindowInPandasExec(
 queue.close()
   }
 
-  val inputProj = UnsafeProjection.create(allInputs, child.output)
-  val pythonInput = grouped.map { case (_, rows) =>
-rows.map { row =>
-  queue.add(row.asInstanceOf[UnsafeRow])
-  inputProj(row)
+  val stream = iter.map { row =>
+queue.add(row.asInstanceOf[UnsafeRow])
+row
+  }
+
+  val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+// Manage the stream and the grouping.
+var nextRow: UnsafeRow = null
+var nextGroup: UnsafeRow = null
+var nextRowAvailable: Boolean = false
+private[this] def fetchNextRow() {
+  nextRowAvailable = stream.hasNext
+  if (nextRowAvailable) {
+nextRow = stream.next().asInstanceOf[UnsafeRow]
+nextGroup = grouping(nextRow)
+  } else {
+nextRow = null
+nextGroup = null
+  }
+}
+fetchNextRow()
+
+// Manage the current partition.
+val buffer: ExternalAppendOnlyUnsafeRowArray =
+  new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, 
spillThreshold)
+var bufferIterator: Iterator[UnsafeRow] = _
+
+val indexRow = new 
SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+val frames = factories.map(_(indexRow))
+
+private[this] def fetchNextPartition() {
+  // Collect all the rows in the current partition.
+  // Before we start to fetch new input rows, make a copy of nextGroup.
+  val currentGroup = nextGroup.copy()
+
+  // clear last partition
+  buffer.clear()
+
+  while (nextRowAvailable && nextGroup == currentGroup) {
 
 Review comment:
   Oh, I see. yeah, it should work. I missed that they are `UnsafeRow`s. Thanks!
   @icexelloss Could you close #23279 and leave it as is? I am sorry for taking 
your time.
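The `fetchNextRow`/`fetchNextPartition` loop being discussed buffers consecutive rows whose grouping key matches the current group. Assuming the input arrives sorted by the grouping key (as Spark guarantees via the required child ordering), the same consecutive-key grouping can be sketched in Python with `itertools.groupby`:

```python
from itertools import groupby

def partitions(rows, key):
    # Yield (group_key, buffered_rows) for each run of consecutive rows
    # sharing a grouping key, analogous to fetchNextPartition's buffer.
    for k, group in groupby(rows, key=key):
        yield k, list(group)

rows = [(1, "a"), (1, "b"), (2, "c")]
result = list(partitions(rows, key=lambda r: r[0]))
# [(1, [(1, 'a'), (1, 'b')]), (2, [(2, 'c')])]
```

The copy of `nextGroup` in the Scala code plays the role `groupby` plays here: it pins the current key before the underlying iterator advances.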





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16716098#comment-16716098
 ] 

ASF GitHub Bot commented on SPARK-24561:


ueshin commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-446062061
 
 
   LGTM.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715848#comment-16715848
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240434370
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -144,24 +282,107 @@ case class WindowInPandasExec(
         queue.close()
       }
 
-      val inputProj = UnsafeProjection.create(allInputs, child.output)
-      val pythonInput = grouped.map { case (_, rows) =>
-        rows.map { row =>
-          queue.add(row.asInstanceOf[UnsafeRow])
-          inputProj(row)
+      val stream = iter.map { row =>
+        queue.add(row.asInstanceOf[UnsafeRow])
+        row
+      }
+
+      val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+        // Manage the stream and the grouping.
+        var nextRow: UnsafeRow = null
+        var nextGroup: UnsafeRow = null
+        var nextRowAvailable: Boolean = false
+        private[this] def fetchNextRow() {
+          nextRowAvailable = stream.hasNext
+          if (nextRowAvailable) {
+            nextRow = stream.next().asInstanceOf[UnsafeRow]
+            nextGroup = grouping(nextRow)
+          } else {
+            nextRow = null
+            nextGroup = null
+          }
+        }
+        fetchNextRow()
+
+        // Manage the current partition.
+        val buffer: ExternalAppendOnlyUnsafeRowArray =
+          new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+        var bufferIterator: Iterator[UnsafeRow] = _
+
+        val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+        val frames = factories.map(_(indexRow))
+
+        private[this] def fetchNextPartition() {
+          // Collect all the rows in the current partition.
+          // Before we start to fetch new input rows, make a copy of nextGroup.
+          val currentGroup = nextGroup.copy()
+
+          // clear last partition
+          buffer.clear()
+
+          while (nextRowAvailable && nextGroup == currentGroup) {
 
 Review comment:
   @hvanhovell Thanks for chiming in. I am not very familiar with unsafe row comparison and was just following @ueshin's suggestion. If this is not needed, I can close #23279 and leave it as is.





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715701#comment-16715701
 ] 

ASF GitHub Bot commented on SPARK-24561:


hvanhovell commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240409746
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -144,24 +282,107 @@ case class WindowInPandasExec(
         queue.close()
       }
 
-      val inputProj = UnsafeProjection.create(allInputs, child.output)
-      val pythonInput = grouped.map { case (_, rows) =>
-        rows.map { row =>
-          queue.add(row.asInstanceOf[UnsafeRow])
-          inputProj(row)
+      val stream = iter.map { row =>
+        queue.add(row.asInstanceOf[UnsafeRow])
+        row
+      }
+
+      val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+        // Manage the stream and the grouping.
+        var nextRow: UnsafeRow = null
+        var nextGroup: UnsafeRow = null
+        var nextRowAvailable: Boolean = false
+        private[this] def fetchNextRow() {
+          nextRowAvailable = stream.hasNext
+          if (nextRowAvailable) {
+            nextRow = stream.next().asInstanceOf[UnsafeRow]
+            nextGroup = grouping(nextRow)
+          } else {
+            nextRow = null
+            nextGroup = null
+          }
+        }
+        fetchNextRow()
+
+        // Manage the current partition.
+        val buffer: ExternalAppendOnlyUnsafeRowArray =
+          new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+        var bufferIterator: Iterator[UnsafeRow] = _
+
+        val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+        val frames = factories.map(_(indexRow))
+
+        private[this] def fetchNextPartition() {
+          // Collect all the rows in the current partition.
+          // Before we start to fetch new input rows, make a copy of nextGroup.
+          val currentGroup = nextGroup.copy()
+
+          // clear last partition
+          buffer.clear()
+
+          while (nextRowAvailable && nextGroup == currentGroup) {
 
 Review comment:
   Why is this needed? You are comparing unsafe rows, so equality should work.
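
The `fetchNextRow`/`fetchNextPartition` pair above buffers consecutive rows whose grouping key compares equal, then hands each buffered partition to Python. The same consecutive-grouping pattern can be sketched with the standard library (the `partitions` helper is illustrative only; Spark's version additionally spills large partitions to `ExternalAppendOnlyUnsafeRowArray`):

```python
from itertools import groupby
from operator import itemgetter

def partitions(clustered_rows, key=itemgetter(0)):
    """Yield one buffered list per partition from a stream that is already
    clustered by the grouping key, analogous to fetchNextPartition()."""
    for _, rows in groupby(clustered_rows, key=key):
        yield list(rows)  # the "buffer" holding the current partition

stream = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]
print(list(partitions(stream)))
# [[(1, 1.0), (1, 2.0)], [(2, 3.0), (2, 5.0), (2, 10.0)]]
```

Because rows arrive clustered by key, equality of adjacent keys is all that is needed to detect a partition boundary — which is why plain `UnsafeRow` equality suffices here.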





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715337#comment-16715337
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on a change in pull request #22305: 
[SPARK-24561][SQL][Python] User-defined window aggregation functions with 
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240331939
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
 ##
 @@ -144,24 +282,107 @@ case class WindowInPandasExec(
         queue.close()
       }
 
-      val inputProj = UnsafeProjection.create(allInputs, child.output)
-      val pythonInput = grouped.map { case (_, rows) =>
-        rows.map { row =>
-          queue.add(row.asInstanceOf[UnsafeRow])
-          inputProj(row)
+      val stream = iter.map { row =>
+        queue.add(row.asInstanceOf[UnsafeRow])
+        row
+      }
+
+      val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+        // Manage the stream and the grouping.
+        var nextRow: UnsafeRow = null
+        var nextGroup: UnsafeRow = null
+        var nextRowAvailable: Boolean = false
+        private[this] def fetchNextRow() {
+          nextRowAvailable = stream.hasNext
+          if (nextRowAvailable) {
+            nextRow = stream.next().asInstanceOf[UnsafeRow]
+            nextGroup = grouping(nextRow)
+          } else {
+            nextRow = null
+            nextGroup = null
+          }
+        }
+        fetchNextRow()
+
+        // Manage the current partition.
+        val buffer: ExternalAppendOnlyUnsafeRowArray =
+          new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+        var bufferIterator: Iterator[UnsafeRow] = _
+
+        val indexRow = new SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+        val frames = factories.map(_(indexRow))
+
+        private[this] def fetchNextPartition() {
+          // Collect all the rows in the current partition.
+          // Before we start to fetch new input rows, make a copy of nextGroup.
+          val currentGroup = nextGroup.copy()
+
+          // clear last partition
+          buffer.clear()
+
+          while (nextRowAvailable && nextGroup == currentGroup) {
 
 Review comment:
   Sure. Here https://github.com/apache/spark/pull/23279





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-12-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16715340#comment-16715340
 ] 

ASF GitHub Bot commented on SPARK-24561:


icexelloss commented on issue #22305: [SPARK-24561][SQL][Python] User-defined 
window aggregation functions with Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#issuecomment-445925430
 
 
   @ueshin Do you have any more comments other than 
https://github.com/apache/spark/pull/22305#discussion_r239312302?





[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-08-31 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16598885#comment-16598885
 ] 

Apache Spark commented on SPARK-24561:
--

User 'icexelloss' has created a pull request for this issue:
https://github.com/apache/spark/pull/22305




[jira] [Commented] (SPARK-24561) User-defined window functions with pandas udf (bounded window)

2018-08-14 Thread Li Jin (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16579965#comment-16579965
 ] 

Li Jin commented on SPARK-24561:


I am looking into this. Early investigation: 
https://docs.google.com/document/d/14EjeY5z4-NC27-SmIP9CsMPCANeTcvxN44a7SIJtZPc/edit
