cloud-fan commented on a change in pull request #27466: [SPARK-30722][PYTHON][DOCS] Update documentation for Pandas UDF with Python type hints
URL: https://github.com/apache/spark/pull/27466#discussion_r377025782
 
 

 ##########
 File path: python/pyspark/sql/pandas/functions.py
 ##########
 @@ -43,303 +43,228 @@ class PandasUDFType(object):
 @since(2.3)
 def pandas_udf(f=None, returnType=None, functionType=None):
     """
-    Creates a vectorized user defined function (UDF).
+    Creates a pandas user defined function (a.k.a. vectorized user defined function).
+
+    Pandas UDFs are user defined functions that are executed by Spark using Arrow to transfer
+    data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF
+    is defined using the `pandas_udf` as a decorator or to wrap the function, and no
+    additional configuration is required. A Pandas UDF behaves as a regular PySpark function
+    API in general.
 
     :param f: user-defined function. A python function if used as a standalone function
     :param returnType: the return type of the user-defined function. The value can be either a
         :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
     :param functionType: an enum value in :class:`pyspark.sql.functions.PandasUDFType`.
-                         Default: SCALAR.
-
-    .. seealso:: :meth:`pyspark.sql.DataFrame.mapInPandas`
-    .. seealso:: :meth:`pyspark.sql.GroupedData.applyInPandas`
-    .. seealso:: :meth:`pyspark.sql.PandasCogroupedOps.applyInPandas`
-
-    The function type of the UDF can be one of the following:
-
-    1. SCALAR
-
-       A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`.
-       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
-       If the return type is :class:`StructType`, the returned value should be a `pandas.DataFrame`.
-
-       :class:`MapType`, nested :class:`StructType` are currently not supported as output types.
-
-       Scalar UDFs can be used with :meth:`pyspark.sql.DataFrame.withColumn` and
-       :meth:`pyspark.sql.DataFrame.select`.
-
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> from pyspark.sql.types import IntegerType, StringType
-       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())  # doctest: +SKIP
-       >>> @pandas_udf(StringType())  # doctest: +SKIP
-       ... def to_upper(s):
-       ...     return s.str.upper()
-       ...
-       >>> @pandas_udf("integer", PandasUDFType.SCALAR)  # doctest: +SKIP
-       ... def add_one(x):
-       ...     return x + 1
-       ...
-       >>> df = spark.createDataFrame([(1, "John Doe", 21)],
-       ...                            ("id", "name", "age"))  # doctest: +SKIP
-       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
-       ...     .show()  # doctest: +SKIP
-       +----------+--------------+------------+
-       |slen(name)|to_upper(name)|add_one(age)|
-       +----------+--------------+------------+
-       |         8|      JOHN DOE|          22|
-       +----------+--------------+------------+
-       >>> @pandas_udf("first string, last string")  # doctest: +SKIP
-       ... def split_expand(n):
-       ...     return n.str.split(expand=True)
-       >>> df.select(split_expand("name")).show()  # doctest: +SKIP
-       +------------------+
-       |split_expand(name)|
-       +------------------+
-       |       [John, Doe]|
-       +------------------+
-
-       .. note:: The length of `pandas.Series` within a scalar UDF is not that of the whole input
-           column, but is the length of an internal batch used for each call to the function.
-           Therefore, this can be used, for example, to ensure the length of each returned
-           `pandas.Series`, and can not be used as the column length.
-
-    2. SCALAR_ITER
-
-       A scalar iterator UDF is semantically the same as the scalar Pandas UDF above except that the
-       wrapped Python function takes an iterator of batches as input instead of a single batch and,
-       instead of returning a single output batch, it yields output batches or explicitly returns an
-       generator or an iterator of output batches.
-       It is useful when the UDF execution requires initializing some state, e.g., loading a machine
-       learning model file to apply inference to every input batch.
-
-       .. note:: It is not guaranteed that one invocation of a scalar iterator UDF will process all
-           batches from one partition, although it is currently implemented this way.
-           Your code shall not rely on this behavior because it might change in the future for
-           further optimization, e.g., one invocation processes multiple partitions.
-
-       Scalar iterator UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
-       :meth:`pyspark.sql.DataFrame.select`.
-
-       >>> import pandas as pd  # doctest: +SKIP
-       >>> from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
-       >>> pdf = pd.DataFrame([1, 2, 3], columns=["x"])  # doctest: +SKIP
-       >>> df = spark.createDataFrame(pdf)  # doctest: +SKIP
-
-       When the UDF is called with a single column that is not `StructType`, the input to the
-       underlying function is an iterator of `pd.Series`.
-
-       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
-       ... def plus_one(batch_iter):
-       ...     for x in batch_iter:
-       ...         yield x + 1
-       ...
-       >>> df.select(plus_one(col("x"))).show()  # doctest: +SKIP
-       +-----------+
-       |plus_one(x)|
-       +-----------+
-       |          2|
-       |          3|
-       |          4|
-       +-----------+
-
-       When the UDF is called with more than one columns, the input to the underlying function is an
-       iterator of `pd.Series` tuple.
-
-       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
-       ... def multiply_two_cols(batch_iter):
-       ...     for a, b in batch_iter:
-       ...         yield a * b
-       ...
-       >>> df.select(multiply_two_cols(col("x"), col("x"))).show()  # doctest: +SKIP
-       +-----------------------+
-       |multiply_two_cols(x, x)|
-       +-----------------------+
-       |                      1|
-       |                      4|
-       |                      9|
-       +-----------------------+
-
-       When the UDF is called with a single column that is `StructType`, the input to the underlying
-       function is an iterator of `pd.DataFrame`.
-
-       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
-       ... def multiply_two_nested_cols(pdf_iter):
-       ...    for pdf in pdf_iter:
-       ...        yield pdf["a"] * pdf["b"]
-       ...
-       >>> df.select(
-       ...     multiply_two_nested_cols(
-       ...         struct(col("x").alias("a"), col("x").alias("b"))
-       ...     ).alias("y")
-       ... ).show()  # doctest: +SKIP
-       +---+
-       |  y|
-       +---+
-       |  1|
-       |  4|
-       |  9|
-       +---+
-
-       In the UDF, you can initialize some states before processing batches, wrap your code with
-       `try ... finally ...` or use context managers to ensure the release of resources at the end
-       or in case of early termination.
-
-       >>> y_bc = spark.sparkContext.broadcast(1)  # doctest: +SKIP
-       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
-       ... def plus_y(batch_iter):
-       ...     y = y_bc.value  # initialize some state
-       ...     try:
-       ...         for x in batch_iter:
-       ...             yield x + y
-       ...     finally:
-       ...         pass  # release resources here, if any
-       ...
-       >>> df.select(plus_y(col("x"))).show()  # doctest: +SKIP
-       +---------+
-       |plus_y(x)|
-       +---------+
-       |        2|
-       |        3|
-       |        4|
-       +---------+
-
-    3. GROUPED_MAP
-
-       A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
-       The returnType should be a :class:`StructType` describing the schema of the returned
-       `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
-       the field names in the defined returnType schema if specified as strings, or match the
-       field data types by position if not strings, e.g. integer indices.
-       The length of the returned `pandas.DataFrame` can be arbitrary.
-
-       Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
-
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> df = spark.createDataFrame(
-       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-       ...     ("id", "v"))  # doctest: +SKIP
-       >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
-       ... def normalize(pdf):
-       ...     v = pdf.v
-       ...     return pdf.assign(v=(v - v.mean()) / v.std())
-       >>> df.groupby("id").apply(normalize).show()  # doctest: +SKIP
-       +---+-------------------+
-       | id|                  v|
-       +---+-------------------+
-       |  1|-0.7071067811865475|
-       |  1| 0.7071067811865475|
-       |  2|-0.8320502943378437|
-       |  2|-0.2773500981126146|
-       |  2| 1.1094003924504583|
-       +---+-------------------+
-
-       Alternatively, the user can define a function that takes two arguments.
-       In this case, the grouping key(s) will be passed as the first argument and the data will
-       be passed as the second argument. The grouping key(s) will be passed as a tuple of numpy
-       data types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in
-       as a `pandas.DataFrame` containing all columns from the original Spark DataFrame.
-       This is useful when the user does not want to hardcode grouping key(s) in the function.
-
-       >>> import pandas as pd  # doctest: +SKIP
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> df = spark.createDataFrame(
-       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-       ...     ("id", "v"))  # doctest: +SKIP
-       >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
-       ... def mean_udf(key, pdf):
-       ...     # key is a tuple of one numpy.int64, which is the value
-       ...     # of 'id' for the current group
-       ...     return pd.DataFrame([key + (pdf.v.mean(),)])
-       >>> df.groupby('id').apply(mean_udf).show()  # doctest: +SKIP
-       +---+---+
-       | id|  v|
-       +---+---+
-       |  1|1.5|
-       |  2|6.0|
-       +---+---+
-       >>> @pandas_udf(
-       ...    "id long, `ceil(v / 2)` long, v double",
-       ...    PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
-       >>> def sum_udf(key, pdf):
-       ...     # key is a tuple of two numpy.int64s, which is the values
-       ...     # of 'id' and 'ceil(df.v / 2)' for the current group
-       ...     return pd.DataFrame([key + (pdf.v.sum(),)])
-       >>> df.groupby(df.id, ceil(df.v / 2)).apply(sum_udf).show()  # doctest: +SKIP
-       +---+-----------+----+
-       | id|ceil(v / 2)|   v|
-       +---+-----------+----+
-       |  2|          5|10.0|
-       |  1|          1| 3.0|
-       |  2|          3| 5.0|
-       |  2|          2| 3.0|
-       +---+-----------+----+
-
-       .. note:: If returning a new `pandas.DataFrame` constructed with a dictionary, it is
-           recommended to explicitly index the columns by name to ensure the positions are correct,
-           or alternatively use an `OrderedDict`.
-           For example, `pd.DataFrame({'id': ids, 'a': data}, columns=['id', 'a'])` or
-           `pd.DataFrame(OrderedDict([('id', ids), ('a', data)]))`.
-
-       .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
-
-    4. GROUPED_AGG
-
-       A grouped aggregate UDF defines a transformation: One or more `pandas.Series` -> A scalar
-       The `returnType` should be a primitive data type, e.g., :class:`DoubleType`.
-       The returned scalar can be either a python primitive type, e.g., `int` or `float`
-       or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.
-
-       :class:`MapType` and :class:`StructType` are currently not supported as output types.
-
-       Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg` and
-       :class:`pyspark.sql.Window`
-
-       This example shows using grouped aggregated UDFs with groupby:
-
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> df = spark.createDataFrame(
-       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-       ...     ("id", "v"))
-       >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
-       ... def mean_udf(v):
-       ...     return v.mean()
-       >>> df.groupby("id").agg(mean_udf(df['v'])).show()  # doctest: +SKIP
-       +---+-----------+
-       | id|mean_udf(v)|
-       +---+-----------+
-       |  1|        1.5|
-       |  2|        6.0|
-       +---+-----------+
-
-       This example shows using grouped aggregated UDFs as window functions.
-
-       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
-       >>> from pyspark.sql import Window
-       >>> df = spark.createDataFrame(
-       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
-       ...     ("id", "v"))
-       >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)  # doctest: +SKIP
-       ... def mean_udf(v):
-       ...     return v.mean()
-       >>> w = (Window.partitionBy('id')
-       ...            .orderBy('v')
-       ...            .rowsBetween(-1, 0))
-       >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()  # doctest: +SKIP
-       +---+----+------+
-       | id|   v|mean_v|
-       +---+----+------+
-       |  1| 1.0|   1.0|
-       |  1| 2.0|   1.5|
-       |  2| 3.0|   3.0|
-       |  2| 5.0|   4.0|
-       |  2|10.0|   7.5|
-       +---+----+------+
-
-       .. note:: For performance reasons, the input series to window functions are not copied.
+        Default: SCALAR.
+
+        .. note:: This parameter exists for compatibility. Using Python type hints is encouraged.
+
+    In order to use this API, customarily the below are imported:
+
+    >>> import pandas as pd
+    >>> from pyspark.sql.functions import pandas_udf
+
+    Prior to Spark 3.0, the pandas UDF used `functionType` to decide the execution type as below:
 
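For orientation, here is a minimal sketch (not taken from the patch, and assuming the type-hint API as proposed for Spark 3.0) of the same scalar UDF written with the legacy `functionType` argument and with Python type hints:

    import pandas as pd
    from pyspark.sql.functions import pandas_udf, PandasUDFType

    # Legacy style: the execution type is chosen explicitly via functionType.
    @pandas_udf("long", PandasUDFType.SCALAR)
    def plus_one_legacy(s):
        return s + 1

    # Spark 3.0 style: the execution type is inferred from the Python type hints,
    # so functionType no longer needs to be passed.
    @pandas_udf("long")
    def plus_one(s: pd.Series) -> pd.Series:
        return s + 1

    # Either UDF is then used like a regular PySpark column function, e.g.
    # spark.range(3).select(plus_one("id")).show()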
 Review comment:
   shall we introduce the new API first and the legacy API later, to promote the new API?
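
To make the comparison concrete, a minimal sketch of what the "new API" covers for the other UDF flavours, assuming the type-hint mapping proposed for Spark 3.0 (an iterator of Series replaces SCALAR_ITER, Series-to-scalar replaces GROUPED_AGG); the function names are only illustrative:

    from typing import Iterator

    import pandas as pd
    from pyspark.sql.functions import pandas_udf

    # Iterator[pd.Series] -> Iterator[pd.Series]: the type-hint equivalent of SCALAR_ITER.
    @pandas_udf("long")
    def plus_one_iter(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
        for s in batch_iter:
            yield s + 1

    # pd.Series -> scalar: the type-hint equivalent of GROUPED_AGG,
    # used with GroupedData.agg() or over a Window.
    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    # Usage sketch: df.groupby("id").agg(mean_udf(df["v"])).show()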
