viirya commented on a change in pull request #27466: [SPARK-30722][PYTHON][DOCS] Update documentation for Pandas UDF with Python type hints URL: https://github.com/apache/spark/pull/27466#discussion_r375662886
##########
File path: docs/sql-pyspark-pandas-with-arrow.md
##########
@@ -65,132 +65,204 @@ Spark will fall back to create the DataFrame without Arrow.
 ## Pandas UDFs (a.k.a. Vectorized UDFs)
-Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer data and
-Pandas to work with the data. A Pandas UDF is defined using the keyword `pandas_udf` as a decorator
-or to wrap the function, no additional configuration is required. Currently, there are two types of
-Pandas UDF: Scalar and Grouped Map.
+Pandas UDFs are user defined functions that are executed by Spark using
+Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas
+UDF is defined using the `pandas_udf` as a decorator or to wrap the function, and no additional
+configuration is required. A Pandas UDF behaves as a regular PySpark function API in general.
-### Scalar
+Before Spark 3.0, Pandas UDFs used to be defined with `PandasUDFType`. From Spark 3.0
+with Python 3.6+, you can also use [Python type hints](https://www.python.org/dev/peps/pep-0484).
+Using Python type hints are preferred and using `PandasUDFType` will be deprecated in
+the future release.
-Scalar Pandas UDFs are used for vectorizing scalar operations. They can be used with functions such
-as `select` and `withColumn`. The Python function should take `pandas.Series` as inputs and return
-a `pandas.Series` of the same length. Internally, Spark will execute a Pandas UDF by splitting
-columns into batches and calling the function for each batch as a subset of the data, then
-concatenating the results together.
-The following example shows how to create a scalar Pandas UDF that computes the product of 2 columns.
+The below combinations of the type hints are supported for Pandas UDFs. Note that the type hint should
+be `pandas.Series` in all cases but there is one variant case that `pandas.DataFrame` should be mapped
+as its input or output type hint instead when the input or output column is of `StructType`.
+
+### Series to Series
+
+The type hint can be expressed as `pandas.Series`, ... -> `pandas.Series`.
+
+By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF where the given
+function takes one or more `pandas.Series` and outputs one `pandas.Series`. The output of the function should
+always be of the same length as the input. Internally, PySpark will execute a Pandas UDF by splitting
+columns into batches and calling the function for each batch as a subset of the data, then concatenating
+the results together.
+
+The following example shows how to create this Pandas UDF that computes the product of 2 columns.
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
-### Scalar Iterator
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Iterator of Series to Iterator of Series
+
+The type hint can be expressed as `Iterator[pandas.Series]` -> `Iterator[pandas.Series]`.
-Scalar iterator (`SCALAR_ITER`) Pandas UDF is the same as scalar Pandas UDF above except that the
-underlying Python function takes an iterator of batches as input instead of a single batch and,
-instead of returning a single output batch, it yields output batches or returns an iterator of
-output batches.
-It is useful when the UDF execution requires initializing some states, e.g., loading an machine
-learning model file to apply inference to every input batch.
+By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF where the given
+function takes an iterator of `pandas.Series` and outputs an iterator of `pandas.Series`. The output of each
+series from the function should always be of the same length as the input. In this case, the created
+Pandas UDF requires one input column when the Pandas UDF is called.
-The following example shows how to create scalar iterator Pandas UDFs:
+It is useful when the UDF execution requires initializing some states although internally it works
+identically as Series to Series case. The pseudocode below illustrates the example.
+
+{% highlight python %}
+@pandas_udf("long")
+def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
+    # Do some expensive initialization with a state
+    state = very_expensive_initialization()
+    for x in iterator:
+        # Use that state for whole iterator.
+        yield calculate_with_state(x, state)
+
+df.select(calculate("value")).show()
+{% endhighlight %}
+
+The following example shows how to create this Pandas UDF:
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example scalar_iter_pandas_udf python/sql/arrow.py %}
+{% include_example iter_ser_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
-### Grouped Map
-Grouped map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
-Split-apply-combine consists of three steps:
-* Split the data into groups by using `DataFrame.groupBy`.
-* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
-  input data contains all the rows and columns for each group.
-* Combine the results into a new `DataFrame`.
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
-To use `groupBy().apply()`, the user needs to define the following:
-* A Python function that defines the computation for each group.
-* A `StructType` object or a string that defines the schema of the output `DataFrame`.
+### Iterator of Multiple Series to Iterator of Series
-The column labels of the returned `pandas.DataFrame` must either match the field names in the
-defined output schema if specified as strings, or match the field data types by position if not
-strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
-on how to label columns when constructing a `pandas.DataFrame`.
+The type hint can be expressed as `Iterator[Tuple[pandas.Series, ...]]` -> `Iterator[pandas.Series]`.
-Note that all data for a group will be loaded into memory before the function is applied. This can
-lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
-[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
-to ensure that the grouped data will fit into the available memory.
+By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF where the
+given function takes an iterator of a tuple of multiple `pandas.Series` and outputs an iterator of `pandas.Series`.
+In this case, the created pandas UDF requires multiple input columns as many as the series in the tuple
+when the Pandas UDF is called. It works identically as Iterator of Series to Iterator of Series case except the parameter difference.
-The following example shows how to use `groupby().apply()` to subtract the mean from each value in the group.
+The following example shows how to create this Pandas UDF:
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
+{% include_example iter_sers_to_iter_ser_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
-For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
-[`pyspark.sql.GroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply).
+For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
+
+### Series to Scalar
+
+The type hint can be expressed as `pandas.Series`, ... -> `Any`.
-### Grouped Aggregate
+By using `pandas_udf` with the function having such type hints, it creates a Pandas UDF similar
+to PySpark's aggregate functions. The given function takes `pandas.Series` and returns a scalar value.
+The return type should be a primitive data type, and the returned scalar can be either a python
+primitive type, e.g., `int` or `float` or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
+`Any` should ideally be a specific scalar type accordingly.
-Grouped aggregate Pandas UDFs are similar to Spark aggregate functions. Grouped aggregate Pandas UDFs are used with `groupBy().agg()` and
-[`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window). It defines an aggregation from one or more `pandas.Series`
-to a scalar value, where each `pandas.Series` represents a column within the group or window.
+This UDF can be also used with `groupBy().agg()` and [`pyspark.sql.Window`](api/python/pyspark.sql.html#pyspark.sql.Window).
+It defines an aggregation from one or more `pandas.Series` to a scalar value, where each `pandas.Series`
+represents a column within the group or window.
-Note that this type of UDF does not support partial aggregation and all data for a group or window will be loaded into memory. Also,
-only unbounded window is supported with Grouped aggregate Pandas UDFs currently.
+Note that this type of UDF does not support partial aggregation and all data for a group or window
+will be loaded into memory. Also, only unbounded window is supported with Grouped aggregate Pandas
+UDFs currently.
-The following example shows how to use this type of UDF to compute mean with groupBy and window operations:
+The following example shows how to use this type of UDF to compute mean with a group-by and window operations:
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example grouped_agg_pandas_udf python/sql/arrow.py %}
+{% include_example ser_to_scalar_pandas_udf python/sql/arrow.py %}
 </div>
 </div>
 For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf)
-### Map Iterator
+## Pandas Function APIs
-Map iterator Pandas UDFs are used to transform data with an iterator of batches. Map iterator
-Pandas UDFs can be used with
-[`pyspark.sql.DataFrame.mapInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
-It defines a map function that transforms an iterator of `pandas.DataFrame` to another.
+Pandas function APIs can directly apply a Python native function against the whole the DataFrame by
+using Pandas instances. Internally it works similarly with Pandas UDFs by Spark using Arrow to transfer
+data and Pandas to work with the data, which allows vectorized operations. A Pandas function API behaves
+as a regular API under PySpark `DataFrame` in general.
-It can return the output of arbitrary length in contrast to the scalar Pandas UDF. It maps an iterator of `pandas.DataFrame`s,
-that represents the current `DataFrame`, using the map iterator UDF and returns the result as a `DataFrame`.
+From Spark 3.0, Grouped map pandas UDF is now categorized as a separate Pandas Function API,
+`DataFrame.groupby().applyInPandas()`. It is still possible to use it with `PandasUDFType`
+and `DataFrame.groupby().apply()` as it was; however, it is preferred to use
+`DataFrame.groupby().applyInPandas()` directly. Using `PandasUDFType` will be deprecated
+in the future.
-The following example shows how to create map iterator Pandas UDFs:
+### Grouped Map
+
+Grouped map operations with Pandas instances are supported by `DataFrame.groupby().applyInPandas()`
+which requires a Python function that takes a `pandas.DataFrame` and return another `pandas.DataFrame`.
+It maps each group to each `pandas.DataFrame` in the Python function.
+
+This API implements the "split-apply-combine" pattern which consists of three steps:
+* Split the data into groups by using `DataFrame.groupBy`.
+* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
+  input data contains all the rows and columns for each group.
+* Combine the results into a new PySpark `DataFrame`.
+
+To use `groupBy().applyInPandas()`, the user needs to define the following:
+* A Python function that defines the computation for each group.
+* A `StructType` object or a string that defines the schema of the output PySpark `DataFrame`.
+
+The column labels of the returned `pandas.DataFrame` must either match the field names in the
+defined output schema if specified as strings, or match the field data types by position if not
+strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
+on how to label columns when constructing a `pandas.DataFrame`.
+
+Note that all data for a group will be loaded into memory before the function is applied. This can
+lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for
+[maxRecordsPerBatch](#setting-arrow-batch-size) is not applied on groups and it is up to the user
+to ensure that the grouped data will fit into the available memory.
+
+The following example shows how to use `groupby().applyInPandas()` to subtract the mean from each value
+in the group.
 <div class="codetabs">
 <div data-lang="python" markdown="1">
-{% include_example map_iter_pandas_udf python/sql/arrow.py %}
+{% include_example grouped_apply_in_pandas python/sql/arrow.py %}
 </div>
 </div>
-For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
-[`pyspark.sql.DataFrame.mapsInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
+For detailed usage, please see [`pyspark.sql.GroupedData.applyInPandas`](api/python/pyspark.sql.html#pyspark.sql.GroupedData.applyInPandas).
+
+### Map
+Map operations with Pandas instances are supported by `DataFrame.mapInPandas()` which maps an iterator
+of `pandas.DataFrame`s to another iterator of `pandas.DataFrame`s that represents the current
+PySpark `DataFrame` and returns the result as a PySpark `DataFrame`. The functions takes and outputs
+an iterator of `pandas.DataFrame`. It can return the output of arbitrary length in contrast to some
+Pandas UDFs although internally it works similarly with Series to Series Pandas UDF.
+
+The following example shows how to use `mapInPandas()`:
+
+<div class="codetabs">
+<div data-lang="python" markdown="1">
+{% include_example map_in_pandas python/sql/arrow.py %}
+</div>
+</div>
-### Cogrouped Map
+For detailed usage, please see [`pyspark.sql.DataFrame.mapsInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).
-Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key and then a python function applied to
-each cogroup. They are used with `groupBy().cogroup().apply()` which consists of the following steps:
+### Co-grouped Map
+Co-grouped map operations with Pandas instances are supported by `DataFrame.cogroup().applyInPandas()` which

Review comment:
Is this `groupBy().cogroup().applyInPandas()`?
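
For reference on the question above: as far as I know, in Spark 3.0 `cogroup` is a method of `GroupedData` rather than of `DataFrame`, so each side has to be grouped first. A minimal sketch of the chain follows. The names `merge_cogroup` and `run_with_spark`, the column names, and the sample rows are made up for illustration and are not taken from the PR.

```python
import pandas as pd


def merge_cogroup(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical per-cogroup function.

    Spark calls it once per key, passing that key's rows from each side
    as two pandas DataFrames; here they are simply inner-joined on "id".
    """
    return pd.merge(left, right, on="id", how="inner")


def run_with_spark() -> None:
    # Assumes a working PySpark 3.0+ installation. The import is kept
    # local so merge_cogroup can be tried with pandas alone.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df1 = spark.createDataFrame([(1, 1.0), (2, 2.0)], ("id", "v1"))
    df2 = spark.createDataFrame([(1, "x"), (2, "y")], ("id", "v2"))

    # Group each side, cogroup the two GroupedData objects, then apply.
    result = (
        df1.groupby("id")
        .cogroup(df2.groupby("id"))
        .applyInPandas(merge_cogroup, schema="id long, v1 double, v2 string")
    )
    result.show()
    spark.stop()
```

Calling `run_with_spark()` exercises the full chain. If that reading of the API is right, the sentence under review would read `DataFrame.groupby().cogroup().applyInPandas()`.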
