ueshin commented on code in PR #43682:
URL: https://github.com/apache/spark/pull/43682#discussion_r1391800182
##########
python/docs/source/user_guide/sql/python_udtf.rst:
##########
@@ -101,20 +101,29 @@ To implement a Python UDTF, you first need to define a class implementing the me
         partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
         orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+    Notes
+    -----
+    - It is possible for the `analyze` method to accept the exact arguments expected,
+      mapping 1:1 with the arguments provided to the UDTF call.
+    - The `analyze` method can instead choose to accept positional arguments if desired
+      (using `*args`) or keyword arguments (using `**kwargs`).
+
     Examples
     --------
-    analyze implementation that returns one output column for each word in the input string
-    argument.
+    This is an `analyze` implementation that returns one output column for each word in the
+    input string argument.
-    >>> def analyze(self, text: str) -> AnalyzeResult:
+    >>> @staticmethod
+    ... def analyze(text: str) -> AnalyzeResult:
Review Comment:
Thanks for the fix! 👍
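The signature styles described in the Notes above can be sketched in plain Python without a Spark installation. This is a hypothetical illustration only: `AnalyzeResult` here is a simplified stand-in for `pyspark.sql.udtf.AnalyzeResult` (reduced to one string field instead of a real `StructType`), and the class names are invented for the example.

```python
from dataclasses import dataclass


# Hypothetical stand-in for pyspark.sql.udtf.AnalyzeResult, so the
# sketch runs without Spark. The real class carries a StructType schema.
@dataclass
class AnalyzeResult:
    schema: str


class WordSplitUDTF:
    # Style 1: accept the exact arguments, mapping 1:1 with the UDTF call.
    @staticmethod
    def analyze(text: str) -> AnalyzeResult:
        return AnalyzeResult(schema="word: string")


class FlexibleUDTF:
    # Style 2: accept arbitrary positional and/or keyword arguments instead.
    @staticmethod
    def analyze(*args, **kwargs) -> AnalyzeResult:
        # One output column per argument, named after the keyword when given.
        names = [f"arg{i}" for i in range(len(args))] + list(kwargs)
        return AnalyzeResult(schema=", ".join(f"{n}: string" for n in names))
```

Note that both versions are `@staticmethod`s taking no `self`, which is the point of the doc fix under review.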
##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -2467,6 +2468,53 @@ def terminate(self):
             [Row(count=20, buffer="abc")],
         )
+    def test_udtf_with_skip_rest_of_input_table_exception(self):
+        @udtf(returnType="total: int")
+        class TestUDTF:
+            def __init__(self):
+                self._total = 0
+
+            def eval(self, _: Row):
+                self._total += 1
+                if self._total >= 4:
+                    raise SkipRestOfInputTableException("Stop at self._total >= 4")
+
+            def terminate(self):
+                yield self._total,
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+
+        # Run a test case including WITH SINGLE PARTITION on the UDTF call. The
+        # SkipRestOfInputTableException stops scanning rows after the fourth input row is consumed.
+        assertDataFrameEqual(
+            self.spark.sql(
+                """
+                WITH t AS (
+                  SELECT id FROM range(1, 21)
+                )
+                SELECT total
+                FROM test_udtf(TABLE(t) WITH SINGLE PARTITION)
+                """
+            ),
+            [Row(total=4)],
+        )
+        # Run a test case including PARTITION BY on the UDTF call. The
+        # SkipRestOfInputTableException stops scanning rows for each of the partitions
+        # separately.
+        assertDataFrameEqual(
+            self.spark.sql(
+                """
+                WITH t AS (
+                  SELECT id FROM range(1, 21)
+                )
+                SELECT id / 10 AS id_divided_by_ten, total
+                FROM test_udtf(TABLE(t) PARTITION BY id / 10)
+                ORDER BY ALL
+                """
+            ),
+            [Row(id_divided_by_ten=0, total=4), Row(id_divided_by_ten=1, total=4)],
Review Comment:
   For this case, the expected results should be:
   ```py
   [Row(id_divided_by_ten=0, total=4), Row(id_divided_by_ten=1, total=4), Row(id_divided_by_ten=2, total=1)]
   ```
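The reviewer's expected third row can be sanity-checked with a small Spark-free simulation. This sketch assumes, for illustration, that the `PARTITION BY id / 10` expression groups ids the same way as integer division `id // 10`; `simulate` is a hypothetical helper that mimics `eval` raising `SkipRestOfInputTableException` once four rows of a partition have been counted.

```python
from collections import defaultdict


def simulate(ids, stop_at=4):
    """Count rows per partition, skipping a partition's remaining rows
    once `stop_at` rows have been consumed (mimicking the exception)."""
    totals = defaultdict(int)
    for i in ids:
        key = i // 10  # assumed bucketing for PARTITION BY id / 10
        if totals[key] >= stop_at:
            continue  # rest of this partition's input table is skipped
        totals[key] += 1
    return dict(sorted(totals.items()))


# ids 1-9 fall in partition 0, ids 10-19 in partition 1, id 20 alone in
# partition 2, so the last partition can only ever reach total=1.
result = simulate(range(1, 21))
```

This reproduces the reviewer's point: the single id in the third partition yields `total=1`, so the test's expected list is missing `Row(id_divided_by_ten=2, total=1)`.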
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]