shrirangmhalgi commented on code in PR #56620:
URL: https://github.com/apache/spark/pull/56620#discussion_r3444632204


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala:
##########
@@ -261,9 +275,30 @@ case class CachedRDDBuilder(
   @transient @volatile private var _cachedColumnBuffers: RDD[CachedBatch] = null
   @transient @volatile private var _cachedColumnBuffersAreLoaded: Boolean = false
 
-  val sizeInBytesStats: LongAccumulator = cachedPlan.session.sparkContext.longAccumulator
-  val rowCountStats: LongAccumulator = cachedPlan.session.sparkContext.longAccumulator
-  private val materializedPartitions = cachedPlan.session.sparkContext.longAccumulator
+  // The cache's materialization bookkeeping, chosen ONCE here (at construction) from the
+  // IN_MEMORY_DISTINCT_PARTITION_TRACKING conf, so the scheme can never change mid-build. Exactly
+  // one branch is allocated:
+  //  - Right (the fix, default): a partition-keyed accumulator storing (rowCount, sizeInBytes) per
+  //    partition. AQE creates a separate cache scan stage per reference to the same cache and each
+  //    submits its own build job, so the same partition can be computed by several concurrent jobs
+  //    (and speculative tasks); Spark has no global cross-executor "compute this partition once"
+  //    barrier (only a per-executor write lock). Keying by partition id (last-write-wins) means
+  //    those duplicate completions cannot mark the cache loaded before every partition has been
+  //    computed -- which otherwise let AQE read rowCount 0 on a non-empty cache and propagate an
+  //    empty relation, silently dropping rows -- and also yields exact, de-duplicated row count /

Review Comment:
   nit: "exact, de-duplicated" holds only for deterministic stages. For
indeterminate stages (e.g., `filter(rand() > 0.5)` cached with speculation),
last-write-wins means the reported per-partition row count reflects whichever
task's completion listener fires last, which may differ from the task whose
blocks the BlockManager actually stored (the first to acquire the partition
write lock). The loaded check is still sound (it depends only on the set of
distinct partition IDs), and this won't cause false empty-propagation, since
the count remains non-zero. Consider: "yields de-duplicated row count / size
(exact for deterministic stages)."
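
   The distinction can be sketched with a minimal, Spark-free model. Everything
here is an assumption made for illustration: `PartitionStats`,
`PartitionKeyedStats` and `LastWriteWinsDemo` are hypothetical names, not the
classes in this PR, and the row counts are made-up numbers standing in for two
attempts of an indeterminate task.

```scala
// Hypothetical stand-in for the partition-keyed accumulator; not Spark's actual class.
final case class PartitionStats(rowCount: Long, sizeInBytes: Long)

final class PartitionKeyedStats(numPartitions: Int) {
  private val stats = scala.collection.mutable.Map.empty[Int, PartitionStats]

  // Last-write-wins: a later completion for the same partition id overwrites the earlier one.
  def record(partitionId: Int, s: PartitionStats): Unit = stats.update(partitionId, s)

  // Loaded only once every distinct partition id has reported at least once.
  def isLoaded: Boolean = stats.size == numPartitions

  // Sum over distinct partitions, so duplicate completions are never double-counted.
  def totalRows: Long = stats.values.map(_.rowCount).sum
}

object LastWriteWinsDemo {
  def run(): (Boolean, Boolean, Long) = {
    val acc = new PartitionKeyedStats(numPartitions = 2)
    acc.record(0, PartitionStats(48L, 480L)) // assumed: the attempt whose blocks were stored
    acc.record(0, PartitionStats(53L, 530L)) // assumed: speculative duplicate, reports last
    val loadedEarly = acc.isLoaded           // still false: partition 1 has not reported
    acc.record(1, PartitionStats(50L, 500L))
    (loadedEarly, acc.isLoaded, acc.totalRows)
  }
}
```

   In this sketch the duplicate completion for partition 0 neither marks the
cache loaded early nor inflates the total by double counting, but the reported
total (103) comes from the last reporter rather than from the attempt assumed to
have stored its blocks (which would give 98). That is why "exact" only holds
when both attempts produce identical output.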
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


