shrirangmhalgi commented on code in PR #56620: URL: https://github.com/apache/spark/pull/56620#discussion_r3444632204
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala:
##########
@@ -261,9 +275,30 @@ case class CachedRDDBuilder(
   @transient @volatile private var _cachedColumnBuffers: RDD[CachedBatch] = null
   @transient @volatile private var _cachedColumnBuffersAreLoaded: Boolean = false
-  val sizeInBytesStats: LongAccumulator = cachedPlan.session.sparkContext.longAccumulator
-  val rowCountStats: LongAccumulator = cachedPlan.session.sparkContext.longAccumulator
-  private val materializedPartitions = cachedPlan.session.sparkContext.longAccumulator
+  // The cache's materialization bookkeeping, chosen ONCE here (at construction) from the
+  // IN_MEMORY_DISTINCT_PARTITION_TRACKING conf, so the scheme can never change mid-build. Exactly
+  // one branch is allocated:
+  // - Right (the fix, default): a partition-keyed accumulator storing (rowCount, sizeInBytes) per
+  // partition. AQE creates a separate cache scan stage per reference to the same cache and each
+  // submits its own build job, so the same partition can be computed by several concurrent jobs
+  // (and speculative tasks); Spark has no global cross-executor "compute this partition once"
+  // barrier (only a per-executor write lock). Keying by partition id (last-write-wins) means
+  // those duplicate completions cannot mark the cache loaded before every partition has been
+  // computed -- which otherwise let AQE read rowCount 0 on a non-empty cache and propagate an
+  // empty relation, silently dropping rows -- and also yields exact, de-duplicated row count /

Review Comment:
nit: "exact, de-duplicated" holds only for deterministic stages. For indeterminate stages (e.g., `filter(rand() > 0.5)` cached with speculation), last-write-wins means the reported per-partition row count reflects whichever task's completion listener fires last -- which may differ from the task whose blocks the BlockManager actually stored (the first to acquire the partition write lock).
The loaded check is still sound (tracking distinct partition IDs covers it), and this won't cause false empty-propagation, since the count remains non-zero. Consider: "yields de-duplicated row count / size (exact for deterministic stages)."
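
To make the distinction concrete, here is a minimal, Spark-free sketch of the two bookkeeping schemes. Everything in it (`Completion`, `CountingTracker`, `PartitionKeyedTracker`, `TrackerDemo`) is a hypothetical stand-in rather than code from this PR, and the counting scheme is only an assumed simplification of how the removed `materializedPartitions` accumulator might have been compared against the partition count.

```scala
import scala.collection.mutable

// Hypothetical stand-in for one task-completion report; not a Spark type.
final case class Completion(partitionId: Int, rowCount: Long, sizeInBytes: Long)

// Assumed model of the old scheme: every completion is counted, so duplicate
// attempts of the same partition inflate both the "loaded" check and the totals.
final class CountingTracker(numPartitions: Int) {
  private var completions = 0
  private var rows = 0L
  def record(c: Completion): Unit = { completions += 1; rows += c.rowCount }
  def isLoaded: Boolean = completions >= numPartitions
  def rowCount: Long = rows
}

// Sketch of the partition-keyed scheme: a later completion for the same
// partition id overwrites the earlier one (last-write-wins).
final class PartitionKeyedTracker(numPartitions: Int) {
  private val stats = mutable.Map.empty[Int, (Long, Long)]
  def record(c: Completion): Unit =
    stats.update(c.partitionId, (c.rowCount, c.sizeInBytes))
  def isLoaded: Boolean = stats.size == numPartitions
  def rowCount: Long = stats.values.map(_._1).sum
}

object TrackerDemo {
  // Two-partition cache. Partition 0 is computed twice by an indeterminate
  // stage, so the attempts disagree (48 vs 52 rows); partition 1 has not run.
  // Assume the first attempt (48 rows) is the one whose blocks were stored.
  val storedRowsForPartition0 = 48L
  val reports = Seq(Completion(0, 48L, 480L), Completion(0, 52L, 520L))

  val counting = new CountingTracker(numPartitions = 2)
  val keyed = new PartitionKeyedTracker(numPartitions = 2)
  reports.foreach { c => counting.record(c); keyed.record(c) }

  def main(args: Array[String]): Unit = {
    println(s"counting: loaded=${counting.isLoaded}, rows=${counting.rowCount}")
    println(s"keyed:    loaded=${keyed.isLoaded}, rows=${keyed.rowCount}")
    println(s"stored rows for partition 0: $storedRowsForPartition0")
  }
}
```

In this sketch the counting tracker reports the cache as loaded after two completions of partition 0, while the keyed tracker correctly does not. The keyed tracker's row count for partition 0 is the last report (52), not the assumed stored attempt (48), which is the gap the suggested wording "exact for deterministic stages" acknowledges.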
