Github user BryanCutler commented on a diff in the pull request:
https://github.com/apache/spark/pull/18732#discussion_r143313284
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self._df)
+
+    @since(2.3)
+    def apply(self, udf):
+        """
+        Maps each group of the current :class:`DataFrame` using a pandas udf and returns
+        the result as a :class:`DataFrame`.
+
+        The user-defined function should take a `pandas.DataFrame` and return another
+        `pandas.DataFrame`. For each group, all columns are passed together as a
+        `pandas.DataFrame` to the user function, and the returned `pandas.DataFrame`s are
+        combined into a :class:`DataFrame`. The returned `pandas.DataFrame` can be of
+        arbitrary length and its schema must match the returnType of the pandas udf.
+
+        :param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf`
+
+        >>> from pyspark.sql.functions import pandas_udf
+        >>> df = spark.createDataFrame(
+        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+        ...     ("id", "v"))
+        >>> @pandas_udf(returnType=df.schema)
+        ... def normalize(pdf):
+        ...     v = pdf.v
+        ...     return pdf.assign(v=(v - v.mean()) / v.std())
+        >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
+        +---+-------------------+
+        | id|                  v|
+        +---+-------------------+
+        |  1|-0.7071067811865475|
+        |  1| 0.7071067811865475|
+        |  2|-0.8320502943378437|
+        |  2|-0.2773500981126146|
+        |  2| 1.1094003924504583|
+        +---+-------------------+
+
+        .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+        """
+        from pyspark.sql.functions import pandas_udf
+
+        # Columns are special because hasattr always returns True
+        if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized:
+            raise ValueError("The argument to apply must be a pandas_udf")
+        if not isinstance(udf.returnType, StructType):
+            raise ValueError("The returnType of the pandas_udf must be a StructType")
+
+        df = self._df
+        func = udf.func
+        returnType = udf.returnType
--- End diff --
Is it necessary to make all these copies? I could understand maybe copying
`func` and `columns`, since they are referenced in the wrapped function, but I'm
not sure `df` and `returnType` need to be copied.
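
For context, here is a minimal, hypothetical sketch of the closure-capture concern that
usually motivates pulling attributes into locals before defining the inner function
(the `Wrapper` class and its helpers are invented for illustration, not code from this
PR; it assumes `cloudpickle` is importable, as it is in a PySpark environment): only
the objects the inner function actually references get pickled and shipped to
executors, so capturing a plain local keeps the enclosing object out of the serialized
closure.

```python
import cloudpickle  # available alongside PySpark; the standalone package behaves the same


class Wrapper(object):
    """Hypothetical stand-in for GroupedData: holds state we don't want serialized."""

    def __init__(self, func):
        self.func = func
        self.driver_only_state = list(range(100000))  # pretend JVM handles / big objects

    def closure_via_self(self):
        # The inner function references `self.func`, so the whole Wrapper
        # instance (including driver_only_state) ends up in the pickled closure.
        def wrapped(pdf):
            return self.func(pdf)
        return wrapped

    def closure_via_local(self):
        # Copy the attribute into a local first; only `func` is captured.
        func = self.func

        def wrapped(pdf):
            return func(pdf)
        return wrapped


w = Wrapper(lambda pdf: pdf)
print(len(cloudpickle.dumps(w.closure_via_self())))   # large: drags in the instance
print(len(cloudpickle.dumps(w.closure_via_local())))  # small: just the plain function
```

If `df` and `returnType` are only ever used on the driver side, copying them would be
mostly a readability choice rather than a serialization requirement, which is the open
question above.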
---