allisonwang-db commented on code in PR #43204:
URL: https://github.com/apache/spark/pull/43204#discussion_r1347981218


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -2309,6 +2309,55 @@ def terminate(self):
             + [Row(partition_col=42, count=3, total=3, last=None)],
         )
 
+    def test_udtf_with_prepare_string_from_analyze(self):
+        @udtf
+        class TestUDTF:
+            def __init__(self):
+                self._total = 0
+                self._buffer = None
+
+            @staticmethod
+            def analyze(argument, _):
+                if (
+                    argument.value is None
+                    or argument.is_table
+                    or not isinstance(argument.value, str)
+                    or len(argument.value) == 0
+                ):
+                    raise Exception("The first argument must be non-empty string")
+                assert argument.data_type == StringType()
+                assert not argument.is_table
+                return AnalyzeResult(
+                    schema=StructType().add("total", IntegerType()).add("buffer", StringType()),
+                    prepare_buffer=argument.value,
+                    with_single_partition=True,
+                )
+
+            def prepare(self, buffer):

Review Comment:
   We want to pass in some string values returned from the `analyze` method 
when initializing the UDTF. Should this be part of the `__init__` method of the 
UDTF?
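
A minimal sketch of that alternative, in plain Python with no Spark dependency. `FakeAnalyzeResult`, `init_args`, `SketchUDTF`, and `instantiate` are hypothetical names used only for illustration and are not part of the PySpark API. The idea is that whatever `analyze` computes is forwarded to the constructor, so a separate `prepare` hook may not be needed:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class FakeAnalyzeResult:
    # Hypothetical stand-in for AnalyzeResult; not the real PySpark class.
    init_args: Dict[str, Any] = field(default_factory=dict)


class SketchUDTF:
    def __init__(self, buffer: str = ""):
        # State computed at analyze time arrives through the constructor.
        self._total = 0
        self._buffer = buffer

    @staticmethod
    def analyze(value: str) -> FakeAnalyzeResult:
        if not isinstance(value, str) or len(value) == 0:
            raise ValueError("The first argument must be a non-empty string")
        return FakeAnalyzeResult(init_args={"buffer": value})


def instantiate(udtf_cls, result: FakeAnalyzeResult):
    # Hypothetical executor-side step: forward analyze-time state to __init__.
    return udtf_cls(**result.init_args)


instance = instantiate(SketchUDTF, SketchUDTF.analyze("abc"))
```

Whether the real executor path can forward such arguments to `__init__` is an open question for the PR; this only shows the shape of the API being suggested.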



##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
         If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
         in this case.
+    prepare_buffer: str
+        If non-empty, this string represents state computed once within the 'analyze' method to be
+        propagated to each instance of the UDTF class at the time of its creation, using its
+        'prepare' method. The format of this buffer is opaque and known only to the data source. Common
+        use cases include serializing protocol buffers or JSON configurations into this buffer so
+        that potentially expensive initialization work done at 'analyze' time does not need to be
+        recomputed later.
     """
 
     schema: StructType
     with_single_partition: bool = False
     partition_by: Sequence[PartitioningColumn] = field(default_factory=tuple)
     order_by: Sequence[OrderingColumn] = field(default_factory=tuple)
+    prepare_buffer: str = ""

Review Comment:
   Hmm, I am not sure where this should be introduced. Currently, 
`AnalyzeResult` represents logical planning information such as the output 
schema, ordering, and partitioning, and it's a bit confusing to me how this 
buffer is meant to be used (maybe we should change its name).



##########
python/pyspark/sql/udtf.py:
##########
@@ -107,12 +107,20 @@ class AnalyzeResult:
         If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
         in this case.
+    prepare_buffer: str

Review Comment:
   I wonder why it's a string type. Anything that can be pickled should be fine, 
right?
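
To illustrate the point, here is a small sketch using only the standard-library `pickle` module. The `state` dictionary is a made-up example of analyze-time state, not something taken from the PR:

```python
import pickle

# Hypothetical analyze-time state; any picklable object would do.
state = {"endpoint": "example", "retries": 3, "columns": ("a", "b")}

# Serialized once at analyze time; the result is bytes, not str.
payload = pickle.dumps(state)

# Recovered later, e.g. when the UDTF instance is created.
restored = pickle.loads(payload)
```

With this approach the field would not need to be restricted to `str`, since the serialization format is handled by `pickle` rather than by the UDTF author.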



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
