allisonwang-db commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1347981218
##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -2309,6 +2309,55 @@ def terminate(self):
+ [Row(partition_col=42, count=3, total=3, last=None)],
)
+ def test_udtf_with_prepare_string_from_analyze(self):
+ @udtf
+ class TestUDTF:
+ def __init__(self):
+ self._total = 0
+ self._buffer = None
+
+ @staticmethod
+ def analyze(argument, _):
+ if (
+ argument.value is None
+ or argument.is_table
+ or not isinstance(argument.value, str)
+ or len(argument.value) == 0
+ ):
+ raise Exception("The first argument must be non-empty string")
+ assert argument.data_type == StringType()
+ assert not argument.is_table
+ return AnalyzeResult(
+ schema=StructType().add("total", IntegerType()).add("buffer", StringType()),
+ prepare_buffer=argument.value,
+ with_single_partition=True,
+ )
+
+ def prepare(self, buffer):
Review Comment:
We want to pass some string values returned from the `analyze` method to the
UDTF when it is instantiated. Should this be part of the UDTF's `__init__`
method instead of a separate `prepare` method?
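
To make the question concrete, here is a minimal sketch of the `__init__`-based alternative. It uses no PySpark at all: `FakeAnalyzeResult` and `SketchUDTF` are hypothetical stand-ins, and the last few lines only imitate what the runtime would have to do, so treat it as an illustration of the shape rather than a proposed implementation.

```python
from dataclasses import dataclass


@dataclass
class FakeAnalyzeResult:
    # Hypothetical stand-in for AnalyzeResult; only the field under discussion.
    prepare_buffer: str = ""


class SketchUDTF:
    @staticmethod
    def analyze(value: str) -> FakeAnalyzeResult:
        # State computed once at analysis time.
        return FakeAnalyzeResult(prepare_buffer=value)

    def __init__(self, buffer: str = ""):
        # The buffer arrives at construction time; no separate prepare() call.
        self._total = 0
        self._buffer = buffer

    def eval(self, row):
        self._total += 1

    def terminate(self):
        yield self._total, self._buffer


# Imitation of the runtime: analyze once, then construct the instance
# with the buffer that analyze returned.
result = SketchUDTF.analyze("abc")
instance = SketchUDTF(result.prepare_buffer)
for _ in range(3):
    instance.eval(None)
rows = list(instance.terminate())
```

With this shape the UDTF author never sees a half-initialized instance, at the cost of requiring `__init__` to accept an extra argument.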
##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
in this case.
+ prepare_buffer: str
+ If non-empty, this string represents state computed once within the 'analyze' method to be
+ propagated to each instance of the UDTF class at the time of its creation, using its
+ 'prepare' method. The format of this buffer is opaque and known only to the data source. Common
+ use cases include serializing protocol buffers or JSON configurations into this buffer so
+ that potentially expensive initialization work done at 'analyze' time does not need to be
+ recomputed later.
"""
schema: StructType
with_single_partition: bool = False
partition_by: Sequence[PartitioningColumn] = field(default_factory=tuple)
order_by: Sequence[OrderingColumn] = field(default_factory=tuple)
+ prepare_buffer: str = ""
Review Comment:
Hmm, I am not sure where this should be introduced. Currently,
`AnalyzeResult` represents logical planning information such as the output
schema, ordering, and partitioning, and it's a bit confusing to me how this
buffer is meant to be used (maybe we should rename it).
##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
in this case.
+ prepare_buffer: str
Review Comment:
I wonder why it's a string type. Anything that can be pickled should be fine,
right?
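
As a rough illustration of the point, the sketch below round-trips a non-string value through the standard `pickle` module. The `state` dictionary and its keys are made up for the example, and nothing here reflects how the PR actually ships the buffer to the workers.

```python
import pickle

# Hypothetical state that analyze() might want to hand to each UDTF instance.
# It mixes strings, numbers, and a nested list, so a plain str field would
# force the author to serialize it by hand first.
state = {
    "endpoint": "http://example.invalid",
    "columns": ["a", "b"],
    "limit": 10,
}

# What a pickle-based transport could do between analysis and execution.
payload = pickle.dumps(state)
restored = pickle.loads(payload)
```

A string-only field still works for authors who prefer JSON or protocol buffers, since a `str` is picklable too.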
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]