Yicong-Huang commented on code in PR #54172:
URL: https://github.com/apache/spark/pull/54172#discussion_r2795266379


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -135,27 +136,38 @@ def load_stream(self, stream):
         for batch in reader:
             yield batch
 
-    def _load_group_dataframes(self, stream, num_dfs: int = 1):
-        """
-        Load groups with specified number of dataframes from stream.
+    def __repr__(self):
+        return "ArrowStreamSerializer"
 
-        For num_dfs=1, yields a single-element tuple containing a lazy 
iterator.
-        For num_dfs>1, yields a tuple of eagerly loaded lists to ensure correct
-        stream position when reading multiple dataframes sequentially.
 
-        Parameters
-        ----------
-        stream
-            The input stream to read from
-        num_dfs : int
-            The expected number of dataframes in each group (e.g., 1 for 
grouped UDFs,
-            2 for cogrouped UDFs)
+class ArrowStreamGroupSerializer(ArrowStreamSerializer):
+    """
+    Configurable Arrow stream serializer for UDF execution.
 
-        Yields
-        ------
-        tuple
-            For num_dfs=1: tuple[Iterator[pa.RecordBatch]]
-            For num_dfs>1: tuple[list[pa.RecordBatch], ...]
+    Intended as the single base class that all UDFs will use.
+
+    Parameters
+    ----------
+    num_dfs : int
+        Number of dataframes per group.
+        For num_dfs=0, plain batch stream without group-count protocol.

Review Comment:
     Thanks for the feedback! You raised a good point about the semantics of 
`num_dfs=0` in `ArrowStreamGroupSerializer`.
   
     The reason I designed it this way is to provide a unified, configurable 
base serializer that can support various combinations of features:
     - `num_dfs=0`: plain batch stream (no group-count protocol)
     -` num_dfs=1/2`: grouped/cogrouped stream (with group-count protocol)
     - `write_start_stream=True/False`: with or without START_ARROW_STREAM 
marker
   
     This design allows for flexibility in the future. For example, we might 
need a serializer that supports both grouping AND the start stream marker 
(e.g., num_dfs=1, write_start_stream=True). With this unified approach, we can 
handle all combinations without creating  multiple specialized serializer 
classes.
   
     While the name "GroupSerializer" might suggest it's only for grouped data, 
I view `num_dfs=0` as a degenerate case that disables the grouping feature. The 
primary use case is indeed for grouped UDFs (`num_dfs=1/2`), but supporting 
`num_dfs=0` provides consistency and flexibility. After we refactor all the 
serializers we could consider rename this to avoid confusion.
   
     Alternative approaches I considered:
     1. Creating separate serializers for each combination (e.g., 
`ArrowStreamWithStartSerializer`, `ArrowStreamGroupWithStartSerializer`, etc.) 
- but this leads to more confusion.
     2. Renaming to something more generic like `ArrowStreamUDFSerializer` - 
but we already have `ArrowStreamUDFSerializer` and I wanted to avoid naming 
confusion. again, we could rename it back when we remove the current 
`ArrowStreamUDFSerializer`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to