gaogaotiantian commented on code in PR #54172:
URL: https://github.com/apache/spark/pull/54172#discussion_r2796175734


##########
python/pyspark/worker.py:
##########
@@ -1406,7 +1411,7 @@ def read_single_udf(pickleSer, infile, eval_type, 
runner_conf, udf_index):
     elif eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF:
         return args_offsets, wrap_pandas_batch_iter_udf(func, return_type, 
runner_conf)
     elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
-        return args_offsets, wrap_arrow_batch_iter_udf(func, return_type, 
runner_conf)

Review Comment:
   `arg_offsets` is something that this eval type does not use?



##########
python/pyspark/worker.py:
##########
@@ -217,6 +217,46 @@ def chain(f, g):
     return lambda *a: g(f(*a))
 
 
+def verify_result(expected_type: type) -> Callable[[Any], Iterator]:

Review Comment:
   This might not be the final stage so it's okay for now. In the future we 
want to make the type hint more specific.



##########
python/pyspark/worker.py:
##########
@@ -217,6 +217,46 @@ def chain(f, g):
     return lambda *a: g(f(*a))
 
 
+def verify_result(expected_type: type) -> Callable[[Any], Iterator]:
+    """
+    Create a result verifier that checks both iterability and element types.
+
+    Returns a function that takes a UDF result, verifies it is iterable,
+    and lazily type-checks each element via map.
+
+    Parameters
+    ----------
+    expected_type : type
+        The expected Python/PyArrow type for each element
+        (e.g. pa.RecordBatch, pa.Array).
+    """
+    label: str = "{}.{}".format(expected_type.__module__.split(".")[0], 
expected_type.__name__)

Review Comment:
   `inspect.getmodule(expected_type).__package__` is probably a safer way to 
get `pandas` or `pyarrow`. Also we should use f-string when possible. Gradually 
move to the modern ways to do template strings.



##########
python/pyspark/worker.py:
##########
@@ -2749,7 +2754,7 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, 
eval_conf):
                 runner_conf.arrow_max_records_per_batch
             )
         elif eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
-            ser = ArrowStreamUDFSerializer()
+            ser = ArrowStreamGroupSerializer(num_dfs=0, 
write_start_stream=True)

Review Comment:
   From my comments above - `ArrowStreamGroupSerializer(num_dfs=0)` is a bit 
weird. We are creating a group serializer without groups..



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -135,27 +136,38 @@ def load_stream(self, stream):
         for batch in reader:
             yield batch
 
-    def _load_group_dataframes(self, stream, num_dfs: int = 1):
-        """
-        Load groups with specified number of dataframes from stream.
+    def __repr__(self):
+        return "ArrowStreamSerializer"
 
-        For num_dfs=1, yields a single-element tuple containing a lazy 
iterator.
-        For num_dfs>1, yields a tuple of eagerly loaded lists to ensure correct
-        stream position when reading multiple dataframes sequentially.
 
-        Parameters
-        ----------
-        stream
-            The input stream to read from
-        num_dfs : int
-            The expected number of dataframes in each group (e.g., 1 for 
grouped UDFs,
-            2 for cogrouped UDFs)
+class ArrowStreamGroupSerializer(ArrowStreamSerializer):
+    """
+    Configurable Arrow stream serializer for UDF execution.
 
-        Yields
-        ------
-        tuple
-            For num_dfs=1: tuple[Iterator[pa.RecordBatch]]
-            For num_dfs>1: tuple[list[pa.RecordBatch], ...]
+    Intended as the single base class that all UDFs will use.
+
+    Parameters
+    ----------
+    num_dfs : int
+        Number of dataframes per group.
+        For num_dfs=0, plain batch stream without group-count protocol.

Review Comment:
   I also have concerns here for a slightly different reason. Having `Group` in 
the class suggests that this should deal with streams with group. However you 
have other serializers that does not have group (if I understand correctly) 
that inherits this class, which is very confusing.
   
   I'm okay to have a more generic class that can deal with `group_number = 
0/1/2`, but in that case the class name should not contain `Group`. It should 
just be an arrow stream serializer that can deal with group. Why do we need 
this class? `_write_stream_start` has nothing to do with group right? We need a 
place for `_load_group_dataframes`? We can just put it in the base class and 
claim that we are supposed be able to deal with grouped dataframes right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to