LuciferYang opened a new pull request, #55751:
URL: https://github.com/apache/spark/pull/55751

   ### What changes were proposed in this pull request?
   
   This PR adds a bulk read+widen path for the `INT32 -> Long` type-converting 
Parquet vector updater (`IntegerToLongUpdater`). When the run length of values 
produced by the RLE/PACKED def-level decoder is at or above a configurable 
threshold (default 8), the Updater delegates to a new bulk method 
`VectorizedValuesReader.readIntegersAsLongs(total, c, rowId)`, which performs a 
single `getBuffer(total*4)` call and runs a tight in-method sign-extending 
conversion loop. Below the threshold, the existing per-row `readInteger() + 
putLong()` loop is unchanged.
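
The difference between the two paths can be sketched in isolation. This is a minimal illustration, not the PR's code: `WidenSketch`, `perRow`, `bulk`, and `plainInt32` are hypothetical names, and a plain `long[]` stands in for the column vector. It assumes PLAIN-encoded INT32 values, which are stored as 4-byte little-endian integers.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch: long[] stands in for the writable column vector.
final class WidenSketch {
    // Per-row pattern: a fresh buffer view is created for every 4-byte value.
    static void perRow(ByteBuffer in, int total, long[] out, int rowId) {
        for (int i = 0; i < total; i++) {
            ByteBuffer one = in.slice().order(ByteOrder.LITTLE_ENDIAN);
            out[rowId + i] = one.getInt(0); // int -> long assignment sign-extends
            in.position(in.position() + 4);
        }
    }

    // Bulk pattern: one view covering total * 4 bytes, then a tight loop.
    static void bulk(ByteBuffer in, int total, long[] out, int rowId) {
        ByteBuffer all = in.slice().order(ByteOrder.LITTLE_ENDIAN);
        for (int i = 0; i < total; i++) {
            out[rowId + i] = all.getInt(i * 4); // absolute read, sign-extended on store
        }
        in.position(in.position() + total * 4);
    }

    // Test helper: encodes ints the way PLAIN INT32 lays them out (little-endian).
    static ByteBuffer plainInt32(int... values) {
        ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
        for (int v : values) buf.putInt(v);
        buf.flip();
        return buf;
    }
}
```

Both methods write the same values; the bulk variant only reduces the number of buffer views created per run.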
   
   `VectorizedPlainValuesReader` is specialized; the interface default 
implementation falls back to the per-row pattern so non-Plain readers preserve 
correctness without a speedup. Specializing other readers (e.g. 
`VectorizedDeltaBinaryPackedReader`) and other type-converting Updaters 
(`IntegerToDouble`, `FloatToDouble`, `DateToTimestampNTZ`, `DowncastLong`) is 
left to follow-up PRs.
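
The fallback arrangement described above can be illustrated with an interface default method. This is a rough sketch under stated assumptions: `IntSource`, `GenericSource`, and `PlainSource` are hypothetical stand-ins for the real reader types, and `long[]` replaces the column vector argument of `readIntegersAsLongs`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-in for the values-reader interface.
interface IntSource {
    int readInteger();

    // Default: per-row fallback, correct for any reader but with no speedup.
    default void readIntegersAsLongs(int total, long[] out, int rowId) {
        for (int i = 0; i < total; i++) out[rowId + i] = readInteger();
    }
}

// A non-specialized reader: it inherits the default and stays correct.
final class GenericSource implements IntSource {
    private final int[] values;
    private int pos;

    GenericSource(int... values) { this.values = values; }

    @Override public int readInteger() { return values[pos++]; }
}

// A specialized reader: it overrides the bulk method with a tight loop.
final class PlainSource implements IntSource {
    private final ByteBuffer in;

    PlainSource(int... values) {
        in = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
        for (int v : values) in.putInt(v);
        in.flip();
    }

    @Override public int readInteger() { return in.getInt(); }

    @Override public void readIntegersAsLongs(int total, long[] out, int rowId) {
        int base = in.position();
        for (int i = 0; i < total; i++) out[rowId + i] = in.getInt(base + i * 4);
        in.position(base + total * 4);
    }
}
```

With this shape, a caller can invoke `readIntegersAsLongs` on any reader without checking its concrete type.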
   
   The threshold is exposed as a new internal SQL conf:
   - Key: `spark.sql.parquet.vectorized.updater.bulkThreshold`
   - Default: `8`
   - Range: `>= 1`
   
   ### Why are the changes needed?
   
   The legacy per-row path allocates a `ByteBuffer` slice and sets its byte 
order on every `getBuffer(4)` call, and this overhead dominates the cost of 
`IntegerToLongUpdater.readValues`. INT32 -> Long widening is a common Parquet 
schema-evolution path, where a narrow integer column is later broadened to long.
   
   Local benchmark on the `ParquetVectorUpdaterBenchmark` 
`IntegerToLongUpdater` case (Mac, OpenJDK 17):
   
   | Run | Best Time | Rate | Per Row |
   |---|---|---|---|
   | Before | 1 ms | 834.9 M/s | 1.2 ns |
   | After  | 0 ms | 3190.4 M/s | 0.3 ns |
   | Delta  | | **3.82x** | -75% |
   
   Other type-converting Updaters (not yet specialized) hold steady at ~835 
M/s, confirming the change is local to the wired Updater.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No, except for the addition of a new internal SQL conf 
`spark.sql.parquet.vectorized.updater.bulkThreshold`. Default behavior is 
unchanged from a correctness standpoint; only performance characteristics on 
the INT32 -> Long path change.
   
   ### How was this patch tested?
   
   - New unit tests in `ParquetVectorUpdaterSuite`:
     - Bulk vs per-row equivalence at boundary lengths (0, 1, 7, 8, 9, 17, 
1024, 4097).
     - Threshold gate semantics via a spy reader that distinguishes `>=` from 
`>` (both paths produce identical values, so an output-only assertion cannot).
     - `readValue` (singular) is not gated by the threshold.
     - Sign-extension on negative INT32 values (`Int.MinValue`, `-1`, 
`Int.MaxValue`).
     - Long-decimal dispatch path (`INT32 + DECIMAL(9,0) -> DecimalType(15,0)` 
via `canReadAsLongDecimal`).
   - New hygiene test pinning the SQLConf default to the literal in the legacy 
7-argument constructor of `VectorizedParquetRecordReader`, so drift between 
`sql/core` and `catalyst` is caught at the unit-test level.
   - New end-to-end test in `ParquetIOSuite` round-trips an INT32 file read 
back as `LongType` under three threshold settings (1, default, `Int.MaxValue`) 
on both nullable and non-null columns.
   - Existing `ParquetVectorUpdaterBenchmark` provides A/B coverage for 
follow-up PRs to track perf deltas of additional Updater specializations.
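
The spy-reader idea from the threshold test can be sketched as follows. This is an illustration only, not the suite's actual code: `GateSketch`, `Reader`, `SpyReader`, and this `readValues` signature are hypothetical. The spy counts which path was taken, which is the only way to tell `>=` from `>` when both paths yield the same values.

```java
// Hypothetical sketch of a threshold gate and a call-counting spy reader.
final class GateSketch {
    interface Reader {
        int readInteger();
        void readIntegersAsLongs(int total, long[] out, int rowId);
    }

    // Spy: produces 0, 1, 2, ... and records which method was invoked.
    static final class SpyReader implements Reader {
        int perRowCalls;
        int bulkCalls;
        private int next;

        @Override public int readInteger() {
            perRowCalls++;
            return next++;
        }

        @Override public void readIntegersAsLongs(int total, long[] out, int rowId) {
            bulkCalls++;
            for (int i = 0; i < total; i++) out[rowId + i] = next++;
        }
    }

    // Gate: runs at or above the threshold take the bulk path.
    static void readValues(int total, int threshold, Reader reader, long[] out, int rowId) {
        if (total >= threshold) {
            reader.readIntegersAsLongs(total, out, rowId);
        } else {
            for (int i = 0; i < total; i++) out[rowId + i] = reader.readInteger();
        }
    }
}
```

A run of exactly 8 values with threshold 8 should register one bulk call and no per-row calls; a run of 7 should register seven per-row calls and no bulk call.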
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

