Yicong-Huang opened a new pull request, #56049:
URL: https://github.com/apache/spark/pull/56049
### What changes were proposed in this pull request?
Make the `SQL_MAP_ARROW_ITER_UDF` branch in `python/pyspark/worker.py` coerce
each output `pa.RecordBatch` against the UDF's declared output schema via
`ArrowBatchTransformer.enforce_schema`, mirroring every sibling Arrow eval-type
branch. `reorder_by_name` is wired to the existing
`spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName` config,
matching `SQL_GROUPED_MAP_ARROW_UDF` / `applyInArrow`.
### Why are the changes needed?
Without coercion, an Arrow type mismatch between the UDF output and the
declared output schema surfaces in the JVM as an opaque accessor error:

    [UNSUPPORTED_CALL.WITHOUT_SUGGESTION] Cannot call the method "getInt" of the class
    "org.apache.spark.sql.vectorized.ArrowColumnVector$ArrowVectorAccessor". SQLSTATE: 0A000

Repro:
```python
import pyarrow as pa
from pyspark.sql.types import StructType, StructField, IntegerType


def double_x(iter_batches):
    for batch in iter_batches:
        df = batch.to_pandas()
        df["x"] = df["x"] * 2
        yield pa.RecordBatch.from_pandas(df[["x"]])


# Assumes an active SparkSession bound to `spark`.
src = spark.createDataFrame([(1,), (2,), (3,)], ["x"])  # x is long (int64)
out = src.mapInArrow(double_x, schema=StructType([StructField("x", IntegerType())]))
out.show()
```
`createDataFrame` infers `x` as `LongType`, the pandas roundtrip preserves
int64, and the declared schema is int32. The JVM therefore picks `LongAccessor`
for a `BigIntVector`, and the outer scan calls `getInt` on it, which falls
through to the base accessor and raises the error above.
### Does this PR introduce _any_ user-facing change?
Yes. `mapInArrow` output batches that previously caused an opaque JVM accessor
failure are now coerced in Python:
- Compatible Arrow type mismatches (e.g. int64 -> int32 from a pandas
  roundtrip) are cast with `safecheck=True`, so overflows still raise.
- Column ordering is reconciled by name under
  `spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName`
  (default true), consistent with `applyInArrow`.
- Incompatible mismatches raise a Python `RESULT_COLUMN_NAMES_MISMATCH` /
  `RESULT_COLUMN_TYPES_MISMATCH` error instead of failing in the JVM.
### How was this patch tested?
`python/run-tests --testnames pyspark.sql.tests.arrow.test_arrow_map`
(81 tests pass). Added regression test
`test_coerce_output_type_to_declared_schema` covering the int64 -> int32 case;
updated `test_top_level_wrong_order` and `test_nested_extraneous_field` to
reflect the new coerce-by-name behavior.
### Was this patch authored or co-authored using generative AI tooling?
No.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]