cloud-fan commented on a change in pull request #27466: 
[SPARK-30722][PYTHON][DOCS] Update documentation for Pandas UDF with Python 
type hints
URL: https://github.com/apache/spark/pull/27466#discussion_r376341161
 
 

 ##########
 File path: docs/sql-pyspark-pandas-with-arrow.md
 ##########
 @@ -65,132 +65,215 @@ Spark will fall back to create the DataFrame without 
Arrow.
 
 ## Pandas UDFs (a.k.a. Vectorized UDFs)
 
-Pandas UDFs are user defined functions that are executed by Spark using Arrow 
to transfer data and
-Pandas to work with the data. A Pandas UDF is defined using the keyword 
`pandas_udf` as a decorator
-or to wrap the function, no additional configuration is required. Currently, 
there are two types of
-Pandas UDF: Scalar and Grouped Map.
+Pandas UDFs are user defined functions that are executed by Spark using
+Arrow to transfer data and Pandas to work with the data, which allows 
vectorized operations. A Pandas
+UDF is defined using the `pandas_udf` as a decorator or to wrap the function, 
and no additional
+configuration is required. A Pandas UDF behaves as a regular PySpark function 
API in general.
 
-### Scalar
+Before Spark 3.0, Pandas UDFs used to be defined with `PandasUDFType`. From 
Spark 3.0
+with Python 3.6+, you can also use [Python type 
hints](https://www.python.org/dev/peps/pep-0484).
+Using Python type hints are preferred and using `PandasUDFType` will be 
deprecated in
+the future release.
 
-Scalar Pandas UDFs are used for vectorizing scalar operations. They can be 
used with functions such
-as `select` and `withColumn`. The Python function should take `pandas.Series` 
as inputs and return
-a `pandas.Series` of the same length. Internally, Spark will execute a Pandas 
UDF by splitting
-columns into batches and calling the function for each batch as a subset of 
the data, then
-concatenating the results together.
+Note that the type hint should use `pandas.Series` in all cases but there is 
one variant
+that `pandas.DataFrame` should be used for its input or output type hint 
instead when the input
+or output column is of `StructType`. The following example shows a Pandas UDF 
which takes long
+column, string column and struct column, and outputs a struct column. It 
requires the function to
+specify the type hints of `pandas.Series` and `pandas.DataFrame` as below:
 
-The following example shows how to create a scalar Pandas UDF that computes 
the product of 2 columns.
+<p>
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example ser_to_frame_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+</p>
+
+In the following sections, it describes the cominations of the supported type 
hints. For simplicity,
+`pandas.DataFrame` variant is omitted.
+
+### Series to Series
+
+The type hint can be expressed as `pandas.Series`, ... -> `pandas.Series`.
+
+By using `pandas_udf` with the function having such type hints, it creates a 
Pandas UDF where the given
+function takes one or more `pandas.Series` and outputs one `pandas.Series`. 
The output of the function should
+always be of the same length as the input. Internally, PySpark will execute a 
Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset of 
the data, then concatenating
+the results together.
+
+The following example shows how to create this Pandas UDF that computes the 
product of 2 columns.
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 
-### Scalar Iterator
+For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Iterator of Series to Iterator of Series
+
+The type hint can be expressed as `Iterator[pandas.Series]` -> 
`Iterator[pandas.Series]`.
+
+By using `pandas_udf` with the function having such type hints, it creates a 
Pandas UDF where the given
+function takes an iterator of `pandas.Series` and outputs an iterator of 
`pandas.Series`. The output of each
+series from the function should always be of the same length as the input. In 
this case, the created
+Pandas UDF requires one input column when the Pandas UDF is called.
+
+It is useful when the UDF execution requires initializing some states although 
internally it works
+identically as Series to Series case. The pseudocode below illustrates the 
example.
+
+{% highlight python %}
+@pandas_udf("long")
+def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+    # Do some expensive initialization with a state
+    state = very_expensive_initialization()
+    for x in iterator:
+        # Use that state for whole iterator.
+        yield calculate_with_state(x, state)
 
-Scalar iterator (`SCALAR_ITER`) Pandas UDF is the same as scalar Pandas UDF 
above except that the
-underlying Python function takes an iterator of batches as input instead of a 
single batch and,
-instead of returning a single output batch, it yields output batches or 
returns an iterator of
-output batches.
-It is useful when the UDF execution requires initializing some states, e.g., 
loading an machine
-learning model file to apply inference to every input batch.
+df.select(calculate("value")).show()
+{% endhighlight %}
 
-The following example shows how to create scalar iterator Pandas UDFs:
+The following example shows how to create this Pandas UDF:
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_iter_pandas_udf python/sql/arrow.py %}
+{% include_example iter_ser_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 
-### Grouped Map
-Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the 
"split-apply-combine" pattern.
-Split-apply-combine consists of three steps:
-* Split the data into groups by using `DataFrame.groupBy`.
-* Apply a function on each group. The input and output of the function are 
both `pandas.DataFrame`. The
-  input data contains all the rows and columns for each group.
-* Combine the results into a new `DataFrame`.
+For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 
-To use `groupBy().apply()`, the user needs to define the following:
-* A Python function that defines the computation for each group.
-* A `StructType` object or a string that defines the schema of the output 
`DataFrame`.
+### Iterator of Multiple Series to Iterator of Series
 
-The column labels of the returned `pandas.DataFrame` must either match the 
field names in the
-defined output schema if specified as strings, or match the field data types 
by position if not
-strings, e.g. integer indices. See 
[pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
-on how to label columns when constructing a `pandas.DataFrame`.
+The type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` -> 
`Iterator[pandas.Series]`.
 
-Note that all data for a group will be loaded into memory before the function 
is applied. This can
-lead to out of memory exceptions, especially if the group sizes are skewed. 
The configuration for
-[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and 
it is up to the user
-to ensure that the grouped data will fit into the available memory.
+By using `pandas_udf` with the function having such type hints, it creates a 
Pandas UDF where the
+given function takes an iterator of a tuple of multiple `pandas.Series` and 
outputs an iterator of `pandas.Series`.
+In this case, the created pandas UDF requires multiple input columns as many 
as the series in the tuple
+when the Pandas UDF is called. It works identically as Iterator of Series to 
Iterator of Series case except the parameter difference.
 
-The following example shows how to use `groupby().apply()` to subtract the 
mean from each value in the group.
+The following example shows how to create this Pandas UDF:
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
+{% include_example iter_sers_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 
-For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 and
-[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
+For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Series to Scalar
 
-### Grouped Aggregate
+The type hint can be expressed as `pandas.Series`, ... -> `Any`.
 
-Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. 
Grouped aggregate Pandas UDFs are used with `groupBy().agg()` and
-[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). It 
defines an aggregation from one or more `pandas.Series`
-to a scalar value, where each `pandas.Series` represents a column within the 
group or window.
+By using `pandas_udf` with the function having such type hints, it creates a 
Pandas UDF similar
+to PySpark's aggregate functions. The given function takes `pandas.Series` and 
returns a scalar value.
+The return type should be a primitive data type, and the returned scalar can 
be either a python
+primitive type, e.g., `int` or `float` or a numpy data type, e.g., 
`numpy.int64` or `numpy.float64`.
+`Any` should ideally be a specific scalar type accordingly.
 
-Note that this type of UDF does not support partial aggregation and all data 
for a group or window will be loaded into memory. Also,
-only unbounded window is supported with Grouped aggregate Pandas UDFs 
currently.
+This UDF can be also used with `groupBy().agg()` and 
[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window).
+It defines an aggregation from one or more `pandas.Series` to a scalar value, 
where each `pandas.Series`
+represents a column within the group or window.
 
-The following example shows how to use this type of UDF to compute mean with 
groupBy and window operations:
+Note that this type of UDF does not support partial aggregation and all data 
for a group or window
+will be loaded into memory. Also, only unbounded window is supported with 
Grouped aggregate Pandas
+UDFs currently. The following example shows how to use this type of UDF to 
compute mean with a group-by
+and window operations:
 
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_agg_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_scalar_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 
 For detailed usage, please see 
[`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
 
 
-### Map Iterator
+## Pandas Function APIs
+
+Pandas function APIs can directly apply a Python native function against the 
whole the DataFrame by
 
 Review comment:
   `the whole DataFrame`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to