viirya opened a new pull request, #56334:
URL: https://github.com/apache/spark/pull/56334

   ### What changes were proposed in this pull request?
   
   This PR adds Apache Arrow as a native cache format for Spark in-memory Dataset
   caching, available alongside the existing `DefaultCachedBatchSerializer`. It is
   one of the sub-tasks of
   [SPARK-56978](https://issues.apache.org/jira/browse/SPARK-56978)
   (SPIP: Faster queries in local laptop mode for Apache Spark), specifically the
   "Arrow-based `df.cache` reimplementation" item.
   
   The new `ArrowCachedBatchSerializer` stores cached data in Apache Arrow IPC
   streaming format. It is opt-in via `spark.sql.cache.serializer`:
   
   ```scala
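   // Static conf: set it when building the session, not via spark.conf.set.
   val spark = org.apache.spark.sql.SparkSession.builder()
     .config("spark.sql.cache.serializer",
       "org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer").getOrCreate()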
   ```
   
   Main components:
   
   - **`ArrowCachedBatch`** -- a `SimpleMetricsCachedBatch` holding `numRows`, the
     serialized Arrow `RecordBatch` (IPC streaming format, optionally compressed),
     and per-column statistics for partition pruning.
   - **`ArrowCachedBatchSerializer`** -- the serializer:
     - Write paths for both `InternalRow` and `ColumnarBatch` input, with a
       zero-copy fast path when the input is already backed by `ArrowColumnVector`.
     - Read paths for both `ColumnarBatch` output (wrapping Arrow vectors directly)
       and `InternalRow` output. The row path uses pre-built typed
       `ArrowColumnReader`s that write directly into an `UnsafeRowWriter` to avoid
       per-row pattern matching, and falls back to a columnar-to-row path for
       complex types (Array/Struct/Map/UDT/Variant/etc.).
     - Optional background prefetch of the next batch (decompress/deserialize off
       the consumer thread), controlled by a new config (off by default).
     - Min/max statistics collection over Arrow vectors, kept consistent with the
       row-based `ColumnStats` path (NaN handling, collation-aware string
       comparison, null/decimal bounds).
   - **`ArrowUtils.isSupportedByArrow`** -- recursive type-support check used by
     `supportsColumnarInput`.
   - **`ObjectColumnStats`** -- now skips `getSizeInBytes` for columnar complex
     types (`ColumnarArray`/`ColumnarMap`/`ColumnarRow`), which are views into
     `ColumnVector`s and do not expose a size.
   - New config `spark.sql.execution.arrow.cache.prefetch.enabled` (default
     `false`), Kryo registration for the new classes, and documentation
     (format/migration/tuning guides linked from the SQL docs menu).
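   
   To illustrate the background prefetch item above, here is a minimal,
   self-contained sketch of the general technique. It is not the PR's
   implementation: `PrefetchSketch`, `PrefetchingIterator`, and `decodeAll` are
   hypothetical names, and UTF-8 decoding stands in for decompressing and
   deserializing an Arrow batch. The idea is that batch n+1 is decoded on a
   background thread while the consumer is still working on batch n.
   
   ```scala
   import java.util.concurrent.Executors
   import scala.concurrent.{Await, ExecutionContext, Future}
   import scala.concurrent.duration.Duration

   object PrefetchSketch {
     /** Decodes the next element in the background while the caller consumes the current one. */
     final class PrefetchingIterator[A, B](source: Iterator[A], decode: A => B)(
         implicit ec: ExecutionContext) extends Iterator[B] {

       // `source` is only touched on the caller's thread; only `decode` runs in the background.
       private def startNext(): Option[Future[B]] =
         if (source.hasNext) {
           val raw = source.next()
           Some(Future(decode(raw)))
         } else None

       // Decoding of the first element starts as soon as the iterator is created.
       private var pending: Option[Future[B]] = startNext()

       override def hasNext: Boolean = pending.isDefined

       override def next(): B = {
         val current = pending.getOrElse(throw new NoSuchElementException("no more batches"))
         val result = Await.result(current, Duration.Inf)
         // Kick off the following decode before handing the current result back.
         pending = startNext()
         result
       }
     }

     /** Hypothetical helper: decodes every "batch" through a single background thread. */
     def decodeAll(batches: Seq[Array[Byte]]): Seq[String] = {
       val pool = Executors.newSingleThreadExecutor()
       implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
       try {
         new PrefetchingIterator[Array[Byte], String](
           batches.iterator, bytes => new String(bytes, "UTF-8")).toList
       } finally {
         pool.shutdown()
       }
     }
   }
   ```
   
   Keeping at most one decode in flight bounds the extra memory to a single
   decoded batch, which is one plausible reason to leave such a feature off by
   default.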
   
   ### Why are the changes needed?
   
   The default cache format uses a Spark-specific encoding. Using
   Arrow as the cache format instead provides:
   
   - Zero-copy columnar reads when the cached data is already in Arrow form (e.g.
     re-caching Arrow-cached data with column projection).
   - Interoperability with the Arrow ecosystem and off-heap memory management via
     Arrow allocators.
   - Min/max statistics for partition pruning, consistent with the default path.
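   
   As a rough illustration of how min/max statistics enable pruning, the sketch
   below collects bounds for one batch of a nullable integer column and uses them
   to decide whether a predicate could match any row. `PruningSketch`, `IntStats`,
   and the helper names are hypothetical and are not taken from this PR or from
   Spark's `ColumnStats`; only the general idea carries over.
   
   ```scala
   object PruningSketch {
     // Hypothetical per-batch statistics for a single nullable Int column.
     final case class IntStats(lower: Int, upper: Int, nullCount: Int, rowCount: Int)

     // Fold the batch into bounds; nulls are counted but never widen the bounds.
     def collect(values: Seq[Option[Int]]): IntStats =
       values.foldLeft(IntStats(Int.MaxValue, Int.MinValue, 0, 0)) {
         case (s, Some(v)) =>
           s.copy(
             lower = math.min(s.lower, v),
             upper = math.max(s.upper, v),
             rowCount = s.rowCount + 1)
         case (s, None) =>
           s.copy(nullCount = s.nullCount + 1, rowCount = s.rowCount + 1)
       }

     // `col > v` can only match if the batch maximum exceeds v.
     def mightMatchGreaterThan(s: IntStats, v: Int): Boolean = s.upper > v

     // `col = v` can only match if v lies inside [lower, upper].
     def mightMatchEqualTo(s: IntStats, v: Int): Boolean = s.lower <= v && v <= s.upper
   }
   ```
   
   A batch for which the check returns `false` can be skipped without being
   decompressed or deserialized. An all-null batch keeps the initial inverted
   bounds, so both checks reject it.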
   
   In our benchmarks, the Arrow format is competitive with or faster than the
   default format on columnar/primitive workloads, with the largest gains on the
   zero-copy re-cache path. The default format can still be faster in some cases
   (for example, at higher compression levels), so this is offered as an opt-in
   alternative rather than a replacement. See
   `sql/core/benchmarks/ArrowCacheBenchmark-jdk21-results.txt` and the
   `ArrowCacheBenchmark` / `TPCDSCacheBenchmark` suites for details.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, additively. A new opt-in cache serializer
   (`ArrowCachedBatchSerializer`) and a new config
   `spark.sql.execution.arrow.cache.prefetch.enabled` (default `false`) are added.
   The default cache behavior is unchanged: `spark.sql.cache.serializer` still
   defaults to `DefaultCachedBatchSerializer`.
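   
   A hedged end-to-end sketch of opting in, assuming a Spark build that contains
   this PR. Both settings are passed at session creation, since
   `spark.sql.cache.serializer` is a static conf in current Spark and this
   description does not say whether the prefetch flag can be changed at runtime.
   The object name, app name, and sample query are illustrative only.
   
   ```scala
   import org.apache.spark.sql.SparkSession

   object ArrowCacheUsage {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .master("local[*]")
         .appName("arrow-cache-sketch") // illustrative name
         // Opt in to the Arrow cache format added by this PR.
         .config("spark.sql.cache.serializer",
           "org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
         // Optional: background prefetch of the next batch (default is false).
         .config("spark.sql.execution.arrow.cache.prefetch.enabled", "true")
         .getOrCreate()

       val df = spark.range(0, 1000000).selectExpr("id", "id % 10 AS bucket")
       df.cache()
       df.count() // first action materializes the cache
       df.filter("id > 999990").count() // later queries read from the cached batches
       spark.stop()
     }
   }
   ```
   
   Leaving out both `config` calls keeps the existing behavior, with
   `DefaultCachedBatchSerializer` handling the cache.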
   
   ### How was this patch tested?
   
   - New `ArrowCachedBatchSerializerSuite` (~55 tests) covering primitive and
     complex/nested types, null handling, collation, NaN bounds, statistics
     correctness for both the row and columnar (Arrow-vector) paths, columnar
     input from Parquet, column projection, filter pushdown, compression codecs
     (none/zstd/lz4), and verification that the Arrow serializer is actually used.
   - `ArrowCachedBatchKryoRegistrationSuite` verifying Kryo registration.
   - Added `ArrowCacheBenchmark` and `TPCDSCacheBenchmark` for performance
     comparison against the default cache format.
   
   Locally: `catalyst/compile` + `sql/Test/compile` pass; the two suites above run
   green (55 tests, 0 failures).
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (Claude Opus 4.8)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

