liuzqt opened a new pull request, #56620:
URL: https://github.com/apache/spark/pull/56620
### What changes were proposed in this pull request?
This change adds `PartitionKeyedAccumulator`, a `ConcurrentHashMap`-backed
accumulator keyed by partition id with last-write-wins merge semantics. The
cached relation now:
- counts the DISTINCT materialized partition ids (the accumulator key set)
when deciding whether the cache is fully loaded, so duplicate computes cannot
inflate the count; and
- derives exact, de-duplicated row-count and size stats by folding the
per-partition values, counting each partition once.
The behavior is gated by a new internal conf
`spark.sql.inMemoryColumnarStorage.distinctPartitionTracking` (default
true); setting it to false restores the prior raw task-completion-count
behavior. `clearCache` resets the bookkeeping so a rebuilt cache starts clean.
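
The bookkeeping described above can be illustrated with a minimal Java sketch of a partition-keyed, last-write-wins accumulator. This is not the PR's implementation: `PartitionKeyedSketch`, the nested `Stats` class, and every method name are hypothetical, and the sketch ignores Spark's accumulator API entirely. It only shows why keying by partition id makes duplicate computes harmless.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration only; names are not taken from the PR.
class PartitionKeyedSketch {
    // Per-partition stats (assumed shape: row count and size in bytes).
    static final class Stats {
        final long rowCount;
        final long sizeInBytes;
        Stats(long rowCount, long sizeInBytes) {
            this.rowCount = rowCount;
            this.sizeInBytes = sizeInBytes;
        }
    }

    private final ConcurrentHashMap<Integer, Stats> byPartition = new ConcurrentHashMap<>();

    // Last write wins: a duplicate compute overwrites the entry instead of adding to it.
    void add(int partitionId, Stats stats) {
        byPartition.put(partitionId, stats);
    }

    // Merging another accumulator overwrites entries on key collisions.
    void merge(PartitionKeyedSketch other) {
        byPartition.putAll(other.byPartition);
    }

    // Number of DISTINCT partition ids seen so far (the key set size).
    int distinctPartitions() {
        return byPartition.size();
    }

    // The cache counts as loaded only when every partition id has been seen.
    boolean isFullyLoaded(int numPartitions) {
        return distinctPartitions() == numPartitions;
    }

    // Fold per-partition values, counting each partition once.
    long totalRowCount() {
        return byPartition.values().stream().mapToLong(s -> s.rowCount).sum();
    }

    long totalSizeInBytes() {
        return byPartition.values().stream().mapToLong(s -> s.sizeInBytes).sum();
    }

    // Analogous to resetting the bookkeeping when the cache is cleared.
    void reset() {
        byPartition.clear();
    }

    public static void main(String[] args) {
        PartitionKeyedSketch acc = new PartitionKeyedSketch();
        acc.add(0, new Stats(100L, 4096L));
        acc.add(0, new Stats(100L, 4096L)); // duplicate compute of partition 0
        acc.add(1, new Stats(50L, 2048L));
        System.out.println("distinct partitions: " + acc.distinctPartitions());
        System.out.println("row count: " + acc.totalRowCount());
        System.out.println("loaded (3 partitions): " + acc.isFullyLoaded(3));
    }
}
```

In this sketch the duplicate `add` for partition 0 leaves the distinct count at 2 and the row count at 150, so a three-partition relation is not reported as loaded.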
### Why are the changes needed?
This fixes a bug where `InMemoryRelation` can be marked as materialized
prematurely under concurrent queries:
`CachedRDDBuilder` tracked materialization with a plain task-completion
`LongAccumulator` and summed per-batch row-count and size stats. When a cold
cache is first touched concurrently, the same partition can be computed more
than once across executors (duplicate cross-executor computes). Those
duplicates inflate the task-completion count and double-count the summed stats.
The inflated count can make `isCachedRDDLoaded` report the relation as fully
materialized prematurely, before all distinct partitions have actually
finished, and `computeStats` can then expose a `rowCount` that does not reflect
the cached data. AQE's `PropagateEmptyRelation` can read that incorrect
`rowCount` and collapse the cached source to an empty relation, silently
dropping rows.
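
One way the failure described above could arise is sketched below in Java. Everything here is an assumption made for illustration: the class `DuplicateComputeDemo`, its method names, the equality check against the partition count, and the three-partition scenario in which the two finished partitions happen to be empty. It is not the actual `CachedRDDBuilder` logic.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; not the actual CachedRDDBuilder logic.
class DuplicateComputeDemo {
    // Old-style check (assumed): every task completion counts, duplicates included.
    static boolean loadedByRawCount(int[] completedPartitionIds, int numPartitions) {
        return completedPartitionIds.length == numPartitions;
    }

    // New-style check (assumed): only distinct partition ids count.
    static boolean loadedByDistinctCount(int[] completedPartitionIds, int numPartitions) {
        Set<Integer> distinct = new HashSet<>();
        for (int id : completedPartitionIds) {
            distinct.add(id);
        }
        return distinct.size() == numPartitions;
    }

    // Row count obtained by summing the stats of every completed task.
    static long summedRowCount(int[] completedPartitionIds, long[] rowsPerPartition) {
        long total = 0L;
        for (int id : completedPartitionIds) {
            total += rowsPerPartition[id];
        }
        return total;
    }

    public static void main(String[] args) {
        // Partitions 0 and 1 are empty; partition 2 holds all 100 rows.
        long[] rowsPerPartition = {0L, 0L, 100L};
        // Partition 0 was computed twice; partition 2 has not finished yet.
        int[] completed = {0, 0, 1};

        System.out.println("raw count says loaded: "
            + loadedByRawCount(completed, rowsPerPartition.length));
        System.out.println("distinct count says loaded: "
            + loadedByDistinctCount(completed, rowsPerPartition.length));
        System.out.println("summed rowCount: "
            + summedRowCount(completed, rowsPerPartition));
    }
}
```

In this scenario the raw count reaches 3 and reports the relation as loaded while the summed `rowCount` is 0, which is the kind of state an empty-relation optimization could misread. The distinct count is 2, so the relation is correctly treated as not yet loaded.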
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- `PartitionKeyedAccumulatorSuite` - accumulator semantics (last-write-wins
add/merge, distinct key count, snapshot/reset).
- `ConcurrentInMemoryRelationSuite` - local-cluster reproduction: rows are
preserved under concurrent first-touch with the fix on; stats are exact under
duplicate cross-executor computes; and a negative control showing the row loss
with the fix disabled.
- Extended `CachedTableSuite` (clearCache resets bookkeeping) and
`InMemoryColumnarQuerySuite` (size/row-count read through the new accessors).
### Was this patch authored or co-authored using generative AI tooling?
Yes