cloud-fan commented on code in PR #56275:
URL: https://github.com/apache/spark/pull/56275#discussion_r3376088031


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -85,6 +86,11 @@ case class AdaptiveSparkPlanExec(
   @transient private val optimizer = new AQEOptimizer(conf,
     context.session.sessionState.adaptiveRulesHolder.runtimeOptimizerRules)
 
+  // Each `AdaptiveSparkPlanExec` records the physical-planning rules it runs into its own tracker.
+  // The main node folds them all into the shared `context.qe.tracker` once the final plan is ready.
+  @transient private val tracker = new QueryPlanningTracker()
+  context.planningTrackers.add(tracker)

Review Comment:
   **Reconsidering my point 4 in [the earlier 
comment](https://github.com/apache/spark/pull/56275#discussion_r3364026179)** — 
I suggested the per-node tracker + `merge`, and you implemented it faithfully; 
thanks for that. But having seen it in the code, I think the simpler endpoint 
is the one @dongjoon-hyun originally pointed at: a lock on the single shared 
tracker. Apologies for the change of direction.
   
   The per-node tracker introduced here — together with 
`AdaptiveExecutionContext.planningTrackers`, `QueryPlanningTracker.merge`, the 
merge loop in `finalPlanUpdate`, and the `if (!isSubquery)` special-case — 
exists solely to avoid concurrent writes to one tracker. The only mutation 
point is `recordRuleInvocation`. If we make that method (and the `rules` / 
`topRulesByTime` read accessors) `synchronized`, every AQE node — main and 
subquery — can record straight into the single shared `context.qe.tracker`, 
which is already reachable from every node (sub-AQEs reuse the same 
`AdaptiveExecutionContext`). That lets all five pieces of machinery above be 
deleted.
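
   A minimal sketch of the locked variant, for illustration only. `SharedTracker` and `RuleSummary` are hypothetical stand-ins for `QueryPlanningTracker` and its per-rule summary, and the method signatures are assumed rather than copied from Spark:

```scala
import scala.collection.mutable

// Hypothetical stand-in for QueryPlanningTracker; not Spark's actual class.
final case class RuleSummary(totalTimeNs: Long, numInvocations: Long)

class SharedTracker {
  // Guarded by `this`: every read and write goes through a synchronized method.
  private val rulesMap = mutable.HashMap.empty[String, RuleSummary]

  // The single mutation point. Locking here lets any AQE node, main or
  // subquery, record from its own thread.
  def recordRuleInvocation(rule: String, timeNs: Long): Unit = synchronized {
    val prev = rulesMap.getOrElse(rule, RuleSummary(0L, 0L))
    rulesMap(rule) = RuleSummary(prev.totalTimeNs + timeNs, prev.numInvocations + 1)
  }

  // Readers take the same lock and return an immutable snapshot.
  def rules: Map[String, RuleSummary] = synchronized { rulesMap.toMap }

  def topRulesByTime(k: Int): Seq[(String, RuleSummary)] = synchronized {
    rulesMap.toSeq.sortBy { case (_, s) => -s.totalTimeNs }.take(k)
  }
}

object SharedTrackerDemo {
  // Records from several threads into one tracker and returns the final summary.
  def run(threads: Int, perThread: Int): RuleSummary = {
    val tracker = new SharedTracker
    val workers = (1 to threads).map { _ =>
      new Thread(() => {
        (1 to perThread).foreach { _ =>
          tracker.recordRuleInvocation("EnsureRequirements", 10L)
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    tracker.rules("EnsureRequirements")
  }
}
```

   With the lock in place, no invocation is lost regardless of how many threads record or when they finish, so no per-node tracker or merge step is needed.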
   
   Why this is worth it beyond line count:
   - **Correctness becomes local.** As written, the merge is safe only because 
sub-AQEs complete via `waitForSubqueries` -> `awaitResult` before 
`finalPlanUpdate` reads their trackers, and because `finalPlanUpdate` is a 
`lazy val` (runs once). That's a non-obvious chain that took manual tracing to 
confirm. A lock makes the guarantee local to the one mutating method and 
independent of *when* subqueries complete or *when* `finalPlanUpdate` runs — a 
future change to subquery scheduling or plan finalization can't silently 
reintroduce a race.
   - **New recording paths are safe for free.** Any future code that records a 
rule from another thread is automatically covered by the lock; with per-node 
trackers each new path has to wire up its own tracker + merge.
   - **Cost is negligible.** Recording is one `HashMap` put per rule 
invocation; an uncontended monitor on the single-threaded analyzer/optimizer 
path costs tens of nanoseconds against millisecond-scale planning, and the lock 
is contended only by the rare concurrent subquery-planning threads.
   - **Immediate visibility.** A shared tracker reflects AQE rules as they run 
rather than only after `finalPlanUpdate` at execution time.
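
   To make the ordering dependency in the first bullet concrete, here is a simplified, single-threaded model of the per-node design. `NodeTracker`, `MergeOrderingDemo`, and `merge` are hypothetical names, not the PR's code; the sketch only shows that anything recorded after the merge never reaches the shared view:

```scala
import scala.collection.mutable

// Hypothetical model of a per-node tracker; names are illustrative only.
class NodeTracker {
  private val counts = mutable.HashMap.empty[String, Long]

  def record(rule: String): Unit = {
    counts(rule) = counts.getOrElse(rule, 0L) + 1
  }

  def snapshot: Map[String, Long] = counts.toMap
}

object MergeOrderingDemo {
  // Folds one node's counts into the shared map, as a merge loop would.
  def merge(shared: mutable.HashMap[String, Long], node: NodeTracker): Unit = {
    node.snapshot.foreach { case (rule, n) =>
      shared(rule) = shared.getOrElse(rule, 0L) + n
    }
  }

  // Shared count for "rule" when the subquery node records `before` times
  // ahead of the merge and `after` times following it.
  def sharedCount(before: Int, after: Int): Long = {
    val shared = mutable.HashMap.empty[String, Long]
    val sub = new NodeTracker
    (1 to before).foreach(_ => sub.record("rule"))
    merge(shared, sub)
    // Recorded after the merge: stays in `sub`, never reaches `shared`.
    (1 to after).foreach(_ => sub.record("rule"))
    shared.getOrElse("rule", 0L)
  }
}
```

   In this model the merged total is right only when every recording happens before the merge, which is the invariant the current code gets from `waitForSubqueries` and the `lazy val`.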
   
   Note my earlier comment predicted the merge would remove the `isSubquery` 
special-casing, but it ends up still needing `if (!isSubquery)` here — the lock 
approach is what actually removes it. `PhysicalRuleExecutor` and the 
`applyPhysicalRules` signature cleanup are good and orthogonal — keep those. 
Non-blocking; the current code is correct, this is a simplification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


