This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 8299600  [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF
8299600 is described below

commit 8299600575024ca81127b7bf8ef48ae11fdd0594
Author: Xiangrui Meng <m...@databricks.com>
AuthorDate: Fri Jun 28 15:09:57 2019 -0700

    [SPARK-28056][.2][PYTHON][SQL] add docstring/doctest for SCALAR_ITER Pandas UDF

    ## What changes were proposed in this pull request?

    Add docstring/doctest for `SCALAR_ITER` Pandas UDF. I explicitly mentioned that per-partition execution is an implementation detail, not guaranteed. I will submit another PR to add the same to user guide, just to keep this PR minimal.

    I didn't add "doctest: +SKIP" in the first commit so it is easy to test locally.

    cc: HyukjinKwon gatorsmile icexelloss BryanCutler WeichenXu123

    ![Screen Shot 2019-06-28 at 9 52 41 AM](https://user-images.githubusercontent.com/829644/60358349-b0aa5400-998a-11e9-9ebf-8481dfd555b5.png)
    ![Screen Shot 2019-06-28 at 9 53 19 AM](https://user-images.githubusercontent.com/829644/60358355-b1db8100-998a-11e9-8f6f-00a11bdbdc4d.png)

    ## How was this patch tested?

    doctest

    Closes #25005 from mengxr/SPARK-28056.2.

    Authored-by: Xiangrui Meng <m...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 python/pyspark/sql/functions.py | 104 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 102 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 34f6593..5d1e69e 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2951,7 +2951,107 @@ def pandas_udf(f=None, returnType=None, functionType=None):
        Therefore, this can be used, for example, to ensure the length of each returned
        `pandas.Series`, and can not be used as the column length.
 
-    2. GROUPED_MAP
+    2. SCALAR_ITER
+
+       A scalar iterator UDF is semantically the same as the scalar Pandas UDF above except that the
+       wrapped Python function takes an iterator of batches as input instead of a single batch and,
+       instead of returning a single output batch, it yields output batches or explicitly returns an
+       generator or an iterator of output batches.
+       It is useful when the UDF execution requires initializing some state, e.g., loading a machine
+       learning model file to apply inference to every input batch.
+
+       .. note:: It is not guaranteed that one invocation of a scalar iterator UDF will process all
+           batches from one partition, although it is currently implemented this way.
+           Your code shall not rely on this behavior because it might change in the future for
+           further optimization, e.g., one invocation processes multiple partitions.
+
+       Scalar iterator UDFs are used with :meth:`pyspark.sql.DataFrame.withColumn` and
+       :meth:`pyspark.sql.DataFrame.select`.
+
+       >>> import pandas as pd  # doctest: +SKIP
+       >>> from pyspark.sql.functions import col, pandas_udf, struct, PandasUDFType
+       >>> pdf = pd.DataFrame([1, 2, 3], columns=["x"])  # doctest: +SKIP
+       >>> df = spark.createDataFrame(pdf)  # doctest: +SKIP
+
+       When the UDF is called with a single column that is not `StructType`, the input to the
+       underlying function is an iterator of `pd.Series`.
+
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def plus_one(batch_iter):
+       ...     for x in batch_iter:
+       ...         yield x + 1
+       ...
+       >>> df.select(plus_one(col("x"))).show()  # doctest: +SKIP
+       +-----------+
+       |plus_one(x)|
+       +-----------+
+       |          2|
+       |          3|
+       |          4|
+       +-----------+
+
+       When the UDF is called with more than one columns, the input to the underlying function is an
+       iterator of `pd.Series` tuple.
+
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def multiply_two_cols(batch_iter):
+       ...     for a, b in batch_iter:
+       ...         yield a * b
+       ...
+       >>> df.select(multiply_two_cols(col("x"), col("x"))).show()  # doctest: +SKIP
+       +-----------------------+
+       |multiply_two_cols(x, x)|
+       +-----------------------+
+       |                      1|
+       |                      4|
+       |                      9|
+       +-----------------------+
+
+       When the UDF is called with a single column that is `StructType`, the input to the underlying
+       function is an iterator of `pd.DataFrame`.
+
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def multiply_two_nested_cols(pdf_iter):
+       ...     for pdf in pdf_iter:
+       ...         yield pdf["a"] * pdf["b"]
+       ...
+       >>> df.select(
+       ...     multiply_two_nested_cols(
+       ...         struct(col("x").alias("a"), col("x").alias("b"))
+       ...     ).alias("y")
+       ... ).show()  # doctest: +SKIP
+       +---+
+       |  y|
+       +---+
+       |  1|
+       |  4|
+       |  9|
+       +---+
+
+       In the UDF, you can initialize some states before processing batches, wrap your code with
+       `try ... finally ...` or use context managers to ensure the release of resources at the end
+       or in case of early termination.
+
+       >>> y_bc = spark.sparkContext.broadcast(1)  # doctest: +SKIP
+       >>> @pandas_udf("long", PandasUDFType.SCALAR_ITER)  # doctest: +SKIP
+       ... def plus_y(batch_iter):
+       ...     y = y_bc.value  # initialize some state
+       ...     try:
+       ...         for x in batch_iter:
+       ...             yield x + y
+       ...     finally:
+       ...         pass  # release resources here, if any
+       ...
+       >>> df.select(plus_y(col("x"))).show()  # doctest: +SKIP
+       +---------+
+       |plus_y(x)|
+       +---------+
+       |        2|
+       |        3|
+       |        4|
+       +---------+
+
+    3. GROUPED_MAP
 
        A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
        The returnType should be a :class:`StructType` describing the schema of the returned
@@ -3030,7 +3130,7 @@ def pandas_udf(f=None, returnType=None, functionType=None):
 
        .. seealso:: :meth:`pyspark.sql.GroupedData.apply`
 
-    3. GROUPED_AGG
+    4. GROUPED_AGG
 
        A grouped aggregate UDF defines a transformation: One or more `pandas.Series` -> A scalar
        The `returnType` should be a primitive data type, e.g., :class:`DoubleType`.
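
The docstring added by this commit recommends `try ... finally ...` or a context manager so that resources are released at the end or on early termination. The following is a minimal sketch of why that works, not code from the patch. It runs without Spark or pandas: plain Python lists stand in for the `pd.Series` batches, and `loaded_model`, `plus_offset`, and `events` are hypothetical names introduced only for illustration.

```python
from contextlib import contextmanager

events = []  # records lifecycle events so the behaviour is observable


@contextmanager
def loaded_model(offset):
    """Hypothetical stand-in for expensive state such as a model file."""
    events.append("load")
    try:
        # The "model" here is just a function that adds an offset to a batch.
        yield lambda batch: [v + offset for v in batch]
    finally:
        events.append("release")


def plus_offset(batch_iter):
    """Same shape as a function wrapped by a SCALAR_ITER UDF: it takes an
    iterator of batches and yields one output batch per input batch."""
    with loaded_model(1) as predict:  # initialized once, not once per batch
        for batch in batch_iter:
            yield predict(batch)


# Full consumption: state is loaded once and released after the last batch.
full = list(plus_offset(iter([[1, 2], [3]])))

# Early termination: closing the generator still triggers the release.
gen = plus_offset(iter([[1, 2], [3]]))
first = next(gen)
gen.close()
```

Closing a generator raises `GeneratorExit` at the suspended `yield`, which unwinds the `with` block and runs the cleanup. This is why the pattern covers the early-termination case as well as normal exhaustion. In a real UDF the same function body would be decorated with `@pandas_udf(..., PandasUDFType.SCALAR_ITER)` and would receive `pd.Series` batches instead of lists.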