viirya opened a new pull request, #56334: URL: https://github.com/apache/spark/pull/56334
### What changes were proposed in this pull request?

This PR adds Apache Arrow as a native cache format for Spark in-memory Dataset caching, available alongside the existing `DefaultCachedBatchSerializer`. It is one of the sub-tasks of [SPARK-56978](https://issues.apache.org/jira/browse/SPARK-56978) (SPIP: Faster queries in local laptop mode for Apache Spark), specifically the "Arrow-based `df.cache` reimplementation" item.

The new `ArrowCachedBatchSerializer` stores cached data in Apache Arrow IPC streaming format. It is opt-in via the static config `spark.sql.cache.serializer`, which must be set before the session is created (it cannot be changed with `spark.conf.set` at runtime):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.cache.serializer",
    "org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
  .getOrCreate()
```

Main components:

- **`ArrowCachedBatch`** -- a `SimpleMetricsCachedBatch` holding `numRows`, the serialized Arrow `RecordBatch` (IPC streaming format, optionally compressed), and per-column statistics for partition pruning.
- **`ArrowCachedBatchSerializer`** -- the serializer:
  - Write paths for both `InternalRow` and `ColumnarBatch` input, with a zero-copy fast path when the input is already backed by `ArrowColumnVector`.
  - Read paths for both `ColumnarBatch` output (wrapping Arrow vectors directly) and `InternalRow` output. The row path uses pre-built typed `ArrowColumnReader`s that write directly into an `UnsafeRowWriter` to avoid per-row pattern matching, and falls back to a columnar-to-row path for complex types (Array/Struct/Map/UDT/Variant/etc.).
  - Optional background prefetch of the next batch (decompress/deserialize off the consumer thread), controlled by a new config (off by default).
  - Min/max statistics collection over Arrow vectors, kept consistent with the row-based `ColumnStats` path (NaN handling, collation-aware string comparison, null/decimal bounds).
- **`ArrowUtils.isSupportedByArrow`** -- recursive type-support check used by `supportsColumnarInput`.
- **`ObjectColumnStats`** -- now skips `getSizeInBytes` for columnar complex types (`ColumnarArray`/`ColumnarMap`/`ColumnarRow`), which are views into `ColumnVector`s and do not expose a size.
- New config `spark.sql.execution.arrow.cache.prefetch.enabled` (default `false`), Kryo registration for the new classes, and documentation (format/migration/tuning guides linked from the SQL docs menu).

### Why are the changes needed?

The default cache format is row/column-encoded specifically for Spark. Using Arrow as the cache format provides:

- Zero-copy columnar reads when the cached data is already in Arrow form (e.g. re-caching Arrow-cached data with column projection).
- Interoperability with the Arrow ecosystem and off-heap memory management via Arrow allocators.
- Min/max statistics for partition pruning, consistent with the default path.

In our benchmarks, the Arrow format is competitive with or faster than the default format on columnar/primitive workloads, with the largest gains on the zero-copy re-cache path. The default format can still be faster in some cases (for example, at higher compression levels), so this is offered as an opt-in alternative rather than a replacement. See `sql/core/benchmarks/ArrowCacheBenchmark-jdk21-results.txt` and the `ArrowCacheBenchmark` / `TPCDSCacheBenchmark` suites for details.

### Does this PR introduce _any_ user-facing change?

Yes, additively. A new opt-in cache serializer (`ArrowCachedBatchSerializer`) and a new config `spark.sql.execution.arrow.cache.prefetch.enabled` (default `false`) are added. The default cache behavior is unchanged: `spark.sql.cache.serializer` still defaults to `DefaultCachedBatchSerializer`.

### How was this patch tested?
- New `ArrowCachedBatchSerializerSuite` (~55 tests) covering primitive and complex/nested types, null handling, collation, NaN bounds, statistics correctness for both the row and columnar (Arrow-vector) paths, columnar input from Parquet, column projection, filter pushdown, compression codecs (none/zstd/lz4), and verification that the Arrow serializer is actually used.
- `ArrowCachedBatchKryoRegistrationSuite` verifying Kryo registration.
- Added `ArrowCacheBenchmark` and `TPCDSCacheBenchmark` for performance comparison against the default cache format.

Locally: `catalyst/compile` + `sql/Test/compile` pass; the two suites above run green (55 tests, 0 failures).

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.8)
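
For readers unfamiliar with the "background prefetch of the next batch" item listed under Main components, the idea can be pictured with a small Spark-free sketch. `PrefetchingIterator`, its `decode` parameter, and `PrefetchDemo` are hypothetical names chosen for illustration. This is not the PR's implementation, and how the real serializer schedules, bounds, or cancels the background work is not shown here.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical helper (not the PR's code): wraps an iterator of encoded batches
// and decodes batch N+1 on a background thread while the caller works on batch N.
final class PrefetchingIterator[A, B](encoded: Iterator[A], decode: A => B)(
    implicit ec: ExecutionContext) extends Iterator[B] {

  // In-flight decode of the next batch, or None once the input is exhausted.
  private var pending: Option[Future[B]] = startNext()

  private def startNext(): Option[Future[B]] =
    if (encoded.hasNext) {
      val raw = encoded.next()   // pulled on the caller's thread
      Some(Future(decode(raw)))  // only the decode step is offloaded
    } else None

  override def hasNext: Boolean = pending.isDefined

  override def next(): B = {
    val current = pending.getOrElse(throw new NoSuchElementException("no more batches"))
    val result = Await.result(current, Duration.Inf) // wait for the prefetched batch
    pending = startNext()                            // start decoding the following one
    result
  }
}

object PrefetchDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // Stand-in "batches": byte arrays decoded by summing their contents.
      val batches = Iterator(Array[Byte](1, 2), Array[Byte](3), Array[Byte]())
      val decoded = new PrefetchingIterator[Array[Byte], Int](batches, _.map(_.toInt).sum)
      decoded.foreach(println)
    } finally pool.shutdown()
  }
}
```

The sketch keeps at most one batch in flight, so the extra memory held is bounded by a single decoded batch. That trade-off between memory and latency is one plausible reason for a prefetch switch that defaults to off, though the PR description does not state its rationale.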
