cloud-fan commented on a change in pull request #27466: [SPARK-30722][PYTHON][DOCS] Update documentation for Pandas UDF with Python type hints
URL: https://github.com/apache/spark/pull/27466#discussion_r376341161
##########
File path: docs/sql-pyspark-pandas-with-arrow.md
##########
@@ -65,132 +65,215 @@ Spark will fall back to create the DataFrame without Arrow.
 ## Pandas UDFs (a.k.a. Vectorized UDFs)
-Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
-Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
-or to wrap the function, no additional configuration is required. Currently, there are two types of
-Pandas UDF: Scalar and Grouped Map.
+Pandas UDFs are user defined functions that are executed by Spark using
+Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas
+UDF is defined using the `pandas_udf` as a decorator or to wrap the function, and no additional
+configuration is required. A Pandas UDF behaves as a regular PySpark function API in general.
-### Scalar
+Before Spark 3.0, Pandas UDFs used to be defined with `PandasUDFType`. From Spark 3.0
+with Python 3.6+, you can also use [Python type hints](https://www.python.org/dev/peps/pep-0484).
+Using Python type hints are preferred and using `PandasUDFType` will be deprecated in
+the future release.
-Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such
-as `select` and `withColumn`. The Python function should take `pandas.Series` as inputs and return
-a `pandas.Series` of the same length. Internally, Spark will execute a Pandas UDF by splitting
-columns into batches and calling the function for each batch as a subset of the data, then
-concatenating the results together.
+Note that the type hint should use `pandas.Series` in all cases but there is one variant
+that `pandas.DataFrame` should be used for its input or output type hint instead when the input
+or output column is of `StructType`. The following example shows a Pandas UDF which takes long
+column, string column and struct column, and outputs a struct column. It requires the function to
+specify the type hints of `pandas.Series` and `pandas.DataFrame` as below:
-The following example shows how to create a scalar Pandas UDF that computes the product of 2 columns.
+<p>
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example ser_to_frame_pandas_udf python/sql/arrow.py %}
+</div>
+</div>
+</p>
+
+In the following sections, it describes the cominations of the supported type hints. For simplicity,
+`pandas.DataFrame` variant is omitted.
+
+### Series to Series
+
+The type hint can be expressed as `pandas.Series`, ... -> `pandas.Series`.
+
+By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF where the given
+function takes one or more `pandas.Series` and outputs one `pandas.Series`. The output of the function should
+always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset of the data, then concatenating
+the results together.
+
+The following example shows how to create this Pandas UDF that computes the product of 2 columns.
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
-### Scalar Iterator
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Iterator of Series to Iterator of Series
+
+The type hint can be expressed as `Iterator[pandas.Series]` -> `Iterator[pandas.Series]`.
+
+By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF where the given
+function takes an iterator of `pandas.Series` and outputs an iterator of `pandas.Series`. The output of each
+series from the function should always be of the same length as the input. In this case, the created
+Pandas UDF requires one input column when the Pandas UDF is called.
+
+It is useful when the UDF execution requires initializing some states although internally it works
+identically as Series to Series case. The pseudocode below illustrates the example.
+
+{% highlight python %}
+@pandas_udf("long")
+def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+    # Do some expensive initialization with a state
+    state = very_expensive_initialization()
+    for x in iterator:
+        # Use that state for whole iterator.
+        yield calculate_with_state(x, state)
-Scalar iterator (`SCALAR_ITER`) Pandas UDF is the same as scalar Pandas UDF above except that the
-underlying Python function takes an iterator of batches as input instead of a single batch and,
-instead of returning a single output batch, it yields output batches or returns an iterator of
-output batches.
-It is useful when the UDF execution requires initializing some states, e.g., loading an machine
-learning model file to apply inference to every input batch.
+df.select(calculate("value")).show()
+{% endhighlight %}
-The following example shows how to create scalar iterator Pandas UDFs:
+The following example shows how to create this Pandas UDF:
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_iter_pandas_udf python/sql/arrow.py %}
+{% include_example iter_ser_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
-### Grouped Map
-Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
-Split-apply-combine consists of three steps:
-* Split the data into groups by using `DataFrame.groupBy`.
-* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
-  input data contains all the rows and columns for each group.
-* Combine the results into a new `DataFrame`.
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
-To use `groupBy().apply()`, the user needs to define the following:
-* A Python function that defines the computation for each group.
-* A `StructType` object or a string that defines the schema of the output `DataFrame`.
+### Iterator of Multiple Series to Iterator of Series
-The column labels of the returned `pandas.DataFrame` must either match the field names in the
-defined output schema if specified as strings, or match the field data types by position if not
-strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
-on how to label columns when constructing a `pandas.DataFrame`.
+The type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` -> `Iterator[pandas.Series]`.
-Note that all data for a group will be loaded into memory before the function is applied. This can
-lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
-[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
-to ensure that the grouped data will fit into the available memory.
+By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF where the
+given function takes an iterator of a tuple of multiple `pandas.Series` and outputs an iterator of `pandas.Series`.
+In this case, the created pandas UDF requires multiple input columns as many as the series in the tuple
+when the Pandas UDF is called. It works identically as Iterator of Series to Iterator of Series case except the parameter difference.
-The following example shows how to use `groupby().apply()` to subtract the mean from each value in the group.
+The following example shows how to create this Pandas UDF:
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
+{% include_example iter_sers_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
-For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
-[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Series to Scalar
-### Grouped Aggregate
+The type hint can be expressed as `pandas.Series`, ... -> `Any`.
-Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. Grouped aggregate Pandas UDFs are used with `groupBy().agg()` and
-[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). It defines an aggregation from one or more `pandas.Series`
-to a scalar value, where each `pandas.Series` represents a column within the group or window.
+By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF similar
+to PySpark's aggregate functions. The given function takes `pandas.Series` and returns a scalar value.
+The return type should be a primitive data type, and the returned scalar can be either a python
+primitive type, e.g., `int` or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
+`Any` should ideally be a specific scalar type accordingly.
-Note that this type of UDF does not support partial aggregation and all data for a group or window will be loaded into memory. Also,
-only unbounded window is supported with Grouped aggregate Pandas UDFs currently.
+This UDF can be also used with `groupBy().agg()` and [`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window).
+It defines an aggregation from one or more `pandas.Series` to a scalar value, where each `pandas.Series`
+represents a column within the group or window.
-The following example shows how to use this type of UDF to compute mean with groupBy and window operations:
+Note that this type of UDF does not support partial aggregation and all data for a group or window
+will be loaded into memory. Also, only unbounded window is supported with Grouped aggregate Pandas
+UDFs currently. The following example shows how to use this type of UDF to compute mean with a group-by
+and window operations:
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_agg_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_scalar_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
-### Map Iterator
+## Pandas Function APIs
+
+Pandas function APIs can directly apply a Python native function against the whole the DataFrame by

Review comment:
`the whole DataFrame`
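
For reference, the batch semantics that the "Series to Series" and "Iterator of Series to Iterator of Series" sections of the diff describe can be sketched in plain Python. This is only an illustration under stated assumptions: plain lists stand in for `pandas.Series`, and `split_into_batches`, `apply_series_to_series`, `apply_iterator_to_iterator`, and `plus_offset` are hypothetical names invented here, not PySpark APIs and not how PySpark is implemented internally.

```python
from typing import Callable, Iterator, List

# Assumption for this sketch: a plain list stands in for one pandas.Series batch.
Batch = List[int]


def split_into_batches(column: List[int], batch_size: int) -> Iterator[Batch]:
    """Yield consecutive slices of the column (hypothetical helper)."""
    for start in range(0, len(column), batch_size):
        yield column[start:start + batch_size]


def apply_series_to_series(func: Callable[[Batch], Batch],
                           column: List[int], batch_size: int) -> List[int]:
    """Call func once per batch, then concatenate the per-batch results."""
    result: List[int] = []
    for batch in split_into_batches(column, batch_size):
        out = func(batch)
        # Mirrors the documented rule: output length must equal input length.
        assert len(out) == len(batch)
        result.extend(out)
    return result


def apply_iterator_to_iterator(func: Callable[[Iterator[Batch]], Iterator[Batch]],
                               column: List[int], batch_size: int) -> List[int]:
    """Call func once with an iterator over all batches."""
    result: List[int] = []
    for out in func(split_into_batches(column, batch_size)):
        result.extend(out)
    return result


init_calls = 0


def plus_offset(batches: Iterator[Batch]) -> Iterator[Batch]:
    """Iterator-style function: set up state once, reuse it for every batch."""
    global init_calls
    init_calls += 1  # stands in for an expensive one-time initialization
    offset = 10
    for batch in batches:
        yield [x + offset for x in batch]


doubled = apply_series_to_series(lambda b: [2 * x for x in b], [1, 2, 3, 4, 5], 2)
shifted = apply_iterator_to_iterator(plus_offset, [1, 2, 3, 4, 5], 2)
print(doubled)     # [2, 4, 6, 8, 10]
print(shifted)     # [11, 12, 13, 14, 15]
print(init_calls)  # 1
```

The iterator variant runs its setup once for all three batches, which is the benefit the diff attributes to the `Iterator[pandas.Series]` type hint. The per-batch variant would have to repeat any setup on every call.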
