cloud-fan commented on code in PR #56275:
URL: https://github.com/apache/spark/pull/56275#discussion_r3376088031


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -85,6 +86,11 @@ case class AdaptiveSparkPlanExec(
   @transient private val optimizer = new AQEOptimizer(conf,
     context.session.sessionState.adaptiveRulesHolder.runtimeOptimizerRules)
 
+  // Each `AdaptiveSparkPlanExec` records the physical-planning rules it runs 
into its own tracker.
+  // The main node folds them all into the shared `context.qe.tracker` once 
the final plan is ready.
+  @transient private val tracker = new QueryPlanningTracker()
+  context.planningTrackers.add(tracker)

Review Comment:
   The per-node tracker introduced here — together with 
`AdaptiveExecutionContext.planningTrackers`, `QueryPlanningTracker.merge`, the 
merge loop in `finalPlanUpdate`, and the `if (!isSubquery)` special-case — 
exists solely to avoid concurrent writes to one tracker. The only mutation 
point is `recordRuleInvocation`. If we make that method (and the `rules` / 
`topRulesByTime` read accessors) `synchronized`, every AQE node — main and 
subquery — can record straight into the single shared `context.qe.tracker`, 
which is already reachable from every node (sub-AQEs reuse the same 
`AdaptiveExecutionContext`). That lets all five pieces of machinery above be 
deleted.
   
   Why this is worth it beyond line count:
   - **Correctness becomes local.** As written, the merge is safe only because 
sub-AQEs complete via `waitForSubqueries` -> `awaitResult` before 
`finalPlanUpdate` reads their trackers, and because `finalPlanUpdate` is a 
`lazy val` (runs once). That's a non-obvious chain that took manual tracing to 
confirm. A lock makes the guarantee local to the one mutating method and 
independent of *when* subqueries complete or *when* `finalPlanUpdate` runs — a 
future change to subquery scheduling or plan finalization can't silently 
reintroduce a race.
   - **New recording paths are safe for free.** Any future code that records a 
rule from another thread is automatically covered by the lock; with per-node 
trackers each new path has to wire up its own tracker + merge.
   - **Cost is negligible.** Recording is one `HashMap` put per rule 
invocation; an uncontended monitor on the single-threaded analyzer/optimizer 
path is tens of ns against ms-scale planning, and it's contended only by the 
rare concurrent subquery-planning threads.
   - **Immediate visibility.** A shared tracker reflects AQE rules as they run 
rather than only after `finalPlanUpdate` at execution time.
   
   This is essentially @dongjoon-hyun's earlier suggestion to wrap 
`recordRuleInvocation` in `synchronized` — applied to the reworked design, that 
single change subsumes the per-node/merge approach instead of sitting beside 
it. `PhysicalRuleExecutor` and the `applyPhysicalRules` signature cleanup are 
good and orthogonal — keep those. Non-blocking; the current code is correct, 
this is a simplification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to