ueshin commented on code in PR #42420:
URL: https://github.com/apache/spark/pull/42420#discussion_r1295267414


##########
python/pyspark/worker.py:
##########
@@ -582,6 +583,56 @@ def read_udtf(pickleSer, infile, eval_type):
             message_parameters={"method_name": "__init__", "error": str(e)},
         )
 
+    """
+    This implements the logic of a UDTF that accepts an input TABLE argument 
with one or more
+    PARTITION BY expressions.
+
+    Parameters
+    ----------
+    create_udtf: function
+        Function to create a new instance of the UDTF to be invoked.
+    partition_child_indexes: list
+        List of integers identifying zero-based indexes of the columns of the 
input table that
+        contain projected partitioning expressions. This class will inspect 
these values for each
+        pair of consecutive input rows. When they change, this indicates the 
boundary between two
+        partitions, and we will invoke the 'terminate' method on the UDTF 
class instance and then
+        destroy it and create a new one to implement the desired partitioning 
semantics.
+    """
+    class UDTFWithPartitions:
+        def __init__(self, create_udtf, partition_child_indexes):
+            self._create_udtf = create_udtf
+            self._udtf = create_udtf()
+            self._prev_arguments = None
+            self._partition_child_indexes = partition_child_indexes
+
+        def eval(self, *args, **kwargs):
+            changed_partitions = _check_partition_boundaries(args + 
list(kwargs.values()))

Review Comment:
   `self._check_partition_boundaries( ... )`?



##########
python/pyspark/worker.py:
##########
@@ -582,6 +583,56 @@ def read_udtf(pickleSer, infile, eval_type):
             message_parameters={"method_name": "__init__", "error": str(e)},
         )
 
+    """
+    This implements the logic of a UDTF that accepts an input TABLE argument 
with one or more
+    PARTITION BY expressions.
+
+    Parameters
+    ----------
+    create_udtf: function
+        Function to create a new instance of the UDTF to be invoked.
+    partition_child_indexes: list
+        List of integers identifying zero-based indexes of the columns of the 
input table that
+        contain projected partitioning expressions. This class will inspect 
these values for each
+        pair of consecutive input rows. When they change, this indicates the 
boundary between two
+        partitions, and we will invoke the 'terminate' method on the UDTF 
class instance and then
+        destroy it and create a new one to implement the desired partitioning 
semantics.
+    """
+    class UDTFWithPartitions:

Review Comment:
   nit: In Python, a class comment is placed after `class Xxxx:`:
   
   ```py
   class UDTFWithPartitions:
       """
       This implements ...
       """
   
       def __init__( ... ):
   ...
   ```



##########
python/pyspark/worker.py:
##########
@@ -582,6 +583,56 @@ def read_udtf(pickleSer, infile, eval_type):
             message_parameters={"method_name": "__init__", "error": str(e)},
         )
 
+    """
+    This implements the logic of a UDTF that accepts an input TABLE argument 
with one or more
+    PARTITION BY expressions.
+
+    Parameters
+    ----------
+    create_udtf: function
+        Function to create a new instance of the UDTF to be invoked.
+    partition_child_indexes: list
+        List of integers identifying zero-based indexes of the columns of the 
input table that
+        contain projected partitioning expressions. This class will inspect 
these values for each
+        pair of consecutive input rows. When they change, this indicates the 
boundary between two
+        partitions, and we will invoke the 'terminate' method on the UDTF 
class instance and then
+        destroy it and create a new one to implement the desired partitioning 
semantics.
+    """
+    class UDTFWithPartitions:
+        def __init__(self, create_udtf, partition_child_indexes):
+            self._create_udtf = create_udtf
+            self._udtf = create_udtf()
+            self._prev_arguments = None
+            self._partition_child_indexes = partition_child_indexes
+
+        def eval(self, *args, **kwargs):
+            changed_partitions = _check_partition_boundaries(args + 
list(kwargs.values()))
+            if changed_partitions:
+                yield self._udtf.terminate()

Review Comment:
   Need to check whether `self._udtf` has `terminate`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to