[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18732


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143819790
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
+The returned `pandas.DataFrame` can be arbitrary length and its 
schema must match the
+returnType of the pandas udf.
+
+This function does not support partial aggregation, and requires 
shuffling all the data in
+the `DataFrame`.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
--- End diff --

Yeah, that's ok.  I just want to make sure it's absolutely clear what the 
input is because the user doesn't know or even care what type of 
object`pandas_udf` returns.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143813642
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Physical node for 
[[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
+ *
+ * Rows in each group are passed to the python worker as a Arrow record 
batch.
--- End diff --

Fixed "a Arrow -> an Arrow"

Fixed "Python and Java capitalization"

I am actually leaning toward keeping `pandas.DataFrame` . The preference to 
`pandas` is usually lower case:
https://pandas.pydata.org/pandas-docs/stable/



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143812619
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+// DO NOT use iter.grouped(). See BatchIterator.
--- End diff --

Yes. The reason is explained in the docstring of BatchIterator.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143812311
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  /**
+   * Applies a vectorized python user-defined function to each group of 
data.
+   * The user-defined function defines a transformation: 
`Pandas.DataFrame` -> `Pandas.DataFrame`.
+   * For each group, all elements in the group are passed as a 
`Pandas.DataFrame` and the results
+   * for all groups are combined into a new `DataFrame`.
+   *
+   * This function does not support partial aggregation, and requires 
shuffling all the data in
+   * the `DataFrame`.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143810948
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
+The returned `pandas.DataFrame` can be arbitrary length and its 
schema must match the
+returnType of the pandas udf.
+
+This function does not support partial aggregation, and requires 
shuffling all the data in
+the `DataFrame`.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
--- End diff --

I think
```
 A pandas_udf returned by :meth:`pyspark.sql.functions.pandas_udf`
```
is redundant, how about 
```
 A function object  returned by :meth:`pyspark.sql.functions.pandas_udf`
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143810736
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
+The returned `pandas.DataFrame` can be arbitrary length and its 
schema must match the
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143810539
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143810355
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
--- End diff --

I think "pandas udf"  as a word is fine. `pandas_udf` is the function name.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143809711
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143803982
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Physical node for 
[[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
+ *
+ * Rows in each group are passed to the python worker as a Arrow record 
batch.
--- End diff --

a Arrow -> an Arrow

minor nits: capitalize Python and Java, and change to `Pandas.DataFrame`  
in these paragraphs


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143802697
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+// DO NOT use iter.grouped(). See BatchIterator.
--- End diff --

Maybe put the reason not to use `grouped()`, I think it can make copies of 
the rows is that right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143802019
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  /**
+   * Applies a vectorized python user-defined function to each group of 
data.
+   * The user-defined function defines a transformation: 
`Pandas.DataFrame` -> `Pandas.DataFrame`.
+   * For each group, all elements in the group are passed as a 
`Pandas.DataFrame` and the results
+   * for all groups are combined into a new `DataFrame`.
+   *
+   * This function does not support partial aggregation, and requires 
shuffling all the data in
+   * the `DataFrame`.
--- End diff --

I believe for scaladoc the Spark DataFrame should be enclosed in brackets 
-> [[DataFrame]]


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143800589
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
+The returned `pandas.DataFrame` can be arbitrary length and its 
schema must match the
+returnType of the pandas udf.
+
+This function does not support partial aggregation, and requires 
shuffling all the data in
+the `DataFrame`.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
--- End diff --

I think "A wrapped udf" might be confusing to the user, how about just 
saying "A `pandas_udf` returned by..."?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143800072
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
+The returned `pandas.DataFrame` can be arbitrary length and its 
schema must match the
--- End diff --

should be "can have an arbitrary length"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143799780
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
--- End diff --

I think this should be plural -> `pandas.DataFrame`s are combined

`DataFrame`. -> :class:`DataFrame`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143799187
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
--- End diff --

"pandas udf" to "pandas_udf"

`DataFrame` to :class:`DataFrame`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143744197
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2181,30 +2187,66 @@ def udf(f=None, returnType=StringType()):
 @since(2.3)
 def pandas_udf(f=None, returnType=StringType()):
 """
-Creates a :class:`Column` expression representing a user defined 
function (UDF) that accepts
-`Pandas.Series` as input arguments and outputs a `Pandas.Series` of 
the same length.
+Creates a vectorized user defined function (UDF).
 
-:param f: python function if used as a standalone function
+:param f: user-defined function. A python function if used as a 
standalone function
 :param returnType: a :class:`pyspark.sql.types.DataType` object
 
->>> from pyspark.sql.types import IntegerType, StringType
->>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
->>> @pandas_udf(returnType=StringType())
-... def to_upper(s):
-... return s.str.upper()
-...
->>> @pandas_udf(returnType="integer")
-... def add_one(x):
-... return x + 1
-...
->>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", 
"age"))
->>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
-... .show()  # doctest: +SKIP
-+--+--++
-|slen(name)|to_upper(name)|add_one(age)|
-+--+--++
-| 8|  JOHN DOE|  22|
-+--+--++
+The user-defined function can define one of the following 
transformations:
+
+1. One or more `pandas.Series` -> A `pandas.Series`
+
+   This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
+   :meth:`pyspark.sql.DataFrame.select`.
+   The returnType should be a primitive data type, e.g., 
`DoubleType()`.
+   The length of the returned `pandas.Series` must be of the same as 
the input `pandas.Series`.
+
+   >>> from pyspark.sql.types import IntegerType, StringType
+   >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+   >>> @pandas_udf(returnType=StringType())
+   ... def to_upper(s):
+   ... return s.str.upper()
+   ...
+   >>> @pandas_udf(returnType="integer")
+   ... def add_one(x):
+   ... return x + 1
+   ...
+   >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", 
"name", "age"))
+   >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
+   ... .show()  # doctest: +SKIP
+   +--+--++
+   |slen(name)|to_upper(name)|add_one(age)|
+   +--+--++
+   | 8|  JOHN DOE|  22|
+   +--+--++
+
+2. A `pandas.DataFrame` -> A `pandas.DataFrame`
+
+   This udf is used with :meth:`pyspark.sql.GroupedData.apply`.
--- End diff --

Change to `This udf is only used with` and added `note`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143741944
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+// DO NOT use iter.grouped(). See BatchIterator.
+val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) 
else Iterator(iter)
+
 val columnarBatchIter = new ArrowPythonRunner(
-funcs, conf.arrowMaxRecordsPerBatch, bufferSize, reuseWorker,
+funcs, bufferSize, reuseWorker,
 PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema)
-  .compute(iter, context.partitionId(), context)
+  .compute(batchIter, context.partitionId(), context)
 
 new Iterator[InternalRow] {
 
-  var currentIter = if (columnarBatchIter.hasNext) {
+  private var currentIter = if (columnarBatchIter.hasNext) {
--- End diff --

I think so. The variable is reassigned for each columnar batch


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740882
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
+
+/**
+ * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740773
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
--- End diff --

Removed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740636
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,4 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
--- End diff --

Reverted


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740157
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo = pandas_udf(
+lambda df: df,
+StructType([StructField('id', LongType()), StructField('v', 
DoubleType())]))
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+expected = expected.assign(v=expected.v.astype('float64'))
+self.assertFramesEqual(expected, result)
+
+def test_complex_groupby(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('norm', DoubleType())]))
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(norm=(v - v.mean()) / v.std())
+
+result = df.groupby(col('id') % 2 == 
0).apply(normalize).sort('id', 'v').toPandas()
+pdf = df.toPandas()
+expected = pdf.groupby(pdf['id'] % 2 == 0).apply(normalize.func)
+expected = expected.sort_values(['id', 'v']).reset_index(drop=True)
+expected = expected.assign(norm=expected.norm.astype('float64'))
+self.assertFramesEqual(expected, result)
+
+def test_empty_groupby(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+

[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740129
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo = pandas_udf(
+lambda df: df,
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740078
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143646922
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
+
+/**
+ * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame.
--- End diff --

tiny typo: a -> an.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143646526
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
+
+/**
+ * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame.
+ * This is used by DataFrame.groupby().apply().
+ */
+case class FlatMapGroupsInPandas(
--- End diff --

Actually it is moved out `object.scala` by a previous comment.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143640334
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
+
+/**
+ * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame.
+ * This is used by DataFrame.groupby().apply().
+ */
+case class FlatMapGroupsInPandas(
--- End diff --

shall we move this class to `object.scala`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143640650
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+// DO NOT use iter.grouped(). See BatchIterator.
+val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) 
else Iterator(iter)
+
 val columnarBatchIter = new ArrowPythonRunner(
-funcs, conf.arrowMaxRecordsPerBatch, bufferSize, reuseWorker,
+funcs, bufferSize, reuseWorker,
 PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema)
-  .compute(iter, context.partitionId(), context)
+  .compute(batchIter, context.partitionId(), context)
 
 new Iterator[InternalRow] {
 
-  var currentIter = if (columnarBatchIter.hasNext) {
+  private var currentIter = if (columnarBatchIter.hasNext) {
--- End diff --

does it need to be var?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143630635
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo = pandas_udf(
+lambda df: df,
+StructType([StructField('id', LongType()), StructField('v', 
DoubleType())]))
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+expected = expected.assign(v=expected.v.astype('float64'))
+self.assertFramesEqual(expected, result)
+
+def test_complex_groupby(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('norm', DoubleType())]))
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(norm=(v - v.mean()) / v.std())
+
+result = df.groupby(col('id') % 2 == 
0).apply(normalize).sort('id', 'v').toPandas()
+pdf = df.toPandas()
+expected = pdf.groupby(pdf['id'] % 2 == 0).apply(normalize.func)
+expected = expected.sort_values(['id', 'v']).reset_index(drop=True)
+expected = expected.assign(norm=expected.norm.astype('float64'))
+self.assertFramesEqual(expected, result)
+
+def test_empty_groupby(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+   

[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143630813
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,4 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
--- End diff --

little nit: let's remove other changes here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143630505
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo = pandas_udf(
+lambda df: df,
--- End diff --

ditto: `df` -> `pdf`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143629848
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2181,30 +2187,66 @@ def udf(f=None, returnType=StringType()):
 @since(2.3)
 def pandas_udf(f=None, returnType=StringType()):
 """
-Creates a :class:`Column` expression representing a user defined 
function (UDF) that accepts
-`Pandas.Series` as input arguments and outputs a `Pandas.Series` of 
the same length.
+Creates a vectorized user defined function (UDF).
 
-:param f: python function if used as a standalone function
+:param f: user-defined function. A python function if used as a 
standalone function
 :param returnType: a :class:`pyspark.sql.types.DataType` object
 
->>> from pyspark.sql.types import IntegerType, StringType
->>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
->>> @pandas_udf(returnType=StringType())
-... def to_upper(s):
-... return s.str.upper()
-...
->>> @pandas_udf(returnType="integer")
-... def add_one(x):
-... return x + 1
-...
->>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", 
"age"))
->>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
-... .show()  # doctest: +SKIP
-+--+--++
-|slen(name)|to_upper(name)|add_one(age)|
-+--+--++
-| 8|  JOHN DOE|  22|
-+--+--++
+The user-defined function can define one of the following 
transformations:
+
+1. One or more `pandas.Series` -> A `pandas.Series`
+
+   This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
+   :meth:`pyspark.sql.DataFrame.select`.
+   The returnType should be a primitive data type, e.g., 
`DoubleType()`.
+   The length of the returned `pandas.Series` must be of the same as 
the input `pandas.Series`.
+
+   >>> from pyspark.sql.types import IntegerType, StringType
+   >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+   >>> @pandas_udf(returnType=StringType())
+   ... def to_upper(s):
+   ... return s.str.upper()
+   ...
+   >>> @pandas_udf(returnType="integer")
+   ... def add_one(x):
+   ... return x + 1
+   ...
+   >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", 
"name", "age"))
+   >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
+   ... .show()  # doctest: +SKIP
+   +--+--++
+   |slen(name)|to_upper(name)|add_one(age)|
+   +--+--++
+   | 8|  JOHN DOE|  22|
+   +--+--++
+
+2. A `pandas.DataFrame` -> A `pandas.DataFrame`
+
+   This udf is used with :meth:`pyspark.sql.GroupedData.apply`.
--- End diff --

Maybe, `This udf is used with` -> `This udf is only used with` or .. 
probably we should add a `note` here. If I didn't know the context here, I'd 
wonder why it does not work as normal pandas udf ..


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143630469
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
--- End diff --

little nit: I'd call id `pdf` partly to avoid shadowing `df` and partly to 
say `pd.DataFrame`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143630939
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
--- End diff --

little nit: I'd remove this comment. I think the name already implies what 
this file contains.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143622623
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  /**
+   * Applies a vectorized python use-defined function to each group of 
data.
--- End diff --

Thanks! Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143622617
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Physical node for 
[[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
+ *
+ * Rows in each group are passed to the python worker as a Arrow record 
batch.
+ * The python worker turns the record batch to a pandas.DataFrame, invoke 
the
+ * use-defined function, and passes the resulting pandas.DataFrame
--- End diff --

Thanks! Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143614190
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  /**
+   * Applies a vectorized python use-defined function to each group of 
data.
--- End diff --

nit: `use-defined` -> `user-defined`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143614283
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Physical node for 
[[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
+ *
+ * Rows in each group are passed to the python worker as a Arrow record 
batch.
+ * The python worker turns the record batch to a pandas.DataFrame, invoke 
the
+ * use-defined function, and passes the resulting pandas.DataFrame
--- End diff --

nit: `use-defined` -> `user-defined`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143507748
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

@HyukjinKwon Thanks for catching this. I added docs for 
`FlatMapGroupsInPandas` (function) `FlatMapGroupsInPandas` (logical node) and 
`FlatMapGroupsInPandasExec` and cross referenced them.

@rxin I created file `pythonLogicalOperators` under 
`org.apache.spark.sql.catalyst.plans.logical` and move `FlatMapGroupsInPandas` 
under that file.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143506845
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a
+:class:`DataFrame`. The returned `pandas.DataFrame` can be 
arbitrary length and its schema
+must match the returnType of the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
--- End diff --

I actually like it because I think it's more readable this way.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-07 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143340681
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

I'd also put this somewhere else that's not object.scala 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143338477
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

Hm.. looks the doc for this `case class` is missed?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-06 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143313284
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a
+:class:`DataFrame`. The returned `pandas.DataFrame` can be 
arbitrary length and its schema
+must match the returnType of the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
--- End diff --

is it necessary to make all these copies?  I could understand maybe copying 
`func` and `columns` because they are in the wrapped function, but not sure if 
`df` and `returnType` need to be copied


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143263694
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
--- End diff --

@rxin just to recap our discussion regarding naming:

You asked:
> What's the difference between this one and the transform function you 
also proposed? I'm trying to see if all the naming
makes sense when considered together.

Answer is:
`transform` takes a function: pd.Series -> pd.Series and apply the function 
on each column (or subset of columns). The input and output Series are of the 
same length.

`apply` takes a function: pd.DataFrame -> pd.DataFrame and apply the 
function on the group. Similar to `flatMapGroups`

Does this make sense to you?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143213000
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
+
+wrapped_udf_obj = pandas_udf(wrapped, returnType)
+udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
--- End diff --

Added


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143198047
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
+
+wrapped_udf_obj = pandas_udf(wrapped, returnType)
+udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
--- End diff --

I see. Yeah I can make it more clear in the doc.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143086739
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

ok, I'll try but if this PR is ready to merge then no need to hold up for 
this, we can address later.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143083397
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

Feel free! The tricky bit is that the wrapped function is unknown when 
calling pandas_udf so there needs to be a way to change it later.

Also the function chaining makes it a bit complicated too.

I still think we should do it, but needs to be more careful.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143082816
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

I was thinking it should simplify things. Mind if I give it a try? If does 
only complicate then we can forget it..


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143081782
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

@BryanCutler I got something kind of working here:

https://github.com/icexelloss/spark/pull/5

But I think the change is not trivial and I don't want to complicate  this 
PR. (It is already pretty large).

Should we maybe make the wrapper refactoring it's own PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143081592
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
+
+wrapped_udf_obj = pandas_udf(wrapped, returnType)
+udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
--- End diff --

Do you mean sending all columns?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143080889
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
+
+wrapped_udf_obj = pandas_udf(wrapped, returnType)
+udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
--- End diff --

This sends the entire Spark `DataFrame` to Python, is that what users would 
expect for this operation?  Should that be mentioned in the doc?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143032320
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

Ahh neat. Thanks @HyukjinKwon @BryanCutler 

doctest passes now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143033289
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
--- End diff --

Added doc. Check for `returnType == StructType `is done earlier:


https://github.com/icexelloss/spark/blob/groupby-apply-SPARK-20396/python/pyspark/sql/group.py#L238


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143008730
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

I see, yes it does strip the names from the columns.  How about the 
opposite way then, move the conversion from DataFrame to Series out of 
`worker.py` `wrap_pandas_udf` and put it here?  I think you could then use 
`wrap_pandas_udf` as-is.

I also think doing this wrapping would be better off in `functions.py` 
`_create_udf`, is that possible?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143005943
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

Ah, cool. I misunderstood. That's the answer to the question.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143004584
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

@icexelloss , this works for me to run doctests locally

`SPARK_TESTING=1 bin/pyspark pyspark.sql.group`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142961120
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
--- End diff --

Yes will do.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142957552
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2058,7 +2058,7 @@ def __init__(self, func, returnType, name=None, 
vectorized=False):
 self._name = name or (
 func.__name__ if hasattr(func, '__name__')
 else func.__class__.__name__)
-self._vectorized = vectorized
+self.vectorized = vectorized
--- End diff --

Are we ok with having `vectorized` being public field? I am fine with both 
public or private but I do think the fields of the function returned by 
`UserDefinedFuncion_wrapped()` should have the same field names as 
`UserDefinedFunction` to avoid confusion.  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142956597
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

@BryanCutler yeah I was trying to do that earlier  that but unfortunately 
the column names are lost on the worker so we cannot construct the 
`Pandas.DataFrame` on the worker.

I think the best plcae to define the wrap function is probably on the 
pyspark driver side because we have the most information there. However, that 
requires some refactoring. I will give it a try and see how that goes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142952213
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

Not sure.. I think what you know is what I usually do.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142949557
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
--- End diff --

Yes, I will change the name to some thing more descriptive. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142949179
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
--- End diff --

Yea, let's add some comments and throws a better exception. For example, I 
think we should clarify `StructType` should be used in groupping udf only in 
the exception message.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142948551
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

Ahh..Thanks! Will give it a try.

Still, is there a easier way to run the pyspark tests locally (the way 
jenkins runs them)?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142948307
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
--- End diff --

Yea, but `fn` looks a no-no .. do you maybe have an idea about a better 
name?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142947514
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

Also, it looks this file does not define `spark` as a global that is used 
in doctests. I think we should add something like ...

```diff
  sc = spark.sparkContext
  globs['sc'] = sc
+ globs['spark'] = spark 
  globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142946504
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

Probably, importing `pandas_udf` should solve the problem I guess.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142946430
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

I think the problem is, `pandas_udf` is unimportable in this doctest. Up to 
my knowledge, `# doctest: +SKIP` is per line.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142945465
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

I have been using 
```
bin/pyspark pyspark.sql.tests GroupbyApplyTests
```

But this doesn't seem to do doctest.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-05 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142944123
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
--- End diff --

Seems this is still not skipped by doc test.

What's the best way to run pyspark test locally?

I tried 

```
./run-tests --modules=pyspark-sql --parallelism=4
```

But it's giving me a different failure.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142845456
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() #  doctest: +SKIP
--- End diff --

Fixed. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142841543
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() #  doctest: +SKIP
--- End diff --

nit: the spaces around `#` are wrong.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142840490
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

Doc added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142839010
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

Sorry I pushed a bit late. The comment is added now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142836611
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

I didn't see the comment added?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142836297
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
+groupingAttributes: Seq[Attribute],
+functionExpr: Expression,
+output: Seq[Attribute],
+child: LogicalPlan) extends UnaryNode {
+  /**
+   * This is needed because output attributes is considered `reference` 
when
+   * passed through the constructor.
+   *
+   * Without this, catalyst will complain that output attributes are 
missing
+   * from the input.
+   */
+  override val producedAttributes = AttributeSet(output)
--- End diff --

Ok. I see. Should be fine.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142836245
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
+groupingAttributes: Seq[Attribute],
+functionExpr: Expression,
+output: Seq[Attribute],
+child: LogicalPlan) extends UnaryNode {
+  /**
+   * This is needed because output attributes is considered `reference` 
when
--- End diff --

nit: `reference` -> `references`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142835260
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self)
--- End diff --

`return GroupedData(jgd, self)` -> `return GroupedData(jgd, self._df)`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142801623
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

Added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142796899
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
+groupingAttributes: Seq[Attribute],
+functionExpr: Expression,
+output: Seq[Attribute],
+child: LogicalPlan) extends UnaryNode {
+  /**
+   * This is needed because output attributes is considered `reference` 
when
+   * passed through the constructor.
+   *
+   * Without this, catalyst will complain that output attributes are 
missing
+   * from the input.
+   */
+  override val producedAttributes = AttributeSet(output)
--- End diff --

This is one of the trick bit.

It's because of this code:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L135

Because of `productIterator` will return all member variables, including 
`output`, `references` of the tree node will include all output attributes, and 
it will complain about missing input:

```
def missingInput: AttributeSet = references -- inputSet -- 
producedAttributes
```

I think my solution here isn't great but I don't know the best way of deal 
with this. If someone with deeper catalyst knowledge can suggest, I am happy to 
give rid of this bit..


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142770337
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+groupingAttributes: Seq[Attribute],
+func: Expression,
+output: Seq[Attribute],
+child: SparkPlan)
+  extends UnaryExecNode {
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+val argOffsets = Array((0 until child.schema.length).toArray)
+
+inputRDD.mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+  val context = TaskContext.get()
+
+  val columnarBatchIter = new ArrowPythonRunner(
+chainedFunc, bufferSize, reuseWorker,
+PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema)
+.compute(grouped.map(_._2), context.partitionId(), context)
+
+  val rowIter = new Iterator[InternalRow] {
+private var currentIter = if (columnarBatchIter.hasNext) {
+  val batch = columnarBatchIter.next()
+  batch.rowIterator.asScala
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142740947
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
+}
+
+val plan = FlatMapGroupsInPandas(
+  groupingAttributes,
+  expr,
+  output,
+  df.logicalPlan
+)
+
+Dataset.ofRows(
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142735696
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
+Each group is passed as a `pandas.DataFrame` to the user-function 
and the returned
+`pandas.DataFrame` are combined as a :class:`DataFrame`. The 
returned `pandas.DataFrame`
+can be arbitrary length and its schema should match the returnType 
of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

That's fine, we can leave the serializer as is, but it seems like a waste 
to unwrap the pandas_udf only to then wrap it again as another pandas_udf.  
@icexelloss , how about changing the Series to a DataFrame in `worker.py` 
`wrap_pandas_udf` right before the user function is called?  

That way both transformations are in one place and `wrap_pandas_udf` would 
work like this:
Changes Series to DataFrame; Call user function with DataFrame; Get result 
DataFrame; Change DataFrame to Series.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142720877
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+groupingAttributes: Seq[Attribute],
+func: Expression,
+output: Seq[Attribute],
+child: SparkPlan)
+  extends UnaryExecNode {
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+val argOffsets = Array((0 until child.schema.length).toArray)
+
+inputRDD.mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+  val context = TaskContext.get()
+
+  val columnarBatchIter = new ArrowPythonRunner(
+chainedFunc, bufferSize, reuseWorker,
+PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema)
+.compute(grouped.map(_._2), context.partitionId(), context)
+
+  val rowIter = new Iterator[InternalRow] {
+private var currentIter = if (columnarBatchIter.hasNext) {
+  val batch = columnarBatchIter.next()
+  batch.rowIterator.asScala
--- End diff --

If we don't need to check the schema, we can simplify the iterator as:

```scala
val rowIter = columnarBatchIter.flatMap(_.rowIterator.asScala)
```




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142704642
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return 
another `pandas.DataFrame`.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142704126
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142703829
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
--- End diff --

Fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142703487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
+}
+
+val plan = FlatMapGroupsInPandas(
+  groupingAttributes,
+  expr,
+  output,
+  df.logicalPlan
+)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142697418
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
--- End diff --

normal pandas doesn't support `StructType` as returnType that's why this 
works.

However, I agree the way we distinguish grouping udf and normal udf is not 
clean. Ideally we should have a cleaner way of defining such wrapping functions 
for different pandas_udf use cases. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695929
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
AttributeSet, Expression, NamedExpression, SortOrder, UnsafeProjection}
+import 
org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+grouping: Seq[Expression],
+func: Expression,
+override val output: Seq[Attribute],
+override val child: SparkPlan
+) extends UnaryExecNode {
+
+  val groupingAttributes: Seq[Attribute] = grouping.map {
+case ne: NamedExpression => ne.toAttribute
+  }
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+val argOffsets = Array((0 until child.schema.length).toArray)
+
+inputRDD.mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
--- End diff --

Yes thanks much! I will take a look now.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695843
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 ---
@@ -111,6 +111,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with 
PredicateHelper {
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan transformUp {
+// FlatMapGroupsInPandas and be evaluated in python worker
--- End diff --

Good catch. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695501
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

+1. Let me add that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695129
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

Yes agreed. I will fix that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142694835
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
+assert isinstance(out, pd.DataFrame), \
+'Return value from the user function is not a 
pandas.DataFrame.'
+assert len(out.columns) == len(arrow_return_types), \
+'Number of columns of the returned pd.DataFrame doesn\'t 
match output schema. ' \
--- End diff --

Good catch. Fixed. (Btw thanks for catching these small things)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142694484
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
+assert isinstance(out, pd.DataFrame), \
+'Return value from the user function is not a 
pandas.DataFrame.'
--- End diff --

Good catch. Yeah let's keep such terms consistent. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142694381
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142693686
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
--- End diff --

`verify_result_type` is kind of a misnomer because this function does:

1. convert the output of the user-defined function (pandas.DataFrame) to 
the form that the serialzer take (list of (pd.Series, DataType)) 

2. Validate the return value of the user-defined function.

Part of the verifying of the result type is done in the serializer in the 
process of coercing. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142693843
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
--- End diff --

Yes that is more consistent. Let me change that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142692448
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, 
df_without):
 self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
 def test_unsupported_datatype(self):
-schema = StructType([StructField("dt", DateType(), True)])
-df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], 
schema=schema)
--- End diff --

Reverted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   3   >