zhengruifeng commented on code in PR #54172:
URL: https://github.com/apache/spark/pull/54172#discussion_r2791107528
##########
python/pyspark/worker.py:
##########
@@ -217,6 +217,46 @@ def chain(f, g):
return lambda *a: g(f(*a))
+def verify_result(expected_type: type) -> Callable[[Any], Iterator]:
Review Comment:
Will this method be general enough to replace all the `wrap_` functions?
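For context on the question above, here is a minimal sketch of what a generic verifier with this signature might look like. Only the name `verify_result` and its signature come from the diff. The body, the inner `check` helper, and the error messages are assumptions for illustration, not the PR's implementation.

```python
from collections.abc import Iterable
from typing import Any, Callable, Iterator


def verify_result(expected_type: type) -> Callable[[Any], Iterator]:
    """Return a wrapper that validates each element a UDF produces (illustrative body)."""

    def check(result: Any) -> Iterator:
        # Assumption: the UDF is expected to return an iterable of results.
        if not isinstance(result, Iterable):
            raise TypeError(
                f"UDF should return an iterable, got {type(result).__name__}"
            )
        for element in result:
            # Each element is checked lazily, as the stream is consumed.
            if not isinstance(element, expected_type):
                raise TypeError(
                    f"Expected {expected_type.__name__}, got {type(element).__name__}"
                )
            yield element

    return check
```

Because `check` is a generator, a type mismatch surfaces only when the offending element is consumed. Whether one such helper can cover every existing `wrap_` function depends on how much per-eval-type logic those wrappers carry beyond type checking.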
##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -135,27 +136,38 @@ def load_stream(self, stream):
for batch in reader:
yield batch
- def _load_group_dataframes(self, stream, num_dfs: int = 1):
- """
- Load groups with specified number of dataframes from stream.
+ def __repr__(self):
+ return "ArrowStreamSerializer"
- For num_dfs=1, yields a single-element tuple containing a lazy iterator.
- For num_dfs>1, yields a tuple of eagerly loaded lists to ensure correct
- stream position when reading multiple dataframes sequentially.
- Parameters
- ----------
- stream
- The input stream to read from
- num_dfs : int
- The expected number of dataframes in each group (e.g., 1 for grouped UDFs,
- 2 for cogrouped UDFs)
+class ArrowStreamGroupSerializer(ArrowStreamSerializer):
+ """
+ Configurable Arrow stream serializer for UDF execution.
- Yields
- ------
- tuple
- For num_dfs=1: tuple[Iterator[pa.RecordBatch]]
- For num_dfs>1: tuple[list[pa.RecordBatch], ...]
+ Intended as the single base class that all UDFs will use.
+
+ Parameters
+ ----------
+ num_dfs : int
+ Number of dataframes per group.
+ For num_dfs=0, plain batch stream without group-count protocol.
Review Comment:
Why is this case allowed in `ArrowStreamGroupSerializer`?
I think it should use a different serializer.
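One way to read this suggestion is sketched below: keep the plain batch stream in its own class and have the group serializer reject `num_dfs=0`. The class names `PlainBatchSerializer` and `GroupBatchSerializer` are hypothetical stand-ins, not the PR's classes, and the stream handling is reduced to a pass-through.

```python
class PlainBatchSerializer:
    """Hypothetical: plain batch stream without any group-count protocol."""

    def load_stream(self, stream):
        # Pass batches through unchanged; a real serializer would decode Arrow IPC here.
        yield from stream


class GroupBatchSerializer(PlainBatchSerializer):
    """Hypothetical: grouped stream carrying num_dfs dataframes per group."""

    def __init__(self, num_dfs: int = 1):
        # Ungrouped streams belong to PlainBatchSerializer, so 0 is rejected here.
        if num_dfs < 1:
            raise ValueError(
                "num_dfs must be >= 1; use PlainBatchSerializer for ungrouped streams"
            )
        self.num_dfs = num_dfs
```

With this split, the "no group protocol" case never reaches the group serializer, at the cost of one more class to maintain.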
##########
python/pyspark/worker.py:
##########
@@ -2819,25 +2824,48 @@ def read_udfs(pickleSer, infile, eval_type, runner_conf, eval_conf):
for i in range(num_udfs)
]
+ if eval_type == PythonEvalType.SQL_MAP_ARROW_ITER_UDF:
+ import pyarrow as pa
+
+ assert num_udfs == 1, "One MAP_ARROW_ITER UDF expected here."
+ udf_func = udfs[0]
Review Comment:
Is it possible to add a type hint for `udf_func`?
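A sketch of what such an annotation could look like follows. The alias name `ArrowIterUDF` is hypothetical, `RecordBatch` is a stand-in for `pa.RecordBatch` so the snippet runs without pyarrow, and it assumes `udfs[0]` is a bare callable from an iterator of batches to an iterator of batches.

```python
from typing import Any, Callable, Iterator, List

# Stand-in for pa.RecordBatch so this sketch does not require pyarrow.
RecordBatch = Any

# Hypothetical alias: a MAP_ARROW_ITER UDF maps an iterator of batches to an iterator of batches.
ArrowIterUDF = Callable[[Iterator[RecordBatch]], Iterator[RecordBatch]]


def passthrough(batches: Iterator[RecordBatch]) -> Iterator[RecordBatch]:
    """Toy UDF that yields every input batch unchanged."""
    yield from batches


udfs: List[ArrowIterUDF] = [passthrough]
num_udfs = len(udfs)

assert num_udfs == 1, "One MAP_ARROW_ITER UDF expected here."
udf_func: ArrowIterUDF = udfs[0]
```

The annotation has no runtime effect. Its value is that a type checker can flag call sites that pass something other than an iterator of batches.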
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]