dtenedor commented on code in PR #42420:
URL: https://github.com/apache/spark/pull/42420#discussion_r1296435244
##########
python/pyspark/worker.py:
##########
@@ -582,6 +583,56 @@ def read_udtf(pickleSer, infile, eval_type):
message_parameters={"method_name": "__init__", "error": str(e)},
)
+ """
+ This implements the logic of a UDTF that accepts an input TABLE argument with one or more
+ PARTITION BY expressions.
+
+ Parameters
+ ----------
+ create_udtf: function
+ Function to create a new instance of the UDTF to be invoked.
+ partition_child_indexes: list
+ List of integers identifying zero-based indexes of the columns of the input table that
+ contain projected partitioning expressions. This class will inspect these values for each
+ pair of consecutive input rows. When they change, this indicates the boundary between two
+ partitions, and we will invoke the 'terminate' method on the UDTF class instance and then
+ destroy it and create a new one to implement the desired partitioning semantics.
+ """
+ class UDTFWithPartitions:
Review Comment:
Sounds good, done.
##########
python/pyspark/worker.py:
##########
@@ -582,6 +583,56 @@ def read_udtf(pickleSer, infile, eval_type):
message_parameters={"method_name": "__init__", "error": str(e)},
)
+ """
+ This implements the logic of a UDTF that accepts an input TABLE argument with one or more
+ PARTITION BY expressions.
+
+ Parameters
+ ----------
+ create_udtf: function
+ Function to create a new instance of the UDTF to be invoked.
+ partition_child_indexes: list
+ List of integers identifying zero-based indexes of the columns of the input table that
+ contain projected partitioning expressions. This class will inspect these values for each
+ pair of consecutive input rows. When they change, this indicates the boundary between two
+ partitions, and we will invoke the 'terminate' method on the UDTF class instance and then
+ destroy it and create a new one to implement the desired partitioning semantics.
+ """
+ class UDTFWithPartitions:
+ def __init__(self, create_udtf, partition_child_indexes):
+ self._create_udtf = create_udtf
+ self._udtf = create_udtf()
+ self._prev_arguments = None
+ self._partition_child_indexes = partition_child_indexes
+
+ def eval(self, *args, **kwargs):
+ changed_partitions = _check_partition_boundaries(args + list(kwargs.values()))
Review Comment:
Done.
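
For readers following the thread, here is a minimal standalone sketch of the boundary-detection behavior the docstring describes. It is not the code in this PR: `PartitionedUDTFSketch` and `CountRows` are hypothetical names, and the sketch assumes each input row arrives as plain positional values rather than whatever row representation the worker actually passes. It also converts `args` to a list before concatenating, since `args` is a tuple inside `eval`.

```python
from typing import Any, Callable, Iterator, List, Optional


class CountRows:
    """Hypothetical UDTF: counts rows and emits the count on terminate."""

    def __init__(self) -> None:
        self._count = 0

    def eval(self, *args: Any) -> Iterator[tuple]:
        self._count += 1
        return iter(())  # emit nothing per input row

    def terminate(self) -> Iterator[tuple]:
        yield (self._count,)


class PartitionedUDTFSketch:
    """Illustrative wrapper (an assumption, not the PR's class)."""

    def __init__(self, create_udtf: Callable[[], Any],
                 partition_child_indexes: List[int]) -> None:
        self._create_udtf = create_udtf
        self._udtf = create_udtf()
        self._prev_keys: Optional[List[Any]] = None
        self._partition_child_indexes = partition_child_indexes

    def _finish_current(self) -> Iterator[tuple]:
        # Flush the current instance if it defines 'terminate'.
        if hasattr(self._udtf, "terminate"):
            result = self._udtf.terminate()
            if result is not None:
                yield from result

    def eval(self, *args: Any, **kwargs: Any) -> Iterator[tuple]:
        # 'args' is a tuple, so convert it before concatenating with a list.
        values = list(args) + list(kwargs.values())
        keys = [values[i] for i in self._partition_child_indexes]
        if self._prev_keys is not None and keys != self._prev_keys:
            # Partition boundary: terminate the old instance, start a new one.
            yield from self._finish_current()
            self._udtf = self._create_udtf()
        self._prev_keys = keys
        result = self._udtf.eval(*args, **kwargs)
        if result is not None:
            yield from result

    def terminate(self) -> Iterator[tuple]:
        # Flush the final partition once the input is exhausted.
        yield from self._finish_current()


# Usage: rows are already ordered by the partitioning column (index 0).
rows = [("a", 1), ("a", 2), ("b", 3)]
wrapper = PartitionedUDTFSketch(CountRows, [0])
out: List[tuple] = []
for row in rows:
    out.extend(wrapper.eval(*row))
out.extend(wrapper.terminate())
print(out)  # [(2,), (1,)]
```

The sketch relies on the input being sorted by the partitioning columns, so that comparing each row's key values with the previous row's is enough to detect a boundary.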
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]