sunchao commented on code in PR #56334:
URL: https://github.com/apache/spark/pull/56334#discussion_r3486641466


##########
docs/sql-arrow-cache-format.md:
##########
@@ -0,0 +1,343 @@
+# Apache Arrow Cache Format for Spark

Review Comment:
   [P2] Please add the standard Spark Jekyll front matter (`layout`, `title`, 
`displayTitle`, and the ASF license block). This is the only tracked 
`docs/*.md` page without a leading `---`; without front matter it will not 
produce the `sql-arrow-cache-format.html` page referenced by the new menu and 
performance-guide links.



##########
sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala:
##########
@@ -38,6 +38,50 @@ private[sql] object ArrowUtils {
 
   // todo: support more types.
 
+  /**
+   * Check if a Spark DataType is supported by Arrow. This recursively checks 
complex types
+   * (Array, Struct, Map).
+   *
+   * Note: This checks compatibility with toArrowField(), not toArrowType(). 
Types like
+   * GeometryType, GeographyType, and VariantType are not supported by 
toArrowType() (which only
+   * handles primitive Arrow types), but ARE supported by toArrowField() which 
converts them to
+   * Arrow Struct representations with metadata. Since Arrow cache uses 
toArrowField() via
+   * toArrowSchema() to create the schema, these types are supported.
+   */
+  def isSupportedByArrow(dt: DataType): Boolean = {
+    dt match {
+      // Primitive types
+      case BooleanType | ByteType | ShortType | IntegerType | LongType | 
FloatType | DoubleType |
+          _: StringType | BinaryType | NullType =>
+        true
+
+      // Decimal
+      case _: DecimalType => true
+
+      // Temporal types
+      case DateType | TimestampType | TimestampNTZType | _: TimeType => true
+
+      // Interval types
+      case _: YearMonthIntervalType | _: DayTimeIntervalType | 
CalendarIntervalType => true

Review Comment:
   [P2] Please do not advertise `CalendarIntervalType` as fully supported 
unless the cache representation is lossless over Spark's full value domain. 
Arrow's `IntervalMonthDayNanoWriter` multiplies `CalendarInterval.microseconds` 
by 1000 with `Math.multiplyExact`, but Spark permits the full `Long` range. I 
reproduced this with `new CalendarInterval(0, 0, Long.MaxValue / 1000L + 1L)`: 
the default cache returns `COUNT=1`, while the Arrow cache aborts 
materialization with `ArithmeticException: long overflow`. Please use a 
lossless representation or reject and document this type rather than claiming 
parity with the default serializer.



##########
docs/sql-arrow-cache-format.md:
##########
@@ -0,0 +1,343 @@
+# Apache Arrow Cache Format for Spark
+
+## Overview
+
+Apache Spark supports using Apache Arrow as an alternative cache format for 
in-memory Dataset caching. This format provides improved performance for 
certain workloads, especially when working with columnar data sources like 
Parquet and ORC.
+
+## Benefits
+
+The Arrow cache format offers several advantages over the default cache format:
+
+- **Zero-copy reads** when input is already in Arrow format (e.g., Arrow-based 
data sources, re-caching Arrow cached data)
+- **Better filter pushdown** with min/max statistics for partition pruning
+- **Off-heap memory management** via Arrow allocators
+- **Efficient compression** with zstd and lz4 codecs
+- **Arrow ecosystem interoperability** for data sharing
+
+**Note**: Spark's built-in Parquet/ORC readers use internal column vectors 
(`OnHeapColumnVector`/`OffHeapColumnVector`), not Arrow format, so they don't 
benefit from zero-copy optimization.
+
+## Configuration
+
+`spark.sql.cache.serializer` is a static SQL configuration, so it must be set 
when the
+SparkSession is built and cannot be changed on a running session 
(`spark.conf.set` rejects static
+keys with `CANNOT_MODIFY_CONFIG`):
+
+```scala
+val spark = SparkSession.builder()
+  .appName("MyApp")
+  .config("spark.sql.cache.serializer",
+    "org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
+  .getOrCreate()
+```
+
+**Note**: This config selects the cache serializer for the whole session; once 
set, this
+serializer handles every cached relation. There is no automatic per-relation 
fallback to another
+cache serializer based on the data types involved (see
+[Supported Data Types](#supported-data-types) for how unsupported types are 
handled). The chosen
+serializer is also cached process-wide on first use, so switching cache 
formats within a JVM that
+has already materialized a cache requires a fresh JVM (see
+[Migration from Default Cache](#migration-from-default-cache)).
+
+## Usage
+
+Once configured, use cache operations as normal:
+
+```scala
+// Cache a DataFrame
+val df = spark.read.parquet("data.parquet")
+df.cache()
+
+// Use cached data
+df.filter("age > 30").count()
+
+// Uncache when done
+df.unpersist()
+```
+
+## Compression
+
+Arrow cache supports multiple compression codecs. Configure compression with:
+
+```scala
+spark.conf.set("spark.sql.execution.arrow.compression.codec", "zstd")
+```
+
+Available options:
+- `none` - No compression (fastest, largest size, **default**)
+- `lz4` - LZ4 compression (fast, good compression)
+- `zstd` - Zstandard compression (slower, best compression)
+
+For zstd, you can also configure the compression level. Positive values (up to 
22) give better
+compression but slower speed; negative values give ultra-fast compression with 
lower ratios:
+
+```scala
+spark.conf.set("spark.sql.execution.arrow.compression.zstd.level", "3")  // 
Default: 3
+```
+
+## Vectorized Reader
+
+Enable vectorized reading for better performance with primitive types:
+
+```scala
+spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", 
"true")
+```
+
+When enabled, cached data is read as columnar batches instead of rows, which 
can significantly improve performance for columnar operations.
+
+## Performance Characteristics
+
+In our benchmarks, the Arrow cache format performs best on the following 
workloads. Actual
+results depend on data types, compression settings, and hardware, and the 
default cache format
+can be faster in some cases (for example, with higher compression levels):
+
+1. **Filter-Heavy Workloads**: Queries with selective filters benefit from 
min/max statistics.
+2. **Columnar Operations**: Aggregations and projections on cached data 
benefit from the Arrow format.
+3. **Parquet/ORC Caching**: Arrow's batch processing helps even without the 
zero-copy path.
+4. **Re-caching with Column Projection**: Dropping columns from Arrow-cached 
data preserves the
+   `ArrowColumnVector` format, enabling true zero-copy extraction and the 
largest gains.
+
+### Benchmark Results
+
+The numbers below are illustrative results from one run on an Apple M4 Max 
(OpenJDK 21.0.8) and
+will vary with hardware, JDK, and compression settings. They are not a 
guarantee. For the
+authoritative, regularly regenerated numbers, see
+`sql/core/benchmarks/ArrowCacheBenchmark-jdk21-results.txt` and the 
`ArrowCacheBenchmark` suite.
+
+| Workload | Default Cache | Arrow Cache | Speedup |
+|----------|--------------|-------------|---------|
+| Write + Read (5M rows, 3 primitive columns) | 153.7 ns/row | 74.2 ns/row | 
**~2X faster** |
+| Cache then filter (5M rows) | 100.1 ns/row | 70.8 ns/row | **~1.4X faster** |
+| Columnar input from Parquet (2M rows, 3 primitive columns) | 195.3 ns/row | 
113.1 ns/row | **~1.7X faster** |
+| Re-cache with zero-copy (2M rows, 2 columns) | 123.3 ns/row | 38.5 ns/row | 
**~3.2X faster** |
+
+**Notes**:
+- **Write + Read**: Significant improvement from efficient Arrow serialization 
and vectorized operations
+- **Cache then filter**: This measures end-to-end cache build plus a filtered 
scan, comparing the two cache formats. Both formats collect min/max statistics 
and can prune batches, so the difference reflects overall cache+scan throughput 
rather than pruning unique to Arrow
+- **Parquet caching**: Shows improvement despite Spark's Parquet reader 
producing `OnHeapColumnVector`/`OffHeapColumnVector` rather than 
`ArrowColumnVector`, due to Arrow's efficient batch processing
+- **Re-cache with zero-copy**: When caching a subset of columns from 
Arrow-cached data (e.g., `df.drop("column")`), the remaining columns preserve 
their `ArrowColumnVector` format, enabling true zero-copy extraction and 
achieving the best performance
+- **Zero-copy benefits** only apply when input is already `ArrowColumnVector` 
(e.g., Python Arrow sources, re-caching Arrow cached data with column 
projection)
+
+## Supported Data Types
+
+Arrow cache supports the following data types:
+
+### Primitive Types
+- BooleanType
+- ByteType, ShortType, IntegerType, LongType
+- FloatType, DoubleType
+- DecimalType (all precision/scale combinations)
+- NullType
+
+### Temporal Types
+- DateType
+- TimestampType
+- TimestampNTZType
+- TimeType
+
+### Interval Types
+- YearMonthIntervalType
+- DayTimeIntervalType
+- CalendarIntervalType
+
+### String and Binary
+- StringType (including collated strings)
+- BinaryType
+
+### Complex Types
+- ArrayType
+- StructType
+- MapType
+- Nested combinations of the above
+
+### Other Types
+- VariantType
+- GeometryType, GeographyType
+- User-defined types (UDTs) whose underlying representation is itself supported
+
+### Unsupported Types
+
+Arrow cache covers every type the default cache serializer supports, plus some 
it
+does not (for example geometry and geography). Types that Arrow cannot 
represent
+(such as `ObjectType`) are not silently dropped or routed to a different cache
+serializer: there is no per-type fallback, because the cache serializer is 
chosen
+once via the static `spark.sql.cache.serializer` configuration and then handles
+every cached relation. Attempting to cache an unsupported type fails with an
+`UNSUPPORTED_DATATYPE` error when the cache is materialized.
+
+## Statistics and Filter Pushdown
+
+Arrow cache automatically collects min/max statistics for the following types:
+- Boolean
+- Numeric types (Byte, Short, Int, Long, Float, Double)
+- Decimal
+- Date, Timestamp, and Timestamp without time zone (TIMESTAMP_NTZ)
+- Time
+- Year-month and day-time intervals
+- String (using collation-aware comparison for collated strings)
+
+Other types (Binary, Variant, calendar intervals, and complex types such as
+Array/Struct/Map) are cached but do not contribute min/max bounds, so they only
+record null counts and sizes.
+
+These statistics enable partition pruning when filtering:
+
+```scala
+val df = spark.range(10000000).cache()
+
+// This filter can skip batches using min/max statistics
+df.filter("id > 5000000").count()
+```
+
+## Memory Management
+
+Arrow cache uses off-heap memory managed by Apache Arrow allocators. This is a 
fundamental design choice in Apache Arrow and is not configurable for on-heap 
memory.
+
+**Memory Efficiency**:
+- Despite requiring off-heap memory, Arrow cache is often **more 
memory-efficient** than default cache:
+  - Efficient compression with zstd/lz4 codecs
+  - Compact columnar format without Java object overhead
+  - Better compression ratios, especially for strings and complex types
+- If you have limited off-heap memory, increase 
`spark.executor.memoryOverhead` to allocate more off-heap memory
+
+**Memory Cleanup**:
+Arrow memory is automatically cleaned up when:
+- Tasks complete
+- DataFrames are unpersisted
+- SparkSession is stopped
+
+You can monitor Arrow memory usage through Spark metrics and the Spark UI.
+
+## Limitations and Considerations
+
+1. **Static Configuration**: Cache serializer must be set before SparkSession 
creation
+2. **Memory Overhead**: Arrow format has small per-batch overhead
+3. **Compatibility**: Cannot mix cache formats - recache needed when switching
+4. **Compression Trade-off**: Higher compression = lower memory but slower 
reads
+
+## Migration from Default Cache
+
+The cache serializer is resolved from `spark.sql.cache.serializer` only on 
first use and is then
+held in a process-wide field that is not reset when a SparkSession stops. As a 
result, **switching
+cache formats requires a fresh JVM** once any cache has been materialized -- 
stopping and
+rebuilding the SparkSession in the same process keeps using the originally 
resolved serializer.
+
+To migrate from the default cache to Arrow cache:
+
+1. **Start a new JVM / driver process** (a brand-new Spark application).
+2. **Build the SparkSession with the Arrow serializer**:
+   ```scala
+   val spark = SparkSession.builder()
+     .config("spark.sql.cache.serializer",
+       "org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
+     .getOrCreate()
+   ```
+3. **Cache your DataFrames** as usual.
+
+**Note**: Cache data is never shared across formats; each application caches 
in whichever format
+its serializer produces.
+
+## Troubleshooting
+
+### Out of Memory Errors
+
+If you encounter OOM errors with Arrow cache:
+
+1. Reduce batch size:
+   ```scala
+   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")  // 
Default: 10000
+   ```
+
+2. Enable compression:
+   ```scala
+   spark.conf.set("spark.sql.execution.arrow.compression.codec", "zstd")
+   ```
+
+3. Reduce compression level:
+   ```scala
+   spark.conf.set("spark.sql.execution.arrow.compression.zstd.level", "1")
+   ```
+
+### Slow Performance
+
+If Arrow cache is slower than expected:
+
+1. Enable vectorized reader:
+   ```scala
+   spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", 
"true")
+   ```
+
+2. Try different compression codec:
+   ```scala
+   spark.conf.set("spark.sql.execution.arrow.compression.codec", "lz4")  // 
Faster than zstd

Review Comment:
   [P2] Please do not recommend LZ4 as the faster/balanced choice with the 
dependencies used here. The benchmark source disables every LZ4 case because 
Arrow falls back to the Commons Compress implementation and measured roughly 
50x slower than zstd. Following this troubleshooting advice can therefore make 
the reported slowdown much worse. Either provide and benchmark the fast LZ4 
dependency, document that requirement, or recommend a codec supported by the 
committed measurements.



##########
sql/api/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala:
##########
@@ -38,6 +38,50 @@ private[sql] object ArrowUtils {
 
   // todo: support more types.
 
+  /**
+   * Check if a Spark DataType is supported by Arrow. This recursively checks 
complex types
+   * (Array, Struct, Map).
+   *
+   * Note: This checks compatibility with toArrowField(), not toArrowType(). 
Types like
+   * GeometryType, GeographyType, and VariantType are not supported by 
toArrowType() (which only
+   * handles primitive Arrow types), but ARE supported by toArrowField() which 
converts them to
+   * Arrow Struct representations with metadata. Since Arrow cache uses 
toArrowField() via
+   * toArrowSchema() to create the schema, these types are supported.
+   */
+  def isSupportedByArrow(dt: DataType): Boolean = {
+    dt match {
+      // Primitive types
+      case BooleanType | ByteType | ShortType | IntegerType | LongType | 
FloatType | DoubleType |
+          _: StringType | BinaryType | NullType =>
+        true
+
+      // Decimal
+      case _: DecimalType => true
+
+      // Temporal types
+      case DateType | TimestampType | TimestampNTZType | _: TimeType => true

Review Comment:
   [P2] Please reconcile this capability whitelist with current `master` before 
merging. `master` now maps and reads/writes `TimestampNTZNanosType` and 
`TimestampLTZNanosType` through Arrow, but this predicate still rejects them. 
The serializer then takes the row path, falls through to `ObjectColumnStats` 
(whose `ColumnType` has no nanos-timestamp case), and fails materialization 
with `UNSUPPORTED_DATATYPE`; row output also lacks a typed reader or fallback. 
The synthetic merge is conflict-free but still has this behavioral 
incompatibility.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ArrowCachedBatchSerializer.scala:
##########
@@ -0,0 +1,1459 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.columnar
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+import java.nio.channels.Channels
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.arrow.compression.{Lz4CompressionCodec, ZstdCompressionCodec}
+import org.apache.arrow.vector.{VectorLoader, VectorSchemaRoot, VectorUnloader}
+import org.apache.arrow.vector.compression.{CompressionCodec, 
NoCompressionCodec}
+import org.apache.arrow.vector.ipc.{ReadChannel, WriteChannel}
+import org.apache.arrow.vector.ipc.message.{ArrowRecordBatch, 
MessageSerializer}
+
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.columnar.{CachedBatch, 
SimpleMetricsCachedBatchSerializer}
+import org.apache.spark.sql.execution.arrow.ArrowWriter
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, 
ColumnVector}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * A [[CachedBatchSerializer]] that uses Apache Arrow as the cache format.
+ *
+ * This serializer:
+ *  - Supports both row-based (InternalRow) and columnar (ColumnarBatch) input
+ *  - Stores data in Arrow IPC streaming format with optional compression 
(zstd/lz4)
+ *  - Enables zero-copy columnar reads when output is ColumnarBatch
+ *  - Uses off-heap memory via Arrow allocators
+ *  - Collects per-column statistics for partition pruning
+ *  - Provides efficient interoperability with Arrow ecosystem
+ *
+ * Configuration options:
+ *  - spark.sql.cache.serializer: Set to this class name to enable
+ *  - spark.sql.execution.arrow.maxRecordsPerBatch: Max rows per cached batch
+ *  - spark.sql.execution.arrow.compression.codec: Compression (none/zstd/lz4)
+ *  - spark.sql.inMemoryColumnarStorage.enableVectorizedReader: Enable 
columnar output
+ */
+class ArrowCachedBatchSerializer extends SimpleMetricsCachedBatchSerializer {
+
+  override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = {
+    // Check if all data types in the schema are supported by Arrow
+    schema.forall(attr => ArrowUtils.isSupportedByArrow(attr.dataType))
+  }
+
+  override def convertInternalRowToCachedBatch(
+      input: RDD[InternalRow],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch] = {
+    // Capture config values on driver before RDD transformation
+    val sparkSchema = DataTypeUtils.fromAttributes(schema)
+    val maxRecordsPerBatch = conf.arrowMaxRecordsPerBatch
+    val maxBytesPerBatch = conf.arrowMaxBytesPerBatch
+    val timeZoneId = conf.sessionLocalTimeZone
+    val compressionCodecName = conf.arrowCompressionCodec
+    val compressionLevel = conf.arrowZstdCompressionLevel
+
+    input.mapPartitionsInternal { rowIterator =>
+      new InternalRowToArrowCachedBatchIterator(
+        rowIterator,
+        schema,
+        sparkSchema,
+        maxRecordsPerBatch,
+        maxBytesPerBatch,
+        timeZoneId,
+        compressionCodecName,
+        compressionLevel)
+    }
+  }
+
+  override def convertColumnarBatchToCachedBatch(
+      input: RDD[ColumnarBatch],
+      schema: Seq[Attribute],
+      storageLevel: StorageLevel,
+      conf: SQLConf): RDD[CachedBatch] = {
+    // Capture config values on driver before RDD transformation
+    val sparkSchema = DataTypeUtils.fromAttributes(schema)
+    val timeZoneId = conf.sessionLocalTimeZone
+    val compressionCodecName = conf.arrowCompressionCodec
+    val compressionLevel = conf.arrowZstdCompressionLevel
+
+    input.mapPartitionsInternal { batchIterator =>
+      new ColumnarBatchToArrowCachedBatchIterator(
+        batchIterator,
+        schema,
+        sparkSchema,
+        timeZoneId,
+        compressionCodecName,
+        compressionLevel)
+    }
+  }
+
+  override def supportsColumnarOutput(schema: StructType): Boolean = {
+    // Always support columnar output with Arrow
+    true
+  }
+
+  override def vectorTypes(attributes: Seq[Attribute], conf: SQLConf): 
Option[Seq[String]] = {
+    Option(Seq.fill(attributes.length)(classOf[ArrowColumnVector].getName))
+  }
+
+  override def convertCachedBatchToColumnarBatch(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[ColumnarBatch] = {
+    val cacheSchema = DataTypeUtils.fromAttributes(cacheAttributes)
+    val selectedSchema = DataTypeUtils.fromAttributes(selectedAttributes)
+    val columnIndices =
+      selectedAttributes.map(a => cacheAttributes.map(o => 
o.exprId).indexOf(a.exprId)).toArray
+    // Capture config on driver
+    val timeZoneId = conf.sessionLocalTimeZone
+    val prefetchEnabled = conf.arrowCachePrefetchEnabled
+
+    input.mapPartitionsInternal { batchIterator =>
+      new ArrowCachedBatchToColumnarBatchIterator(
+        batchIterator,
+        cacheSchema,
+        selectedSchema,
+        columnIndices,
+        timeZoneId,
+        prefetchEnabled)
+    }
+  }
+
+  override def convertCachedBatchToInternalRow(
+      input: RDD[CachedBatch],
+      cacheAttributes: Seq[Attribute],
+      selectedAttributes: Seq[Attribute],
+      conf: SQLConf): RDD[InternalRow] = {
+    val cacheSchema = DataTypeUtils.fromAttributes(cacheAttributes)
+    val selectedSchema = DataTypeUtils.fromAttributes(selectedAttributes)
+    val timeZoneId = conf.sessionLocalTimeZone
+
+    // Calculate column indices for projection
+    val selectedIndices = selectedAttributes.map { attr =>
+      cacheAttributes.indexWhere(_.exprId == attr.exprId)
+    }.toArray
+
+    // Check if all selected types can use the fast path.
+    // Types not handled by ArrowColumnReader must use the fallback path.
+    val needsFallback = selectedSchema.fields.exists { f =>
+      f.dataType match {
+        case _: ArrayType | _: StructType | _: MapType => true
+        case CalendarIntervalType | VariantType | NullType => true
+        case _: UserDefinedType[_] => true
+        // Geometry/Geography are represented as an Arrow struct (srid + wkb); 
the fast-path
+        // ArrowColumnReader does not handle them, so route them through the 
fallback.
+        case _: GeometryType | _: GeographyType => true
+        case _ => false
+      }
+    }
+
+    if (needsFallback) {
+      // Fall back to columnar-to-row conversion via ColumnarBatch for complex 
types.
+      // Use UnsafeProjection to convert ColumnarBatchRow to UnsafeRow.
+      convertCachedBatchToColumnarBatch(input, cacheAttributes, 
selectedAttributes, conf)
+        .mapPartitionsInternal { batchIter =>
+          val toUnsafe = 
org.apache.spark.sql.catalyst.expressions.UnsafeProjection.create(
+            selectedSchema)
+          batchIter.flatMap { batch =>
+            val numRows = batch.numRows()
+            new Iterator[InternalRow] {
+              private var rowIdx = 0
+              override def hasNext: Boolean = rowIdx < numRows
+              override def next(): InternalRow = {
+                val row = batch.getRow(rowIdx)
+                rowIdx += 1
+                toUnsafe(row)
+              }
+            }
+          }
+        }
+    } else {
+      val prefetchEnabled = conf.arrowCachePrefetchEnabled
+      input.mapPartitionsInternal { batchIterator =>
+        new ArrowCachedBatchToInternalRowIterator(
+          batchIterator,
+          cacheSchema,
+          selectedSchema,
+          selectedIndices,
+          timeZoneId,
+          prefetchEnabled)
+      }
+    }
+  }
+}
+
+/**
+ * Companion object with shared utility methods for Arrow cache serialization.
+ */
+private object ArrowCachedBatchSerializer {
+
+  // scalastyle:off caselocale
+  def createCompressionCodec(
+      codecName: String,
+      compressionLevel: Int): CompressionCodec = {
+    codecName.toLowerCase match {
+      case "none" => NoCompressionCodec.INSTANCE
+      // The codec instance must be constructed directly so that 
compressionLevel is honored:
+      // CompressionCodec.Factory.createCodec(codecType) ignores the level and 
builds a codec at
+      // the default level. The level only matters on the write side; the read 
side looks up the
+      // codec by the type recorded in the IPC message.
+      case "zstd" => new ZstdCompressionCodec(compressionLevel)
+      case "lz4" => new Lz4CompressionCodec()
+      case other =>
+        throw SparkException.internalError(
+          s"Unsupported Arrow compression codec: $other. Supported values: 
none, zstd, lz4")
+    }
+  }
+  // scalastyle:on caselocale
+
+  def serializeBatch(batch: ArrowRecordBatch): Array[Byte] = {
+    val out = new ByteArrayOutputStream()
+    val writeChannel = new WriteChannel(Channels.newChannel(out))
+    MessageSerializer.serialize(writeChannel, batch)
+    out.toByteArray
+  }
+
+  /**
+   * Shut down a prefetch worker during task cleanup without leaking the root 
it may have produced.
+   *
+   * The prefetch worker deserializes the next batch into a fresh 
[[VectorSchemaRoot]] off-thread.
+   * If task completion runs while a result is in flight (e.g. a LIMIT 
consumer stops early),
+   * cancelling and discarding the future would drop a root that was already 
(or is about to be)
+   * produced, and the subsequent `allocator.close()` would fail with "Memory 
was leaked by query".
+   *
+   * This stops accepting new work, waits for the worker to finish so no root 
is produced after we
+   * stop looking, then closes any completed result. Always returns null so 
the caller can null out
+   * its future reference. Safe to call with a null executor or future.
+   */
+  def drainAndClosePrefetch(
+      executor: java.util.concurrent.ExecutorService,
+      future: java.util.concurrent.Future[VectorSchemaRoot]): 
java.util.concurrent.Future[
+        VectorSchemaRoot] = {
+    if (executor != null) {
+      // Stop accepting new tasks and wait for the in-flight deserialization 
to finish, rather than
+      // interrupting it: an interrupt mid-allocation can race allocator 
shutdown and still leak.
+      executor.shutdown()
+      try {
+        executor.awaitTermination(Long.MaxValue, 
java.util.concurrent.TimeUnit.NANOSECONDS)
+      } catch {
+        case _: InterruptedException =>
+          Thread.currentThread().interrupt()
+          executor.shutdownNow()
+      }
+    }
+    if (future != null) {
+      try {
+        // The worker has terminated, so this does not block; close the root 
it produced.
+        val root = future.get()
+        if (root != null) {
+          root.close()
+        }
+      } catch {
+        // The batch was never produced (cancelled/failed); nothing to close.
+        case _: java.util.concurrent.CancellationException =>
+        case _: java.util.concurrent.ExecutionException =>
+        case _: InterruptedException => Thread.currentThread().interrupt()
+      }
+    }
+    null
+  }
+
+  def createColumnStats(dataType: DataType): ColumnStats = {
+    dataType match {
+      case BooleanType => new BooleanColumnStats
+      case ByteType => new ByteColumnStats
+      case ShortType => new ShortColumnStats
+      case IntegerType => new IntColumnStats
+      case DateType => new IntColumnStats  // Date is stored as Int
+      case LongType => new LongColumnStats
+      case TimestampType => new LongColumnStats  // Timestamp is stored as Long
+      case TimestampNTZType => new LongColumnStats  // TimestampNTZ is stored 
as Long
+      case FloatType => new FloatColumnStats
+      case DoubleType => new DoubleColumnStats
+      case st: StringType => new StringColumnStats(st)
+      case BinaryType => new BinaryColumnStats
+      case dt: DecimalType => new DecimalColumnStats(dt)
+      case CalendarIntervalType => new IntervalColumnStats
+      case _: YearMonthIntervalType => new IntColumnStats   // stored as Int
+      case _: DayTimeIntervalType => new LongColumnStats  // stored as Long
+      case _: TimeType => new LongColumnStats  // Time is stored as Long 
(nanoseconds)
+      case VariantType => new VariantColumnStats
+      // Geometry/Geography are stored as binary (WKB) internally, so reuse 
BinaryColumnStats
+      // to collect size/count without min/max bounds. They are AtomicTypes 
that ColumnType
+      // (used by ObjectColumnStats) does not handle, so they must be matched 
explicitly here.
+      case _: GeometryType | _: GeographyType => new BinaryColumnStats

Review Comment:
   [P2] Geometry/Geography statistics still assume an `UnsafeRow` 
representation. `BinaryColumnStats` calls `row.getBinary`, but the Catalyst 
physical value for these types is `BinaryView`; a valid `GenericInternalRow` 
(for example from a row-based DSv2 reader or direct serializer use) therefore 
throws `ClassCastException`, even though `ArrowWriter` correctly consumes the 
same value through `getBinaryView`. Please use a BinaryView-aware size 
collector and cover a direct generic-row input.



##########
docs/sql-arrow-cache-format.md:
##########
@@ -0,0 +1,343 @@
+# Apache Arrow Cache Format for Spark
+
+## Overview
+
+Apache Spark supports using Apache Arrow as an alternative cache format for 
in-memory Dataset caching. This format provides improved performance for 
certain workloads, especially when working with columnar data sources like 
Parquet and ORC.
+
+## Benefits
+
+The Arrow cache format offers several advantages over the default cache format:
+
+- **Zero-copy reads** when input is already in Arrow format (e.g., Arrow-based 
data sources, re-caching Arrow cached data)
+- **Better filter pushdown** with min/max statistics for partition pruning
+- **Off-heap memory management** via Arrow allocators
+- **Efficient compression** with zstd and lz4 codecs
+- **Arrow ecosystem interoperability** for data sharing
+
+**Note**: Spark's built-in Parquet/ORC readers use internal column vectors 
(`OnHeapColumnVector`/`OffHeapColumnVector`), not Arrow format, so they don't 
benefit from zero-copy optimization.
+
+## Configuration
+
+`spark.sql.cache.serializer` is a static SQL configuration, so it must be set 
when the
+SparkSession is built and cannot be changed on a running session 
(`spark.conf.set` rejects static
+keys with `CANNOT_MODIFY_CONFIG`):
+
+```scala
+val spark = SparkSession.builder()
+  .appName("MyApp")
+  .config("spark.sql.cache.serializer",
+    "org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer")
+  .getOrCreate()
+```
+
+**Note**: This config selects the cache serializer for the whole session; once 
set, this
+serializer handles every cached relation. There is no automatic per-relation 
fallback to another
+cache serializer based on the data types involved (see
+[Supported Data Types](#supported-data-types) for how unsupported types are 
handled). The chosen
+serializer is also cached process-wide on first use, so switching cache 
formats within a JVM that
+has already materialized a cache requires a fresh JVM (see
+[Migration from Default Cache](#migration-from-default-cache)).
+
+## Usage
+
+Once configured, use cache operations as normal:
+
+```scala
+// Cache a DataFrame
+val df = spark.read.parquet("data.parquet")
+df.cache()
+
+// Use cached data
+df.filter("age > 30").count()
+
+// Uncache when done
+df.unpersist()
+```
+
+## Compression
+
+Arrow cache supports multiple compression codecs. Configure compression with:
+
+```scala
+spark.conf.set("spark.sql.execution.arrow.compression.codec", "zstd")
+```
+
+Available options:
+- `none` - No compression (fastest, largest size, **default**)
+- `lz4` - LZ4 compression (fast, good compression)
+- `zstd` - Zstandard compression (slower, best compression)
+
+For zstd, you can also configure the compression level. Positive values (up to 
22) give better
+compression but slower speed; negative values give ultra-fast compression with 
lower ratios:
+
+```scala
+spark.conf.set("spark.sql.execution.arrow.compression.zstd.level", "3")  // 
Default: 3
+```
+
+## Vectorized Reader
+
+Enable vectorized reading for better performance with primitive types:
+
+```scala
+spark.conf.set("spark.sql.inMemoryColumnarStorage.enableVectorizedReader", 
"true")
+```
+
+When enabled, cached data is read as columnar batches instead of rows, which 
can significantly improve performance for columnar operations.
+
+## Performance Characteristics
+
+In our benchmarks, the Arrow cache format performs best on the following 
workloads. Actual
+results depend on data types, compression settings, and hardware, and the 
default cache format
+can be faster in some cases (for example, with higher compression levels):
+
+1. **Filter-Heavy Workloads**: Queries with selective filters benefit from 
min/max statistics.
+2. **Columnar Operations**: Aggregations and projections on cached data 
benefit from the Arrow format.
+3. **Parquet/ORC Caching**: Arrow's batch processing helps even without the 
zero-copy path.
+4. **Re-caching with Column Projection**: Dropping columns from Arrow-cached 
data preserves the
+   `ArrowColumnVector` format, enabling true zero-copy extraction and the 
largest gains.
+
+### Benchmark Results
+
+The numbers below are illustrative results from one run on an Apple M4 Max 
(OpenJDK 21.0.8) and
+will vary with hardware, JDK, and compression settings. They are not a 
guarantee. For the
+authoritative, regularly regenerated numbers, see
+`sql/core/benchmarks/ArrowCacheBenchmark-jdk21-results.txt` and the 
`ArrowCacheBenchmark` suite.
+
+| Workload | Default Cache | Arrow Cache | Speedup |
+|----------|--------------|-------------|---------|
+| Write + Read (5M rows, 3 primitive columns) | 153.7 ns/row | 74.2 ns/row | 
**~2X faster** |
+| Cache then filter (5M rows) | 100.1 ns/row | 70.8 ns/row | **~1.4X faster** |
+| Columnar input from Parquet (2M rows, 3 primitive columns) | 195.3 ns/row | 
113.1 ns/row | **~1.7X faster** |
+| Re-cache with zero-copy (2M rows, 2 columns) | 123.3 ns/row | 38.5 ns/row | 
**~3.2X faster** |
+
+**Notes**:
+- **Write + Read**: Significant improvement from efficient Arrow serialization 
and vectorized operations
+- **Cache then filter**: This measures end-to-end cache build plus a filtered 
scan, comparing the two cache formats. Both formats collect min/max statistics 
and can prune batches, so the difference reflects overall cache+scan throughput 
rather than pruning unique to Arrow
+- **Parquet caching**: Shows improvement despite Spark's Parquet reader 
producing `OnHeapColumnVector`/`OffHeapColumnVector` rather than 
`ArrowColumnVector`, due to Arrow's efficient batch processing
+- **Re-cache with zero-copy**: When caching a subset of columns from 
Arrow-cached data (e.g., `df.drop("column")`), the remaining columns preserve 
their `ArrowColumnVector` format, enabling true zero-copy extraction and 
achieving the best performance
+- **Zero-copy benefits** only apply when input is already `ArrowColumnVector` 
(e.g., Python Arrow sources, re-caching Arrow cached data with column 
projection)
+
+## Supported Data Types
+
+Arrow cache supports the following data types:
+
+### Primitive Types
+- BooleanType
+- ByteType, ShortType, IntegerType, LongType
+- FloatType, DoubleType
+- DecimalType (all precision/scale combinations)
+- NullType
+
+### Temporal Types
+- DateType
+- TimestampType
+- TimestampNTZType
+- TimeType
+
+### Interval Types
+- YearMonthIntervalType
+- DayTimeIntervalType
+- CalendarIntervalType
+
+### String and Binary
+- StringType (including collated strings)
+- BinaryType
+
+### Complex Types
+- ArrayType
+- StructType
+- MapType
+- Nested combinations of the above
+
+### Other Types
+- VariantType
+- GeometryType, GeographyType
+- User-defined types (UDTs) whose underlying representation is itself supported
+
+### Unsupported Types
+
+Arrow cache covers every type the default cache serializer supports, plus some 
it
+does not (for example geometry and geography). Types that Arrow cannot 
represent
+(such as `ObjectType`) are not silently dropped or routed to a different cache
+serializer: there is no per-type fallback, because the cache serializer is 
chosen
+once via the static `spark.sql.cache.serializer` configuration and then handles
+every cached relation. Attempting to cache an unsupported type fails with an
+`UNSUPPORTED_DATATYPE` error when the cache is materialized.
+
+## Statistics and Filter Pushdown
+
+Arrow cache automatically collects min/max statistics for the following types:
+- Boolean
+- Numeric types (Byte, Short, Int, Long, Float, Double)
+- Decimal
+- Date, Timestamp, and Timestamp without time zone (TIMESTAMP_NTZ)
+- Time
+- Year-month and day-time intervals
+- String (using collation-aware comparison for collated strings)
+
+Other types (Binary, Variant, calendar intervals, and complex types such as
+Array/Struct/Map) are cached but do not contribute min/max bounds, so they only
+record null counts and sizes.
+
+These statistics enable partition pruning when filtering:
+
+```scala
+val df = spark.range(10000000).cache()
+
+// This filter can skip batches using min/max statistics
+df.filter("id > 5000000").count()
+```
+
+## Memory Management
+
+Arrow cache uses off-heap memory managed by Apache Arrow allocators. This is a 
fundamental design choice in Apache Arrow and is not configurable for on-heap 
memory.

Review Comment:
   [P2] This describes the cache's memory model incorrectly. The durable 
`ArrowCachedBatch` payload is an `Array[Byte]`, and the default 
`Dataset.cache()` storage level is deserialized `MEMORY_AND_DISK`, so those 
cached bytes live on JVM heap. Arrow allocators back only the transient 
encode/decode roots. Please distinguish long-lived heap cache storage from 
transient off-heap vectors; otherwise the `executor.memoryOverhead` and 
monitoring advice below can cause users to under-size heap and misdiagnose 
cache OOMs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to