Yicong-Huang commented on code in PR #56464:
URL: https://github.com/apache/spark/pull/56464#discussion_r3463111993


##########
python/pyspark/worker.py:
##########
@@ -3373,37 +3349,147 @@ def map_batch(batch):
         return func, None, ser, ser
 
     if eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE_PANDAS_UDF:
-        # We assume there is only one UDF here because grouped map doesn't
-        # support combining multiple UDFs.
-        assert num_udfs == 1
+        import pyarrow as pa
+        import pandas as pd
+
+        assert num_udfs == 1, "One TRANSFORM_WITH_STATE_PANDAS UDF expected here."
+        udf, arg_offsets, return_type = udfs[0]
 
         # See TransformWithStateInPandasExec for how arg_offsets are used to
         # distinguish between grouping attributes and data attributes
-        arg_offsets, f = udfs[0]
         parsed_offsets = extract_key_value_indexes(arg_offsets)
-        ser.key_offsets = parsed_offsets[0][0]
+        assert len(parsed_offsets) == 1, (
+            "Expected one pair of offsets for TRANSFORM_WITH_STATE_PANDAS UDF."
+        )
+
+        key_offsets = parsed_offsets[0][0]
+        value_offsets = parsed_offsets[0][1]
+        output_schema = StructType([StructField("_0", return_type)])

Review Comment:
   Yes, this is the same wrapping as before, just inlined. Previously the 
output went through `ArrowStreamPandasUDFSerializer.dump_stream` -> 
`create_batch`, which builds `StructType([StructField(f"_{i}", t) for i, t in 
enumerate(types)])`. The TWS path always emits a single `(pdf, return_type)`, 
so `types == [return_type]`, which produced exactly the same one-field `_0` 
struct.
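
A rough sketch of that equivalence, in plain Python so it runs without Spark. Here `wrap_types` is a hypothetical stand-in for the field-naming step of `create_batch`, tuples stand in for `StructField`, and the type string is a placeholder; none of these names come from pyspark itself.

```python
# Hypothetical stand-in for the naming step: the real code builds
# StructType/StructField objects, here each field is a (name, type) tuple.
def wrap_types(types):
    """Pair each type with a positional field name: _0, _1, ..."""
    return [(f"_{i}", t) for i, t in enumerate(types)]


# Placeholder for the UDF's return type (an assumption for illustration).
return_type = "struct<id:string,count:int>"

# General path: the TWS case always passes exactly one type.
general = wrap_types([return_type])

# Inlined form, mirroring StructType([StructField("_0", return_type)]).
inlined = [("_0", return_type)]

assert general == inlined
```

With more than one type the general form would yield `_0`, `_1`, and so on, which is why the inlined version is only safe under the single-UDF assertion.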



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

