Yicong-Huang opened a new pull request, #56049:
URL: https://github.com/apache/spark/pull/56049

   ### What changes were proposed in this pull request?
   
   Make the `SQL_MAP_ARROW_ITER_UDF` branch in `python/pyspark/worker.py` coerce each
   output `pa.RecordBatch` against the UDF's declared output schema via
   `ArrowBatchTransformer.enforce_schema`, mirroring every sibling Arrow eval-type
   branch. `reorder_by_name` is wired to the existing
   `spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName` config, matching
   `SQL_GROUPED_MAP_ARROW_UDF` / `applyInArrow`.
   
   ### Why are the changes needed?
   
   Without coercion, an Arrow type mismatch between the UDF output and the declared
   output schema surfaces in the JVM as an opaque accessor error:
   
       [UNSUPPORTED_CALL.WITHOUT_SUGGESTION] Cannot call the method "getInt" of the
       class "org.apache.spark.sql.vectorized.ArrowColumnVector$ArrowVectorAccessor".
       SQLSTATE: 0A000
   
   Repro:
   
   ```python
   import pyarrow as pa
   from pyspark.sql.types import StructType, StructField, IntegerType
   
   def double_x(iter_batches):
       for batch in iter_batches:
           df = batch.to_pandas()
           df["x"] = df["x"] * 2
           yield pa.RecordBatch.from_pandas(df[["x"]])
   
   src = spark.createDataFrame([(1,), (2,), (3,)], ["x"])  # x is long (int64)
   out = src.mapInArrow(double_x, schema=StructType([StructField("x", IntegerType())]))
   out.show()
   ```
   
   `createDataFrame` infers `x` as `LongType`, and the pandas roundtrip preserves int64,
   but the declared schema is int32. The JVM picks `LongAccessor` for the resulting
   `BigIntVector`, and the outer scan then calls `getInt` on it; `LongAccessor` does not
   override `getInt`, so the call reaches the base `ArrowVectorAccessor` method, which
   throws the error above.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. `mapInArrow` output batches that previously caused an opaque JVM accessor
   failure are now coerced in Python:
   - Compatible Arrow type mismatches (e.g. int64 -> int32 from a pandas roundtrip)
     are cast with `safecheck=True`, so overflows still raise.
   - Column ordering is reconciled by name under
     `spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName` (default
     true), consistent with `applyInArrow`.
   - Incompatible mismatches raise a Python `RESULT_COLUMN_NAMES_MISMATCH` /
     `RESULT_COLUMN_TYPES_MISMATCH` error instead of failing in the JVM.
   
   ### How was this patch tested?
   
   Ran `python/run-tests --testnames pyspark.sql.tests.arrow.test_arrow_map`
   (81 tests pass). Added regression test `test_coerce_output_type_to_declared_schema`
   covering the int64 -> int32 case; updated `test_top_level_wrong_order` and
   `test_nested_extraneous_field` to reflect the new coerce-by-name behavior.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

