viirya opened a new pull request, #56444:
URL: https://github.com/apache/spark/pull/56444

   ### What changes were proposed in this pull request?
   
   This PR fixes a bug where the zstd compression level configured via 
`spark.sql.execution.arrow.compression.zstd.level` was silently ignored 
everywhere Arrow batches are compressed. Three places shared the same broken 
pattern:
   
   - `ArrowConverters.ArrowBatchIterator` (SPARK-54134)
   - `PythonArrowInput` (SPARK-54226; also covers `GroupedPythonArrowInput`, 
which reuses this codec via SPARK-55328)
   - `CoGroupedArrowPythonRunner` (SPARK-54226)
   
   Each site constructed `new ZstdCompressionCodec(level)` only to read its 
codec type, then rebuilt the codec through 
`CompressionCodec.Factory.INSTANCE.createCodec(codecType)`. The codec type enum 
does not carry a level, so that single-argument factory overload always builds 
a codec at the zstd default level (3) and drops the configured one.
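
   The broken pattern can be modelled in a few lines. The following is a toy 
sketch only, not the Spark or Arrow code: `CodecType`, `ZstdCodec`, 
`create_codec_from_type`, `broken_build`, and `fixed_build` are hypothetical 
names invented for illustration, and the only fact taken from this PR is that 
the default zstd level is 3.

```python
import enum
from dataclasses import dataclass

DEFAULT_ZSTD_LEVEL = 3  # default level as stated in this PR description


class CodecType(enum.Enum):
    """Hypothetical stand-in for a codec type enum: it has no level field."""
    NO_COMPRESSION = 0
    ZSTD = 1


@dataclass(frozen=True)
class ZstdCodec:
    """Hypothetical stand-in for a level-carrying zstd codec."""
    level: int = DEFAULT_ZSTD_LEVEL

    @property
    def codec_type(self) -> CodecType:
        return CodecType.ZSTD


def create_codec_from_type(codec_type: CodecType) -> ZstdCodec:
    """Mimics a factory overload that only sees the type, never the level."""
    if codec_type is CodecType.ZSTD:
        return ZstdCodec()  # always the default level
    raise ValueError(f"unsupported codec type: {codec_type}")


def broken_build(level: int) -> ZstdCodec:
    # Build a codec with the configured level, keep only its type,
    # then rebuild from the type: the level is lost on the way.
    codec_type = ZstdCodec(level).codec_type
    return create_codec_from_type(codec_type)


def fixed_build(level: int) -> ZstdCodec:
    # Use the level-carrying instance directly.
    return ZstdCodec(level)
```

   In this model `broken_build(19)` yields a codec at level 3, while 
`fixed_build(19)` keeps level 19, which is the shape of the fix described 
below.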
   
   The codec construction is extracted into a shared 
`ArrowCompressionUtils.createCompressionCodec` helper that constructs the 
level-carrying codec instance directly (the helper lives in `sql/core` because 
`sql/api`, where `ArrowUtils` is, has no `arrow-compression` dependency). The 
level only matters on the write side; the read side looks up the codec by the 
type recorded in the IPC message, so reads are unaffected and the on-wire 
format is unchanged.
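
   Why the read side is unaffected can be illustrated with a small sketch. It 
assumes a simplified message of the form `(codec tag, body)` and uses zlib from 
the Python standard library as a stand-in for zstd; `write_batch`, 
`read_batch`, and the `"deflate"` tag are hypothetical and are not part of the 
Arrow IPC format or of this PR.

```python
import zlib

# Reader-side lookup table: keyed by codec tag only, no level involved.
DECODERS = {
    "none": lambda body: body,
    "deflate": zlib.decompress,  # zlib stands in for zstd in this sketch
}


def write_batch(payload: bytes, codec: str, level: int = 6):
    """Writer: the level influences the bytes produced, not the tag recorded."""
    if codec == "none":
        body = payload
    elif codec == "deflate":
        body = zlib.compress(payload, level)
    else:
        raise ValueError(f"unsupported codec: {codec}")
    return codec, body


def read_batch(message) -> bytes:
    """Reader: picks the decoder from the recorded tag alone."""
    codec, body = message
    return DECODERS[codec](body)
```

   Whatever level the writer used, the reader recovers the same payload, so 
changing the write-side level does not require any reader change in this 
model.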
   
   The same bug class was found by @dbtsai during review of #56334 
(https://github.com/apache/spark/pull/56334#discussion_r3391654988); that PR 
fixes the cache-side instance of the pattern, and this PR fixes the remaining 
three pre-existing instances.
   
   ### Why are the changes needed?
   
   Users tuning `spark.sql.execution.arrow.compression.zstd.level` for Python 
UDF exchange or `df.toArrow()` saw no effect at all: every configured level 
compressed at the default level 3, with no error or warning.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, though only in unreleased versions (the affected code is in 4.2.0 RCs 
and master, not in any released Spark). The configured zstd level now actually 
takes effect; previously all levels behaved like the default level 3.
   
   ### How was this patch tested?
   
   New `ArrowCompressionUtilsSuite`. The regression test compresses the same 
compressible-but-varying batch at zstd level -5 and level 19 and asserts level 
19 produces a strictly smaller payload. Against the old codec construction this 
test fails with byte-identical sizes at both levels (verified locally). A 
second test covers the `none` codec and the unsupported-codec error.
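
   The idea behind the regression test can be sketched as follows. This is not 
the code of `ArrowCompressionUtilsSuite`: it uses zlib from the Python standard 
library in place of zstd (so the levels are 1 and 9 rather than -5 and 19), 
and `make_batch`, `compressed_size`, and `compressed_size_ignoring_level` are 
hypothetical helpers.

```python
import random
import zlib


def make_batch(rows: int = 20000, seed: int = 0) -> bytes:
    """Build a compressible-but-varying payload: repeated keys, random values."""
    rng = random.Random(seed)
    users = ["alice", "bob", "carol", "dave"]
    actions = ["login", "logout", "view", "purchase"]
    lines = [
        f"user={rng.choice(users)} action={rng.choice(actions)} "
        f"id={rng.randrange(100000)}\n"
        for _ in range(rows)
    ]
    return "".join(lines).encode()


def compressed_size(payload: bytes, level: int) -> int:
    """Honours the requested level (the fixed behaviour)."""
    return len(zlib.compress(payload, level))


def compressed_size_ignoring_level(payload: bytes, level: int) -> int:
    """Drops the requested level and uses the default (the old behaviour)."""
    return len(zlib.compress(payload, zlib.Z_DEFAULT_COMPRESSION))


batch = make_batch()
# With the level honoured, the high level should win on this kind of data.
assert compressed_size(batch, 9) < compressed_size(batch, 1)
# With the level dropped, both "levels" give byte-identical sizes.
assert compressed_size_ignoring_level(batch, 9) == compressed_size_ignoring_level(batch, 1)
```

   A payload that is constant or fully random would make a poor test input, 
since both levels could then produce nearly the same size; a payload that 
mixes repeated structure with varying values gives the higher level room to 
do better.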
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

