liuzqt opened a new pull request, #56620:
URL: https://github.com/apache/spark/pull/56620

   ### What changes were proposed in this pull request?
   
   
   This change adds `PartitionKeyedAccumulator`, a `ConcurrentHashMap`-backed 
accumulator keyed by partition id with last-write-wins merge semantics. The 
cached relation now:
     - counts the DISTINCT materialized partition ids (the accumulator key set) 
when deciding whether the cache is fully loaded, so duplicate computes cannot 
inflate the count; and
     - derives exact, de-duplicated row-count and size stats by folding the 
per-partition values, counting each partition once.
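
   The following is a minimal Java sketch of the semantics described above. It assumes the per-partition stats are a row count and a byte size. `PartitionStats`, `PartitionKeyedStatsSketch`, and all of their methods are hypothetical stand-ins, not the PR's `PartitionKeyedAccumulator` API. The sketch models only the last-write-wins keying, the distinct count, the snapshot/reset, and the fold.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-partition stats; field names are illustrative assumptions.
final class PartitionStats {
  final long rowCount;
  final long sizeInBytes;

  PartitionStats(long rowCount, long sizeInBytes) {
    this.rowCount = rowCount;
    this.sizeInBytes = sizeInBytes;
  }
}

// Hypothetical model of last-write-wins, partition-keyed bookkeeping.
// NOT the PR's PartitionKeyedAccumulator; it only mirrors the described semantics.
final class PartitionKeyedStatsSketch {
  // partition id -> stats from the latest completed compute of that partition
  private final Map<Integer, PartitionStats> byPartition = new ConcurrentHashMap<>();

  // Last write wins: recomputing a partition overwrites its entry instead of adding to it.
  void add(int partitionId, PartitionStats stats) {
    byPartition.put(partitionId, stats);
  }

  // Merging applies the same per-key last-write-wins rule.
  void merge(PartitionKeyedStatsSketch other) {
    byPartition.putAll(other.byPartition);
  }

  // Distinct materialized partitions = size of the key set.
  int distinctPartitions() {
    return byPartition.size();
  }

  // Folding the per-partition values counts each partition exactly once.
  long totalRowCount() {
    return byPartition.values().stream().mapToLong(s -> s.rowCount).sum();
  }

  long totalSizeInBytes() {
    return byPartition.values().stream().mapToLong(s -> s.sizeInBytes).sum();
  }

  // Unmodifiable point-in-time copy of the current entries.
  Map<Integer, PartitionStats> snapshot() {
    return Collections.unmodifiableMap(new HashMap<>(byPartition));
  }

  // Drops all bookkeeping, as clearing the cache would before a rebuild.
  void reset() {
    byPartition.clear();
  }
}
```

   Because `add` is a `put` rather than an increment, a duplicate compute of a partition replaces that partition's entry. The distinct count and the folded totals therefore stay the same no matter how many times the partition is recomputed.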
   
   The behavior is gated by a new internal conf
   `spark.sql.inMemoryColumnarStorage.distinctPartitionTracking` (default 
true); setting it to false restores the prior raw task-completion-count 
behavior. `clearCache` resets the bookkeeping so a rebuilt cache starts clean.
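
   The following is a minimal model of how the flag could switch between the two counting modes. It assumes the load check compares a completion count against the relation's partition count. `LoadTrackerSketch`, its methods, and the `>=` comparison are hypothetical. Only the conf name and its default come from this PR.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical model of the load check gated by
// spark.sql.inMemoryColumnarStorage.distinctPartitionTracking (default true).
final class LoadTrackerSketch {
  private final int numPartitions;
  private final boolean distinctPartitionTracking;
  // Prior behavior: one increment per completed task, duplicates included.
  private final LongAdder rawCompletions = new LongAdder();
  // New behavior: the set of distinct partition ids that have finished.
  private final Set<Integer> materialized = ConcurrentHashMap.newKeySet();

  LoadTrackerSketch(int numPartitions, boolean distinctPartitionTracking) {
    this.numPartitions = numPartitions;
    this.distinctPartitionTracking = distinctPartitionTracking;
  }

  void onPartitionComputed(int partitionId) {
    rawCompletions.increment();
    materialized.add(partitionId);
  }

  boolean isLoaded() {
    long done = distinctPartitionTracking ? materialized.size() : rawCompletions.sum();
    return done >= numPartitions; // assumed comparison, for illustration only
  }

  // Models clearCache: both kinds of bookkeeping restart from zero.
  void clear() {
    rawCompletions.reset();
    materialized.clear();
  }
}
```

   Suppose the compute events are partitions 0, 0, and 1 out of 3. The raw counter already reaches 3, while the distinct set holds only 2 ids. That gap is what the flag toggles.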
   
   ### Why are the changes needed?
   
   Fixes a bug where `InMemoryRelation` can be marked as materialized prematurely 
under concurrent queries:
   
   `CachedRDDBuilder` tracked materialization with a plain task-completion 
`LongAccumulator` and summed per-batch row-count and size stats. When a cold 
cache is first touched concurrently, the same partition can be computed more 
than once across executors (duplicate cross-executor computes). Those 
duplicates inflate the task-completion count and double-count the summed stats. 
The inflated count can make `isCachedRDDLoaded` report the relation as fully 
materialized prematurely, before all distinct partitions have actually 
finished. `computeStats` can then expose a `rowCount` that does not reflect 
the cached data. AQE's `PropagateEmptyRelation` can read that incorrect 
`rowCount` and collapse the cached source to an empty relation, silently 
dropping rows.
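
   The following is a worked replay of one way the failure above could occur, using hypothetical numbers. There are two partitions: partition 0 is empty and finishes twice before partition 1 (1000 rows) finishes at all. The replay class and its `{partitionId, rows}` event encoding are illustrative and are not taken from the PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical replay of task completions; each event is {partitionId, rowsInPartition}.
final class DuplicateComputeReplay {
  // Old-style bookkeeping: count every completion and sum every reported row count.
  // Returns {completedTasks, summedRows}.
  static long[] oldBookkeeping(List<int[]> completions) {
    long tasks = 0;
    long rows = 0;
    for (int[] c : completions) {
      tasks += 1;
      rows += c[1];
    }
    return new long[] {tasks, rows};
  }

  // Keyed bookkeeping: last write wins per partition id, then fold once per partition.
  // Returns {distinctPartitions, foldedRows}.
  static long[] keyedBookkeeping(List<int[]> completions) {
    Map<Integer, Integer> byPartition = new HashMap<>();
    for (int[] c : completions) {
      byPartition.put(c[0], c[1]);
    }
    long rows = 0;
    for (int r : byPartition.values()) {
      rows += r;
    }
    return new long[] {byPartition.size(), rows};
  }
}
```

   Consider first only the two duplicate completions of partition 0. The old path reports 2 completed tasks, which equals the partition count, and a summed `rowCount` of 0. The keyed path reports 1 distinct partition. Now replay all four completions `{0,0}, {0,0}, {1,1000}, {1,1000}`. The old path reports 2000 rows (double-counted), and the keyed path reports 1000.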
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   
   - `PartitionKeyedAccumulatorSuite` - accumulator semantics (last-write-wins 
add/merge, distinct key count, snapshot/reset).
   - `ConcurrentInMemoryRelationSuite` - local-cluster reproduction: rows are 
preserved under concurrent first-touch with the fix on; stats are exact under 
duplicate cross-executor computes; and a negative control showing the row loss 
with the fix disabled.
   - Extended `CachedTableSuite` (`clearCache` resets the bookkeeping) and 
`InMemoryColumnarQuerySuite` (size and row-count stats are read through the new accessors).
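
   The "stats are exact under duplicate cross-executor computes" property can be approximated outside Spark, with plain threads standing in for executors. The following hypothetical sketch is not `ConcurrentInMemoryRelationSuite`. Every worker computes every partition, so each partition is reported once per worker. The keyed map is then compared with old-style counters.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stress check: several "executors" race to compute every partition.
final class ConcurrentFirstTouchSketch {
  // Returns {distinctPartitions, foldedRows, rawCompletions, summedRows}.
  static long[] run(int numPartitions, int executors, long rowsPerPartition) {
    ConcurrentHashMap<Integer, Long> keyed = new ConcurrentHashMap<>();
    LongAdder rawCompletions = new LongAdder();
    LongAdder summedRows = new LongAdder();
    ExecutorService pool = Executors.newFixedThreadPool(executors);
    for (int e = 0; e < executors; e++) {
      pool.submit(() -> {
        for (int p = 0; p < numPartitions; p++) {
          keyed.put(p, rowsPerPartition);   // last write wins per partition id
          rawCompletions.increment();       // old-style task-completion count
          summedRows.add(rowsPerPartition); // old-style summed stats
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", ie);
    }
    long folded = keyed.values().stream().mapToLong(Long::longValue).sum();
    return new long[] {keyed.size(), folded, rawCompletions.sum(), summedRows.sum()};
  }
}
```

   For example, `run(8, 4, 100L)` yields 8 distinct partitions and 800 folded rows. The old-style counters reach 32 completions and 3200 rows.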
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   Yes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

