viirya commented on a change in pull request #24997: [SPARK-28198][PYTHON] Add
mapPartitionsInPandas to allow an iterator of DataFrames
URL: https://github.com/apache/spark/pull/24997#discussion_r298950721
##########
File path: python/pyspark/sql/dataframe.py
##########
@@ -2192,6 +2193,51 @@ def toPandas(self):
_check_series_convert_timestamps_local_tz(pdf[field.name], timezone)
return pdf
+ def mapPartitionsInPandas(self, udf):
+ """
+ Maps each partition of the current :class:`DataFrame` using a pandas
udf and returns
+ the result as a `DataFrame`.
+
+ The user-defined function should take an iterator of
`pandas.DataFrame`s and return another
+ iterator of `pandas.DataFrame`s. For each partition, all columns are
passed together as an
+ iterator of `pandas.DataFrame`s to the user-function and the returned
iterator of
+ `pandas.DataFrame`s are combined as a :class:`DataFrame`.
+ Each `pandas.DataFrame` size can be controlled by
+ `spark.sql.execution.arrow.maxRecordsPerBatch`.
+ Its schema must match the returnType of the pandas udf.
+
+ :param udf: A function object returned by
:meth:`pyspark.sql.functions.pandas_udf`
+
+ >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
+ >>> df = spark.createDataFrame([(1, 21), (2, 30)],
+ ... ("id", "age")) # doctest: +SKIP
+ >>> @pandas_udf(df.schema, PandasUDFType.SCALAR_ITER) # doctest: +SKIP
+ ... def filter_func(iterator):
+ ... for pdf in iterator:
+ ... yield pdf[pdf.id == 1]
+ >>> df.mapPartitionsInPandas(filter_func).show() # doctest: +SKIP
+ +---+---+
+ | id|age|
+ +---+---+
+ | 1| 21|
+ +---+---+
+
+ .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+ """
+ # Columns are special because hasattr always return True
+ if isinstance(udf, Column) or not hasattr(udf, 'func') \
+ or udf.evalType != PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF:
+ raise ValueError("Invalid udf: the udf argument must be a
pandas_udf of type "
+ "SQL_SCALAR_PANDAS_ITER_UDF.")
Review comment:
Shall we say `PandasUDFType.SCALAR_ITER` instead of
`SQL_SCALAR_PANDAS_ITER_UDF`? Users should use `PandasUDFType`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]