viirya commented on a change in pull request #27466: [SPARK-30722][PYTHON][DOCS] Update documentation for Pandas UDF with Python type hints
URL: https://github.com/apache/spark/pull/27466#discussion_r375662886
 
 

 ##########
 File path: docs/sql-pyspark-pandas-with-arrow.md
 ##########
 @@ -65,132 +65,204 @@ Spark will fall back to create the DataFrame without Arrow.
 
 ## Pandas UDFs (a.k.a. Vectorized UDFs)
 
-Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
-Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
-or to wrap the function, no additional configuration is required. Currently, there are two types of
-Pandas UDF: Scalar and Grouped Map.
+Pandas UDFs are user-defined functions that are executed by Spark using Arrow to transfer data and
+Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using
+`pandas_udf` as a decorator or to wrap the function, and no additional configuration is required.
+A Pandas UDF generally behaves like a regular PySpark function API.
 
-### Scalar
+Before Spark 3.0, Pandas UDFs used to be defined with `PandasUDFType`. From Spark 3.0
+with Python 3.6+, you can also use [Python type hints](https://www.python.org/dev/peps/pep-0484).
+Using Python type hints is preferred, and `PandasUDFType` will be deprecated in
+a future release.
 
-Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such
-as `select` and `withColumn`. The Python function should take `pandas.Series` as inputs and return
-a `pandas.Series` of the same length. Internally, Spark will execute a Pandas UDF by splitting
-columns into batches and calling the function for each batch as a subset of the data, then
-concatenating the results together.
 
-The following example shows how to create a scalar Pandas UDF that computes the product of 2 columns.
+The following combinations of type hints are supported for Pandas UDFs. Note that the type hint
+should be `pandas.Series` in all cases, with one variant: when the input or output column is of
+`StructType`, the type hint should be `pandas.DataFrame` instead.
+
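+For instance, a minimal sketch of the `StructType` variant (assuming a running `SparkSession` named `spark`) could look like:
+
+{% highlight python %}
+import pandas as pd
+from pyspark.sql.functions import pandas_udf
+
+df = spark.createDataFrame(
+    [(1, "a string", ("a nested string",))],
+    "long_col long, string_col string, struct_col struct<col1:string>")
+
+# The struct column maps to pandas.DataFrame; the other columns map to pandas.Series.
+@pandas_udf("col1 string, col2 long")
+def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
+    s3["col2"] = s1 + s2.str.len()
+    return s3
+
+df.select(func("long_col", "string_col", "struct_col")).show()
+{% endhighlight %}
+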
+### Series to Series
+
+The type hint can be expressed as `pandas.Series`, ... -> `pandas.Series`.
+
+By using `pandas_udf` with a function that has such type hints, you create a Pandas UDF where the
+given function takes one or more `pandas.Series` and outputs one `pandas.Series`. The output of the
+function should always be the same length as the input. Internally, PySpark executes a Pandas UDF by
+splitting columns into batches, calling the function for each batch as a subset of the data, and then
+concatenating the results together.
+
+The following example shows how to create a Pandas UDF that computes the product of two columns.
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
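+
+A condensed sketch of such a UDF (again assuming a `SparkSession` named `spark`) could look like:
+
+{% highlight python %}
+import pandas as pd
+from pyspark.sql.functions import pandas_udf
+
+# Each batch of column values arrives as a pandas.Series of the same length.
+@pandas_udf("long")
+def multiply(a: pd.Series, b: pd.Series) -> pd.Series:
+    return a * b
+
+df = spark.createDataFrame([(1, 4), (2, 5), (3, 6)], ("a", "b"))
+df.select(multiply("a", "b")).show()
+{% endhighlight %}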
 
-### Scalar Iterator
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).
+
+### Iterator of Series to Iterator of Series
+
+The type hint can be expressed as `Iterator[pandas.Series]` -> `Iterator[pandas.Series]`.
 
-Scalar iterator (`SCALAR_ITER`) Pandas UDF is the same as scalar Pandas UDF above except that the
-underlying Python function takes an iterator of batches as input instead of a single batch and,
-instead of returning a single output batch, it yields output batches or returns an iterator of
-output batches.
-It is useful when the UDF execution requires initializing some states, e.g., loading an machine
-learning model file to apply inference to every input batch.
+By using `pandas_udf` with a function that has such type hints, you create a Pandas UDF where the
+given function takes an iterator of `pandas.Series` and outputs an iterator of `pandas.Series`. Each
+output series from the function should always be the same length as its input. In this case, the
+created Pandas UDF requires exactly one input column when it is called.
 
-The following example shows how to create scalar iterator Pandas UDFs:
+It is useful when the UDF execution requires initializing some state, although internally it works
+identically to the Series to Series case. The pseudocode below illustrates the pattern.
+
+{% highlight python %}
+from typing import Iterator
+
+import pandas as pd
+
+@pandas_udf("long")
+def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+    # Do some expensive initialization with a state.
+    state = very_expensive_initialization()
+    for x in iterator:
+        # Reuse that state for the whole iterator.
+        yield calculate_with_state(x, state)
+
+df.select(calculate("value")).show()
+{% endhighlight %}
+
+The following example shows how to create this Pandas UDF:
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_iter_pandas_udf python/sql/arrow.py %}
+{% include_example iter_ser_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
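+
+A runnable sketch of this variant, with trivial placeholder logic standing in for real state, could look like:
+
+{% highlight python %}
+from typing import Iterator
+
+import pandas as pd
+from pyspark.sql.functions import pandas_udf
+
+@pandas_udf("long")
+def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+    # Each element of the iterator is one batch of the single input column.
+    for s in iterator:
+        yield s + 1
+
+df = spark.createDataFrame([(1,), (2,), (3,)], ("value",))
+df.select(plus_one("value")).show()
+{% endhighlight %}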
 
-### Grouped Map
-Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
-Split-apply-combine consists of three steps:
-* Split the data into groups by using `DataFrame.groupBy`.
-* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
-  input data contains all the rows and columns for each group.
-* Combine the results into a new `DataFrame`.
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).
 
-To use `groupBy().apply()`, the user needs to define the following:
-* A Python function that defines the computation for each group.
-* A `StructType` object or a string that defines the schema of the output `DataFrame`.
+### Iterator of Multiple Series to Iterator of Series
 
-The column labels of the returned `pandas.DataFrame` must either match the field names in the
-defined output schema if specified as strings, or match the field data types by position if not
-strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
-on how to label columns when constructing a `pandas.DataFrame`.
+The type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` -> `Iterator[pandas.Series]`.
 
-Note that all data for a group will be loaded into memory before the function is applied. This can
-lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
-[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
-to ensure that the grouped data will fit into the available memory.
+By using `pandas_udf` with a function that has such type hints, you create a Pandas UDF where the
+given function takes an iterator of a tuple of multiple `pandas.Series` and outputs an iterator of
+`pandas.Series`. In this case, the created Pandas UDF requires as many input columns as there are
+series in the tuple when it is called. Otherwise, it works identically to the Iterator of Series to
+Iterator of Series case.
 
-The following example shows how to use `groupby().apply()` to subtract the mean from each value in the group.
+The following example shows how to create this Pandas UDF:
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
+{% include_example iter_sers_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
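+
+A minimal sketch of this variant (again assuming `spark`) could look like:
+
+{% highlight python %}
+from typing import Iterator, Tuple
+
+import pandas as pd
+from pyspark.sql.functions import pandas_udf
+
+@pandas_udf("long")
+def multiply_two_cols(
+        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
+    # Each element of the iterator is a tuple holding one batch per input column.
+    for a, b in iterator:
+        yield a * b
+
+df = spark.createDataFrame([(1, 2), (3, 4)], ("a", "b"))
+df.select(multiply_two_cols("a", "b")).show()
+{% endhighlight %}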
 
-For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
-[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf).
+
+### Series to Scalar
+
+The type hint can be expressed as `pandas.Series`, ... -> `Any`.
 
-### Grouped Aggregate
+By using `pandas_udf` with a function that has such type hints, you create a Pandas UDF similar to
+PySpark's aggregate functions. The given function takes one or more `pandas.Series` and returns a
+scalar value. The return type should be a primitive data type, and the returned scalar can be either
+a Python primitive type, e.g., `int` or `float`, or a NumPy data type, e.g., `numpy.int64` or
+`numpy.float64`. `Any` should ideally be replaced with the corresponding specific scalar type.
 
-Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. Grouped aggregate Pandas UDFs are used with `groupBy().agg()` and
-[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). It defines an aggregation from one or more `pandas.Series`
-to a scalar value, where each `pandas.Series` represents a column within the group or window.
+This UDF can also be used with `groupBy().agg()` and [`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window).
+It defines an aggregation from one or more `pandas.Series` to a scalar value, where each `pandas.Series`
+represents a column within the group or window.
 
-Note that this type of UDF does not support partial aggregation and all data for a group or window will be loaded into memory. Also,
-only unbounded window is supported with Grouped aggregate Pandas UDFs currently.
+Note that this type of UDF does not support partial aggregation, and all data for a group or window
+will be loaded into memory. Also, only unbounded windows are currently supported with this type of
+UDF.
 
-The following example shows how to use this type of UDF to compute mean with groupBy and window operations:
+The following example shows how to use this type of UDF to compute the mean with group-by and window operations:
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_agg_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_scalar_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
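+
+A condensed sketch covering both the group-by and the window case could look like:
+
+{% highlight python %}
+import pandas as pd
+from pyspark.sql import Window
+from pyspark.sql.functions import pandas_udf
+
+@pandas_udf("double")
+def mean_udf(v: pd.Series) -> float:
+    return v.mean()
+
+df = spark.createDataFrame(
+    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+df.groupby("id").agg(mean_udf("v")).show()
+
+# Only an unbounded window is supported with this type of UDF.
+w = Window.partitionBy("id") \
+    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
+df.withColumn("mean_v", mean_udf("v").over(w)).show()
+{% endhighlight %}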
 
 For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 
 
-### Map Iterator
+## Pandas Function APIs
 
-Map iterator Pandas UDFs are used to transform data with an iterator of batches. Map iterator
-Pandas UDFs can be used with
-[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
-It defines a map function that transforms an iterator of `pandas.DataFrame` to another.
+Pandas function APIs can directly apply a Python native function to a whole PySpark `DataFrame` by
+using Pandas instances. Internally they work similarly to Pandas UDFs: Spark uses Arrow to transfer
+data and Pandas to work with the data, which allows vectorized operations. A Pandas function API
+generally behaves like a regular PySpark `DataFrame` API.
 
-It can return the output of arbitrary length in contrast to the scalar Pandas UDF. It maps an iterator of `pandas.DataFrame`s,
-that represents the current `DataFrame`, using the map iterator UDF and returns the result as a `DataFrame`.
+From Spark 3.0, the grouped map Pandas UDF is categorized as a separate Pandas Function API,
+`DataFrame.groupby().applyInPandas()`. It is still possible to use it with `PandasUDFType`
+and `DataFrame.groupby().apply()` as before; however, it is preferred to use
+`DataFrame.groupby().applyInPandas()` directly. Using `PandasUDFType` will be deprecated
+in the future.
 
-The following example shows how to create map iterator Pandas UDFs:
+### Grouped Map
+
+Grouped map operations with Pandas instances are supported by `DataFrame.groupby().applyInPandas()`,
+which requires a Python function that takes a `pandas.DataFrame` and returns another `pandas.DataFrame`.
+It maps each group to a `pandas.DataFrame` in the Python function.
+
+This API implements the "split-apply-combine" pattern, which consists of three steps:
+* Split the data into groups by using `DataFrame.groupBy`.
+* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
+  input data contains all the rows and columns for each group.
+* Combine the results into a new PySpark `DataFrame`.
+
+To use `groupBy().applyInPandas()`, the user needs to define the following:
+* A Python function that defines the computation for each group.
+* A `StructType` object or a string that defines the schema of the output PySpark `DataFrame`.
+
+The column labels of the returned `pandas.DataFrame` must either match the field names in the
+defined output schema if specified as strings, or match the field data types by position if not
+strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
+on how to label columns when constructing a `pandas.DataFrame`.
+
+Note that all data for a group will be loaded into memory before the function is applied. This can
+lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
+[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
+to ensure that the grouped data will fit into the available memory.
+
+The following example shows how to use `groupby().applyInPandas()` to subtract the mean from each value
+in the group.
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example map_iter_pandas_udf python/sql/arrow.py %}
+{% include_example grouped_apply_in_pandas python/sql/arrow.py %}
 </div>
 </div>
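+
+A condensed sketch of this pattern (again assuming `spark`) could look like:
+
+{% highlight python %}
+import pandas as pd
+
+df = spark.createDataFrame(
+    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
+
+def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
+    # pdf is a pandas.DataFrame holding all rows and columns of one group.
+    v = pdf.v
+    return pdf.assign(v=v - v.mean())
+
+df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
+{% endhighlight %}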
 
-For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
-[`pyspark.sql.DataFrame.mapsInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
+For detailed usage, please see [`pyspark.sql.GroupedData.applyInPandas`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.applyInPandas).
+
+### Map
 
+Map operations with Pandas instances are supported by `DataFrame.mapInPandas()`, which maps an
+iterator of `pandas.DataFrame`s representing the current PySpark `DataFrame` to another iterator of
+`pandas.DataFrame`s, and returns the result as a new PySpark `DataFrame`. The function takes and
+outputs an iterator of `pandas.DataFrame`. It can return output of arbitrary length, in contrast to
+some Pandas UDFs, although internally it works similarly to the Series to Series Pandas UDF.
+
+The following example shows how to use `mapInPandas()`:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example map_in_pandas python/sql/arrow.py %}
+</div>
+</div>
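+
+A minimal sketch of `mapInPandas()` could look like:
+
+{% highlight python %}
+from typing import Iterator
+
+import pandas as pd
+
+df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
+
+def filter_func(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
+    # Each pdf is one batch of the DataFrame; output batches may have any length.
+    for pdf in iterator:
+        yield pdf[pdf.id == 1]
+
+df.mapInPandas(filter_func, schema=df.schema).show()
+{% endhighlight %}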
 
-### Cogrouped Map
+For detailed usage, please see [`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
 
-Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key and then a python function applied to
-each cogroup.  They are used with `groupBy().cogroup().apply()` which consists of the following steps:
+### Co-grouped Map
 
+Co-grouped map operations with Pandas instances are supported by `DataFrame.cogroup().applyInPandas()` which
 
 Review comment:
   Is this `groupBy().cogroup().applyInPandas()`?
