Yicong-Huang opened a new pull request, #55961:
URL: https://github.com/apache/spark/pull/55961

   ### What changes were proposed in this pull request?
   
   Forward `prefers_large_types=runner_conf.use_large_var_types` when 
constructing `expected_cols_and_types` in `python/pyspark/worker.py` for the 
three Arrow map eval types:
   
   - `SQL_GROUPED_MAP_ARROW_UDF`
   - `SQL_GROUPED_MAP_ARROW_ITER_UDF`
   - `SQL_COGROUPED_MAP_ARROW_UDF`
   
   For each eval type, `arrow_return_type` was already built with the `prefers_large_types` flag forwarded. However, the per-field `expected_cols_and_types` schema built immediately below it (which `verify_arrow_result` uses to validate the returned table) called `to_arrow_type(col.dataType, timezone="UTC")` for each field without that flag. This PR adds the missing keyword argument at all six call sites (name-based and positional) across the three branches.
   
   ### Why are the changes needed?
   
   When `spark.sql.execution.arrow.useLargeVarTypes=true`, fields of type 
`StringType`/`BinaryType` are produced as Arrow `large_string`/`large_binary` 
in the result table (because `arrow_return_type` correctly reflects that). But 
the parallel `expected_cols_and_types` schema, built without the flag, still 
expects regular `string`/`binary`. `verify_arrow_result` then raises a spurious 
`RESULT_COLUMN_TYPES_MISMATCH`:
   
   ```
   [RESULT_COLUMN_TYPES_MISMATCH] Column types of the returned data do not match
   specified schema. Mismatch: column 's' (expected string, actual large_string),
   column 'b' (expected binary, actual large_binary).
   ```
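A simplified, hypothetical model of the per-column comparison shows why the two schemas must agree. `verify_column_types_sketch` is not the real `verify_arrow_result`: it assumes both schemas are lists of `(name, type)` string pairs, matches them by column name, and formats mismatches in the same style as the message above.

```python
def verify_column_types_sketch(actual, expected):
    """Return mismatch descriptions for columns whose Arrow types differ.

    Hypothetical simplification of verify_arrow_result: actual and expected are
    lists of (column_name, arrow_type_name) pairs, matched by column name.
    """
    actual_types = dict(actual)
    mismatches = []
    for name, expected_type in expected:
        actual_type = actual_types.get(name)
        if actual_type != expected_type:
            mismatches.append(
                f"column '{name}' (expected {expected_type}, actual {actual_type})"
            )
    return mismatches


# With useLargeVarTypes=true the UDF output carries large types...
actual = [("id", "int64"), ("s", "large_string"), ("b", "large_binary")]
# ...but an expected schema built without the flag still uses regular types.
expected_without_flag = [("id", "int64"), ("s", "string"), ("b", "binary")]

print(", ".join(verify_column_types_sketch(actual, expected_without_flag)))
# column 's' (expected string, actual large_string), column 'b' (expected binary, actual large_binary)
print(verify_column_types_sketch(actual, actual))  # []
```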
   
   A simple repro on master (without this fix):
   
   ```python
   spark.conf.set("spark.sql.execution.arrow.useLargeVarTypes", True)
   df = spark.createDataFrame([(0, "foo", b"foo")], "id long, s string, b binary")
   df.groupBy("id").applyInArrow(lambda t: t, "id long, s string, b binary").collect()
   # pyspark.errors.exceptions.base.PySparkRuntimeError:
   # [RESULT_COLUMN_TYPES_MISMATCH] ...
   ```
   
   This is a prerequisite for SPARK-56608 (migrating `verify_arrow_result` checks into `enforce_schema`), since that work depends on the expected and actual Arrow schemas matching exactly under the large-var-types config.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes (bug fix). `applyInArrow` on grouped data (both the iterator and non-iterator variants) and on cogrouped data no longer raises a spurious `RESULT_COLUMN_TYPES_MISMATCH` when `spark.sql.execution.arrow.useLargeVarTypes=true` and the UDF returns a table containing `StringType`/`BinaryType` columns. Behavior under the default `useLargeVarTypes=false` is unchanged.
   
   ### How was this patch tested?
   
   Added `test_apply_in_arrow_large_var_types` to both 
`test_arrow_grouped_map.py` (covers `SQL_GROUPED_MAP_ARROW_UDF` and 
`SQL_GROUPED_MAP_ARROW_ITER_UDF` via the existing `function_variations` helper) 
and `test_arrow_cogrouped_map.py` (covers `SQL_COGROUPED_MAP_ARROW_UDF`). Each 
subtest exercises both name-based and positional column assignment. The tests 
assert that the UDF receives `large_string`/`large_binary` types and that the 
round-trip equals the input via `assertDataFrameEqual`. The Spark Connect 
parity test files automatically pick up the new tests through 
`ApplyInArrowTestsMixin` / `CogroupedMapInArrowTestsMixin`.
   
   Verified that the bug exists on `master` by stashing the `worker.py` fix and re-running the new tests: both fail with the expected `RESULT_COLUMN_TYPES_MISMATCH`. With the fix applied, both pass.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

