holdenk commented on a change in pull request #34505:
URL: https://github.com/apache/spark/pull/34505#discussion_r747867698
##########
File path: python/pyspark/sql/pandas/map_ops.py
##########
@@ -91,6 +91,68 @@ def mapInPandas(
jdf = self._jdf.mapInPandas(udf_column._jc.expr()) # type:
ignore[operator]
return DataFrame(jdf, self.sql_ctx)
+ def mapInArrow(
+ self, func: "ArrowMapIterFunction", schema: Union[StructType, str]
+ ) -> "DataFrame":
+ """
+ Maps an iterator of batches in the current :class:`DataFrame` using a
Python native
+ function that takes and outputs a PyArrow's `RecordBatch`, and returns
the result as a
+ :class:`DataFrame`.
+
+ The function should take an iterator of `pyarrow.RecordBatch`\\s and
return
+ another iterator of `pyarrow.RecordBatch`\\s. All columns are passed
+ together as an iterator of `pyarrow.RecordBatch`\\s to the function
and the
+ returned iterator of `pyarrow.RecordBatch`\\s are combined as a
:class:`DataFrame`.
+ Each `pyarrow.RecordBatch` size can be controlled by
+ `spark.sql.execution.arrow.maxRecordsPerBatch`.
+
+ .. versionadded:: 3.3.0
+
+ Parameters
+ ----------
+ func : function
+ a Python native function that takes an iterator of
`pyarrow.RecordBatch`\\s, and
+ outputs an iterator of `pyarrow.RecordBatch`\\s.
+ schema : :class:`pyspark.sql.types.DataType` or str
+ the return type of the `func` in PySpark. The value can be either a
+ :class:`pyspark.sql.types.DataType` object or a DDL-formatted type
string.
+
+ Examples
+ --------
+ >>> import pyarrow # doctest: +SKIP
+ >>> df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
+ >>> def filter_func(iterator):
+ ... for batch in iterator:
+ ... pdf = batch.to_pandas()
Review comment:
nit: follow up with a doc PR that adds a non-pandas example.
##########
File path: python/pyspark/sql/pandas/functions.pyi
##########
@@ -149,6 +152,24 @@ def pandas_udf(
f: Union[StructType, str], *, functionType: PandasMapIterUDFType
) -> Callable[[PandasMapIterFunction], MapIterPandasUserDefinedFunction]: ...
@overload
+def pandas_udf(
+ f: ArrowMapIterFunction,
+ returnType: Union[StructType, str],
+ functionType: ArrowMapIterUDFType,
+) -> MapIterArrowUserDefinedFunction: ...
Review comment:
+1 for `arrow_udf`? But if it's a pain, I think it's OK.
##########
File path: python/pyspark/sql/udf.py
##########
@@ -405,12 +409,10 @@ def register(
PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
- PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
]:
raise ValueError(
"Invalid f: f must be SQL_BATCHED_UDF,
SQL_SCALAR_PANDAS_UDF, "
- "SQL_SCALAR_PANDAS_ITER_UDF, SQL_GROUPED_AGG_PANDAS_UDF or
"
- "SQL_MAP_PANDAS_ITER_UDF."
Review comment:
This change seems unrelated and looks like a regression — did
PANDAS_ITER_UDF not work here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]