baibaichen opened a new pull request, #55953:
URL: https://github.com/apache/spark/pull/55953
### What changes were proposed in this pull request?
Today Spark's storage memory pool is implicitly owned by `BlockManager` via
`MemoryStore`. There is no mechanism for an external cache (e.g., Velox
`AsyncDataCache` via Gluten, or other per-executor native columnar caches) to:
1. reserve bytes from the storage pool,
2. be asked to release bytes under pressure, or
3. grow into transient free capacity when storage is idle.
The existing `UnmanagedMemoryConsumer` SPI (SPARK-53001) is
informational-only — Spark subtracts the reported usage from
`effectiveMaxMemory` but cannot ask the consumer to release, and the consumer
cannot ask Spark for more.
This PR adds the `ManagedConsumer` SPI and wires it into
`UnifiedMemoryManager`.
**New SPI (`core/src/main/scala/org/apache/spark/memory/ManagedConsumer.scala`)**
```scala
package org.apache.spark.memory

trait ManagedConsumer {
  def name: String // registry key; unique within the JVM
  def memoryMode: MemoryMode = MemoryMode.OFF_HEAP
  def getShrinkableMemoryBytes: Long
  def shrink(numBytes: Long): Long // returns bytes actually released
}
```
`name` IS the registry key — there is no separate `Id` type. Names must be
unique within the JVM (ON_HEAP and OFF_HEAP share one namespace); registering a
different instance under an already-used name throws
`IllegalArgumentException`. `memoryMode` defaults to `OFF_HEAP`, matching the
canonical use case (native off-heap caches sharing `spark.memory.offHeap.size`
with Spark's MemoryStore).
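To make the registry rules above concrete, here is a minimal sketch, not Spark's implementation. It mirrors the trait locally with a stand-in `MemoryMode` so it compiles without Spark on the classpath; `ConsumerRegistry` and `FakeCache` are hypothetical names. Registration is keyed by `name` alone, re-registering the same instance is a no-op, a different instance under a taken name throws `IllegalArgumentException`, and unregistering a stale name-collider leaves the existing entry alone.

```scala
import scala.collection.mutable

// Hypothetical stand-ins so the sketch compiles without Spark; in Spark,
// MemoryMode is a Java enum in org.apache.spark.memory.
sealed trait MemoryMode
object MemoryMode {
  case object ON_HEAP extends MemoryMode
  case object OFF_HEAP extends MemoryMode
}

// Local mirror of the SPI shape from the PR description.
trait ManagedConsumer {
  def name: String
  def memoryMode: MemoryMode = MemoryMode.OFF_HEAP
  def getShrinkableMemoryBytes: Long
  def shrink(numBytes: Long): Long
}

// Hypothetical toy consumer: holds `held` bytes and releases up to the request.
final class FakeCache(val name: String, var held: Long) extends ManagedConsumer {
  def getShrinkableMemoryBytes: Long = held
  def shrink(numBytes: Long): Long = {
    val released = math.min(numBytes, held)
    held -= released
    released
  }
}

// Hypothetical name-keyed registry; ON_HEAP and OFF_HEAP share one namespace.
final class ConsumerRegistry {
  private val byName = mutable.LinkedHashMap.empty[String, ManagedConsumer]

  def register(c: ManagedConsumer): Unit = synchronized {
    require(c != null, "consumer must not be null")
    require(c.name != null && c.name.nonEmpty, "consumer name must be non-empty")
    byName.get(c.name) match {
      case Some(existing) if existing eq c => () // same instance: idempotent
      case Some(_) =>
        throw new IllegalArgumentException(s"name '${c.name}' is already registered")
      case None =>
        byName(c.name) = c
    }
  }

  // Removes the entry only if it maps to this exact instance, so unregistering a
  // different object that happens to share the name is a no-op.
  def unregister(c: ManagedConsumer): Unit = synchronized {
    if (c != null && byName.get(c.name).exists(_ eq c)) byName.remove(c.name)
  }

  def registered: Seq[ManagedConsumer] = synchronized(byName.values.toSeq)
}
```

Reference equality (`eq`) rather than `==` is used for the identity checks so that a consumer overriding `equals` cannot accidentally pass as the registered instance.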
**`MemoryManager` additions**
- `registerManagedConsumer(c)` / `unregisterManagedConsumer(c)`
- `shrinkExternal(requested, mode, exclude): Long`: the generic orchestrator.
  Pool accounting is framework-owned: after `shrink()` returns `released`, the
  framework deducts exactly `released` bytes from the storage pool via
  `pool.releaseMemory()`. **Consumers MUST NOT call `releaseStorageMemory`
  themselves on the freed amount.**
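A minimal sketch of the orchestration loop, assuming a toy pool (`ToyStoragePool`, hypothetical) and a local copy of the trait. It omits the `mode` parameter, and clamping each consumer's report to `[0, ask]` is an assumption about how misreporting might be contained, not something the PR states. What it illustrates is that the framework, not the consumer, deducts the released bytes from the pool, and that `exclude` is matched by reference.

```scala
// Local mirror of the SPI (memoryMode omitted for brevity).
trait ManagedConsumer {
  def name: String
  def getShrinkableMemoryBytes: Long
  def shrink(numBytes: Long): Long
}

// Hypothetical stand-in for the storage pool's used-bytes counter.
final class ToyStoragePool(var memoryUsed: Long) {
  // Floors at zero instead of going negative if asked to release too much.
  def releaseMemory(size: Long): Unit = {
    memoryUsed = math.max(0L, memoryUsed - size)
  }
}

// Hypothetical toy consumer that holds `held` bytes.
final class FakeCache(val name: String, var held: Long) extends ManagedConsumer {
  def getShrinkableMemoryBytes: Long = held
  def shrink(numBytes: Long): Long = {
    val released = math.min(numBytes, held)
    held -= released
    released
  }
}

// Walk consumers in order, skip `exclude` by reference, and stop once `requested`
// bytes have been freed. The framework deducts what was released from the pool.
def shrinkExternal(
    requested: Long,
    consumers: Seq[ManagedConsumer],
    pool: ToyStoragePool,
    exclude: ManagedConsumer = null): Long = {
  var freed = 0L
  val it = consumers.iterator
  while (freed < requested && it.hasNext) {
    val c = it.next()
    if (!(c eq exclude)) {
      val ask = math.min(requested - freed, c.getShrinkableMemoryBytes)
      if (ask > 0) {
        // Assumption: clamp the report to [0, ask] so an inflated or negative
        // return value cannot corrupt the pool's accounting.
        val released = math.max(0L, math.min(ask, c.shrink(ask)))
        pool.releaseMemory(released)
        freed += released
      }
    }
  }
  freed
}
```

Because the consumer never touches the pool, double-counting is impossible in this model: a consumer that also called a release method itself would be the only way to deduct the same bytes twice, which is exactly what the rule above forbids.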
**`UnifiedMemoryManager` wiring**
| Acquire site | Order |
|---|---|
| `acquireStorageMemory(blockId, ...)` *(BlockManager path)* | borrow-from-execution → **shrinkExternal** → LRU eviction |
| `acquireStorageMemory(self: ManagedConsumer, ...)` *(new overload)* | borrow-from-execution → **shrinkExternal(exclude = self)** → LRU eviction |
| `maybeGrowExecutionPool` | borrow back from storage, capped by `memoryReclaimableFromStorage` **before** `shrinkExternal`; this protects the storage region from being indirectly enlarged beyond `storageRegionSize` |
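The cap in the last row can be worked through with the formula that `maybeGrowExecutionPool` uses in upstream `UnifiedMemoryManager`: execution may take back the larger of storage's free memory and the amount by which the storage pool has grown past `storageRegionSize`. The helper names below are hypothetical, and the sketch covers only this arithmetic, not how the PR sequences it against `shrinkExternal`.

```scala
// Hypothetical helper names; the max(...) formula follows upstream
// UnifiedMemoryManager.maybeGrowExecutionPool.
def memoryReclaimableFromStorage(
    storagePoolSize: Long,
    storageMemoryUsed: Long,
    storageRegionSize: Long): Long = {
  val storageMemoryFree = storagePoolSize - storageMemoryUsed
  // Execution may take storage's free space, or whatever storage has borrowed
  // beyond its protected region, whichever is larger.
  math.max(storageMemoryFree, storagePoolSize - storageRegionSize)
}

// How much of an execution request can actually be satisfied from storage.
def bytesExecutionMayReclaim(extraMemoryNeeded: Long, reclaimable: Long): Long =
  if (extraMemoryNeeded <= 0 || reclaimable <= 0) 0L
  else math.min(extraMemoryNeeded, reclaimable)
```

For example, with a 500-byte storage region and a storage pool that has grown to 700 bytes with 650 bytes used, the reclaimable amount is `max(50, 200) = 200`, so an execution request for 300 extra bytes can take back at most 200 of them.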
Self-exclusion by reference equality prevents a consumer from being asked to
shrink in service of its own grow request.
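A simplified, single-threaded model of the acquisition order and self-exclusion described above. `ToyUnifiedManager` and its byte counters are hypothetical; real Spark tracks memory in `StorageMemoryPool` and `ExecutionMemoryPool` and evicts through `MemoryStore`, none of which appears here. The requester passed as `self` is skipped by reference equality, so it is never asked to shrink for its own request.

```scala
import scala.collection.mutable.ArrayBuffer

// Local mirror of the SPI (memoryMode omitted for brevity).
trait ManagedConsumer {
  def name: String
  def getShrinkableMemoryBytes: Long
  def shrink(numBytes: Long): Long
}

// Hypothetical toy consumer that holds `held` bytes.
final class FakeCache(val name: String, var held: Long) extends ManagedConsumer {
  def getShrinkableMemoryBytes: Long = held
  def shrink(numBytes: Long): Long = {
    val released = math.min(numBytes, held)
    held -= released
    released
  }
}

// Hypothetical model; plain byte counters stand in for the real pools.
final class ToyUnifiedManager(
    var storageFree: Long,
    var executionFree: Long,
    var evictableCachedBytes: Long,
    val consumers: Seq[ManagedConsumer]) {

  // Records which step supplied bytes, in order.
  val events = ArrayBuffer.empty[String]

  // Tries to make `numBytes` of storage available; `self` is the requesting consumer, if any.
  def acquireStorage(numBytes: Long, self: ManagedConsumer = null): Boolean = {
    // 1. Borrow free execution memory to cover any shortfall.
    val borrowed = math.min(executionFree, math.max(0L, numBytes - storageFree))
    executionFree -= borrowed
    storageFree += borrowed
    if (borrowed > 0) events += s"borrow:$borrowed"

    // 2. Ask external caches to shrink, skipping the requester by reference equality.
    val it = consumers.iterator
    while (storageFree < numBytes && it.hasNext) {
      val c = it.next()
      if (!(c eq self)) {
        val ask = math.min(numBytes - storageFree, c.getShrinkableMemoryBytes)
        val released = if (ask > 0) math.max(0L, math.min(ask, c.shrink(ask))) else 0L
        storageFree += released
        if (released > 0) events += s"shrink:${c.name}:$released"
      }
    }

    // 3. Fall back to LRU eviction of cached blocks for whatever is still missing.
    val evicted = math.min(evictableCachedBytes, math.max(0L, numBytes - storageFree))
    evictableCachedBytes -= evicted
    storageFree += evicted
    if (evicted > 0) events += s"evict:$evicted"

    val ok = storageFree >= numBytes
    if (ok) storageFree -= numBytes
    ok
  }
}
```

Putting external shrinking before LRU eviction means cold native-cache pages are given up before Spark drops its own cached blocks; if the external caches cannot cover the shortfall, eviction still runs as before.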
**New configs (default-off, opt-in)**
| Key | Default | Purpose |
|---|---|---|
| `spark.memory.managedConsumer.enabled` | `false` | Gate the entire SPI |
| `spark.memory.managedConsumer.shrinkWarnThresholdMs` | `100` | Log a WARN when a single `shrink()` call exceeds this threshold |
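The `shrinkWarnThresholdMs` behavior amounts to timing each `shrink()` call. A minimal sketch, where `timedShrink` and the `warn` callback (standing in for Spark's logger) are hypothetical and the message text is illustrative:

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper: time one shrink() call and report it through `warn`
// when it takes longer than `warnThresholdMs`.
def timedShrink(
    consumerName: String,
    numBytes: Long,
    shrink: Long => Long,
    warnThresholdMs: Long,
    warn: String => Unit): Long = {
  val start = System.nanoTime()
  val released = shrink(numBytes)
  val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
  if (elapsedMs > warnThresholdMs) {
    warn(s"ManagedConsumer '$consumerName' took $elapsedMs ms to shrink " +
      s"$numBytes bytes (released $released, threshold $warnThresholdMs ms)")
  }
  released
}
```

`System.nanoTime()` is used rather than wall-clock time because it is monotonic, so clock adjustments cannot produce spurious or missing warnings.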
JIRA: [SPARK-56918](https://issues.apache.org/jira/browse/SPARK-56918)
### Why are the changes needed?
A fourth category of executor off-heap memory is emerging and has no home
today: **shrinkable external caches that are per-executor singletons**, serving
the whole executor.
| Category | Owner | Spark accounting | Releasable on demand? |
|---|---|---|---|
| Execution | Spark task allocations | Tracked, arbitrated | Yes (spill) |
| Storage | Spark RDD cache / broadcast (`MemoryStore`) | Tracked, arbitrated | Yes (evict) |
| Unmanaged | RocksDB state store ([SPARK-53001](https://issues.apache.org/jira/browse/SPARK-53001)) | *Reported* only (pull-mode poll) | *No* (informational) |
| **Shrinkable external cache** *(this PR)* | Per-executor singleton (e.g., Velox `AsyncDataCache`) | *Tracked, arbitrated* | *Yes* (evict cold pages) |
For these consumers both existing options are wrong:
- Treating them as **storage memory** via `MemoryStore` doesn't work —
`MemoryStore` is bound to `BlockManager`, `SerializerManager`, and
`BlockEvictionHandler`, which is exactly the limitation
[SPARK-48694](https://issues.apache.org/jira/browse/SPARK-48694) called out.
- Treating them as **unmanaged memory** via `UnmanagedMemoryConsumer` is
informational only — Spark cannot ask them to release when storage pressure
rises, and they cannot ask Spark for more when storage is idle.
The result today is **static partitioning** of off-heap between Spark
storage and the external cache, defeating the purpose of unified memory
management.
### Does this PR introduce _any_ user-facing change?
No. The SPI is opt-in via `spark.memory.managedConsumer.enabled` (default
`false`). Spark ships only the SPI; no in-tree implementation. Existing
`BlockManager` / `MemoryStore` / `UnmanagedMemoryConsumer` behavior is
unchanged when the gate is off.
### How was this patch tested?
New unit tests (all in `core/src/test/scala/org/apache/spark/memory/`):
- **`ManagedConsumerSuite`** — 5 trait-level cases covering the SPI shape,
`name` override for logs, independence from `UnmanagedMemoryConsumer`, the
empty default of `getShrinkableConsumers` on non-Unified backends, and
`consumerLogName` fallback for blank `name`.
- **`UnifiedMemoryManagerSuite`**: 26 new integration cases covering:
  - borrow-from-execution → shrinkExternal → LRU eviction ordering
  - storage-region protection in `maybeGrowExecutionPool`
  - self-exclusion on the `acquireStorageMemory(self)` overload
  - framework-owned pool accounting (consumer over-/under-reporting)
  - idempotent registration; rejection of different-instance / null / empty
    name; stale name-collider unregister is a safe no-op
  - multi-consumer round-robin fairness
  - `shrinkWarnThresholdMs` log emission
  - `require()` guards on the `acquireStorageMemory(self)` overload: null
    `self`, mismatched `memoryMode`, negative `numBytes`
  - cross-SPI WARN when the same object is registered as both
    `ManagedConsumer` and `UnmanagedMemoryConsumer`
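The three `require()` guards on the new overload can be sketched as plain argument checks. `checkAcquireArgs` and the local `MemoryMode`/trait stand-ins are hypothetical, and the PR's exact messages and check order may differ; Scala's `require` throws `IllegalArgumentException` when its condition is false.

```scala
// Hypothetical stand-ins so the sketch compiles without Spark.
sealed trait MemoryMode
object MemoryMode {
  case object ON_HEAP extends MemoryMode
  case object OFF_HEAP extends MemoryMode
}

trait ManagedConsumer {
  def name: String
  def memoryMode: MemoryMode = MemoryMode.OFF_HEAP
}

// Hypothetical guards mirroring the three cases listed above.
def checkAcquireArgs(self: ManagedConsumer, numBytes: Long, mode: MemoryMode): Unit = {
  require(self != null, "self must not be null")
  require(self.memoryMode == mode,
    s"consumer '${self.name}' is ${self.memoryMode} but the request is for $mode")
  require(numBytes >= 0, s"numBytes must be non-negative, got $numBytes")
}
```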
Run:
```bash
build/sbt 'core/testOnly *ManagedConsumerSuite *UnifiedMemoryManagerSuite *MemoryManagerSuite'
```
All 67 tests pass (55 Scala + 12 Java).
### Was this patch authored or co-authored using generative AI tooling?
Yes, Generated-by: GitHub Copilot.
--
This is an automated message from the Apache Git Service.