shrirangmhalgi commented on code in PR #56620:
URL: https://github.com/apache/spark/pull/56620#discussion_r3444635054

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala:
##########
@@ -261,9 +275,30 @@ case class CachedRDDBuilder(
   @transient @volatile private var _cachedColumnBuffers: RDD[CachedBatch] = null
   @transient @volatile private var _cachedColumnBuffersAreLoaded: Boolean = false
-  val sizeInBytesStats: LongAccumulator = cachedPlan.session.sparkContext.longAccumulator
-  val rowCountStats: LongAccumulator = cachedPlan.session.sparkContext.longAccumulator
-  private val materializedPartitions = cachedPlan.session.sparkContext.longAccumulator
+  // The cache's materialization bookkeeping, chosen ONCE here (at construction) from the
+  // IN_MEMORY_DISTINCT_PARTITION_TRACKING conf, so the scheme can never change mid-build. Exactly
+  // one branch is allocated:
+  // - Right (the fix, default): a partition-keyed accumulator storing (rowCount, sizeInBytes) per
+  //   partition. AQE creates a separate cache scan stage per reference to the same cache and each
+  //   submits its own build job, so the same partition can be computed by several concurrent jobs
+  //   (and speculative tasks); Spark has no global cross-executor "compute this partition once"
+  //   barrier (only a per-executor write lock). Keying by partition id (last-write-wins) means
+  //   those duplicate completions cannot mark the cache loaded before every partition has been
+  //   computed -- which otherwise let AQE read rowCount 0 on a non-empty cache and propagate an
+  //   empty relation, silently dropping rows -- and also yields exact, de-duplicated row count /
+  //   size.
+  // - Left (conf off): the raw per-batch accumulators (`LegacyAccumulators`) -- the buggy pre-fix
+  //   behavior, kept only as a safety switch.
+  private val statsAccumulators
+      : Either[LegacyAccumulators, PartitionKeyedAccumulator[(Long, Long)]] =

Review Comment:
Is the `Either` / legacy path needed?
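The hunk's comment argues that keying by partition id stops duplicate completions from marking the cache loaded early. The plain Scala sketch below illustrates that difference in a single JVM, under a few assumptions. A mutable map stands in for the driver-side merged value of the partition-keyed accumulator. `LegacyCounters`, `KeyedStats` and `Demo` are hypothetical names, not the PR's classes. The schedule in `main`, where three build jobs each report an empty partition 0 first, is contrived to mirror the "rowCount 0 on a non-empty cache" symptom.

```scala
import scala.collection.mutable

// Hypothetical, single-threaded illustration only: not Spark's accumulator API and not
// the classes in this PR. The real bookkeeping uses distributed accumulators.

// Legacy scheme: every task completion adds to shared counters, so a partition computed
// by several jobs (or a speculative copy) is counted more than once.
final class LegacyCounters(numPartitions: Int) {
  var rowCount = 0L
  var sizeInBytes = 0L
  var materializedPartitions = 0L

  def recordMaterialized(partitionId: Int, rows: Long, bytes: Long): Unit = {
    rowCount += rows
    sizeInBytes += bytes
    materializedPartitions += 1 // partitionId is ignored, so duplicates inflate this
  }

  // Can become true while some partition has never been computed.
  def isLoaded: Boolean = materializedPartitions >= numPartitions
}

// Keyed scheme: one slot per partition id, last write wins, so a duplicate completion
// overwrites its slot instead of adding to the totals.
final class KeyedStats(numPartitions: Int) {
  private val perPartition = mutable.Map.empty[Int, (Long, Long)]

  def recordMaterialized(partitionId: Int, rows: Long, bytes: Long): Unit =
    perPartition(partitionId) = (rows, bytes)

  def rowCount: Long = perPartition.values.map(_._1).sum
  def sizeInBytes: Long = perPartition.values.map(_._2).sum

  // True only once every distinct partition id has reported.
  def isLoaded: Boolean = perPartition.size == numPartitions
}

object Demo {
  def main(args: Array[String]): Unit = {
    val legacy = new LegacyCounters(numPartitions = 3)
    val keyed = new KeyedStats(numPartitions = 3)

    // Contrived schedule: three concurrent build jobs each finish the empty partition 0
    // first; the non-empty partitions 1 and 2 have not been computed yet.
    for (_ <- 1 to 3) {
      legacy.recordMaterialized(partitionId = 0, rows = 0L, bytes = 0L)
      keyed.recordMaterialized(partitionId = 0, rows = 0L, bytes = 0L)
    }
    println(s"legacy: loaded=${legacy.isLoaded}, rows=${legacy.rowCount}") // legacy: loaded=true, rows=0
    println(s"keyed: loaded=${keyed.isLoaded}, rows=${keyed.rowCount}")    // keyed: loaded=false, rows=0

    // The remaining partitions report, plus a speculative duplicate of partition 1.
    keyed.recordMaterialized(1, 10L, 800L)
    keyed.recordMaterialized(2, 5L, 400L)
    keyed.recordMaterialized(1, 10L, 800L) // overwrites the slot, does not add
    println(s"keyed: loaded=${keyed.isLoaded}, rows=${keyed.rowCount}, bytes=${keyed.sizeInBytes}")
    // keyed: loaded=true, rows=15, bytes=1200
  }
}
```

With the legacy counters, three reports of the same partition satisfy `materializedPartitions >= numPartitions`. The cache therefore looks loaded with 0 rows even though partitions 1 and 2 were never computed. The keyed map reports loaded only after every distinct partition id has written, and a repeated write replaces that partition's entry instead of being added again.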
The left branch re-enables the exact bug this PR fixes, so flipping the conf in production would silently reintroduce data loss. If the intent is a safety switch for rollback, a conf that disables the loaded-latch entirely (always report static plan stats until the RDD action returns) would be safer than re-enabling the broken counting. Otherwise, always using the keyed accumulator and dropping the branching would significantly simplify every read site (`materializedRowCount`, `materializedSizeInBytes`, `recordMaterialized`, `isCachedRDDLoaded`).

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
