allisonwang-db commented on code in PR #42272:
URL: https://github.com/apache/spark/pull/42272#discussion_r1310942775


##########
examples/src/main/python/sql/udtf.py:
##########
@@ -0,0 +1,230 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A simple example demonstrating Python UDTFs in Spark
+Run with:
+  ./bin/spark-submit examples/src/main/python/sql/udtf.py
+"""
+
+# NOTE that this file is imported in the User Guides in PySpark documentation.
+# The codes are referred via line numbers. See also `literalinclude` directive in Sphinx.
+from pyspark.sql import SparkSession
+from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
+
+# Python UDTFs use Arrow by default.
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
+
+
+def python_udtf_simple_example(spark: SparkSession) -> None:
+
+    # Define the UDTF class and implement the required `eval` method.
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Create a UDTF using the class definition and the `udtf` function.
+    square_num = udtf(SquareNumbers, returnType="num: int, squared: int")
+
+    # Invoke the UDTF in PySpark.
+    square_num(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_decorator_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import lit, udtf
+
+    # Define a UDTF using the `udtf` decorator directly on the class.
+    @udtf(returnType="num: int, squared: int")
+    class SquareNumbers:
+        def eval(self, start: int, end: int):
+            for num in range(start, end + 1):
+                yield (num, num * num)
+
+    # Invoke the UDTF in PySpark using the SquareNumbers class directly.
+    SquareNumbers(lit(1), lit(3)).show()
+    # +---+-------+
+    # |num|squared|
+    # +---+-------+
+    # |  1|      1|
+    # |  2|      4|
+    # |  3|      9|
+    # +---+-------+
+
+
+def python_udtf_registration(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="word: string")
+    class WordSplitter:
+        def eval(self, text: str):
+            for word in text.split(" "):
+                yield (word.strip(),)
+
+    # Register the UDTF for use in Spark SQL.
+    spark.udtf.register("split_words", WordSplitter)
+
+    # Example: Using the UDTF in SQL.
+    spark.sql("SELECT * FROM split_words('hello world')").show()
+    # +-----+
+    # | word|
+    # +-----+
+    # |hello|
+    # |world|
+    # +-----+
+
+    # Example: Using the UDTF with a lateral join in SQL.
+    # The lateral join allows us to reference the columns and aliases
+    # in the previous FROM clause items as inputs to the UDTF.
+    spark.sql(
+        "SELECT * FROM VALUES ('Hello World'), ('Apache Spark') t(text), "
+        "LATERAL split_words(text)"
+    ).show()
+    # +------------+------+
+    # |        text|  word|
+    # +------------+------+
+    # | Hello World| Hello|
+    # | Hello World| World|
+    # |Apache Spark|Apache|
+    # |Apache Spark| Spark|
+    # +------------+------+
+
+
+def python_udtf_arrow_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="c1: int, c2: int", useArrow=True)
+    class PlusOne:
+        def eval(self, x: int):
+            yield x, x + 1
+
+
+def python_udtf_date_expander_example(spark: SparkSession) -> None:
+
+    from datetime import datetime, timedelta
+    from pyspark.sql.functions import lit, udtf
+
+    @udtf(returnType="date: string")
+    class DateExpander:
+        def eval(self, start_date: str, end_date: str):
+            current = datetime.strptime(start_date, '%Y-%m-%d')
+            end = datetime.strptime(end_date, '%Y-%m-%d')
+            while current <= end:
+                yield (current.strftime('%Y-%m-%d'),)
+                current += timedelta(days=1)
+
+    DateExpander(lit("2023-02-25"), lit("2023-03-01")).show()
+    # +----------+
+    # |      date|
+    # +----------+
+    # |2023-02-25|
+    # |2023-02-26|
+    # |2023-02-27|
+    # |2023-02-28|
+    # |2023-03-01|
+    # +----------+
+
+
+def python_udtf_terminate_example(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+
+    @udtf(returnType="cnt: int")
+    class CountUDTF:
+        def __init__(self):
+            self.count = 0
+
+        def eval(self, x: int):
+            self.count += 1
+
+        def terminate(self):
+            yield self.count,
+
+    spark.udtf.register("count_udtf", CountUDTF)
+    spark.sql("SELECT * FROM range(0, 10, 1, 1), LATERAL count_udtf(id)").show()
+    # +---+---+
+    # | id|cnt|
+    # +---+---+
+    # |  9| 10|
+    # +---+---+
+
+
+def python_udtf_table_argument(spark: SparkSession) -> None:
+
+    from pyspark.sql.functions import udtf
+    from pyspark.sql.types import Row
+
+    @udtf(returnType="id: int")
+    class FilterUDTF:
+        def eval(self, row: Row):
+            if row["id"] > 5:
+                yield row["id"],
+
+    spark.udtf.register("filter_udtf", FilterUDTF)
+
+    spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()

Review Comment:
   We can follow up in [SPARK-44746](https://issues.apache.org/jira/browse/SPARK-44746).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to