cloud-fan commented on code in PR #56275:
URL: https://github.com/apache/spark/pull/56275#discussion_r3376088031
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -85,6 +86,11 @@ case class AdaptiveSparkPlanExec(
@transient private val optimizer = new AQEOptimizer(conf,
context.session.sessionState.adaptiveRulesHolder.runtimeOptimizerRules)
+ // Each `AdaptiveSparkPlanExec` records the physical-planning rules it runs into its own tracker.
+ // The main node folds them all into the shared `context.qe.tracker` once the final plan is ready.
+ @transient private val tracker = new QueryPlanningTracker()
+ context.planningTrackers.add(tracker)
Review Comment:
The per-node tracker introduced here — together with
`AdaptiveExecutionContext.planningTrackers`, `QueryPlanningTracker.merge`, the
merge loop in `finalPlanUpdate`, and the `if (!isSubquery)` special-case —
exists solely to avoid concurrent writes to one tracker. The only mutation
point is `recordRuleInvocation`. If we make that method (and the `rules` /
`topRulesByTime` read accessors) `synchronized`, every AQE node — main and
subquery — can record straight into the single shared `context.qe.tracker`,
which is already reachable from every node (sub-AQEs reuse the same
`AdaptiveExecutionContext`). That lets all five pieces of machinery above be
deleted.
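
A minimal sketch of the suggestion, for illustration only. `SharedTracker`, `RuleSummary`, and `SharedTrackerDemo` are hypothetical stand-ins, not the real `QueryPlanningTracker` (whose `recordRuleInvocation` has a different signature and more bookkeeping). The point is only that one monitor around the single mutation point and the read accessors is enough for several planning threads to share one tracker:

```scala
import scala.collection.mutable

// Hypothetical, simplified per-rule summary (not Spark's actual class).
final case class RuleSummary(totalTimeNs: Long, numInvocations: Long)

// Hypothetical stand-in for a tracker shared by the main AQE node and sub-AQEs.
final class SharedTracker {
  private val rulesMap = mutable.HashMap.empty[String, RuleSummary]

  // The only mutation point; guarded by this object's monitor.
  def recordRuleInvocation(rule: String, timeNs: Long): Unit = synchronized {
    val prev = rulesMap.getOrElse(rule, RuleSummary(0L, 0L))
    rulesMap.update(rule, RuleSummary(prev.totalTimeNs + timeNs, prev.numInvocations + 1))
  }

  // Read accessors take the same lock and return immutable snapshots.
  def rules: Map[String, RuleSummary] = synchronized { rulesMap.toMap }

  def topRulesByTime(k: Int): List[(String, RuleSummary)] = synchronized {
    rulesMap.toList.sortBy { case (_, s) => -s.totalTimeNs }.take(k)
  }
}

object SharedTrackerDemo {
  // Simulates several subquery-planning threads recording into one tracker.
  def run(threads: Int, perThread: Int): SharedTracker = {
    val tracker = new SharedTracker
    val workers = (0 until threads).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = {
          var n = 0
          while (n < perThread) {
            tracker.recordRuleInvocation("ExampleRule", 10L)
            n += 1
          }
        }
      })
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
    tracker
  }
}
```

With the lock in place, no increments are lost regardless of how the threads interleave, so no per-node tracker or later merge step is needed in this sketch.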
Why this is worth it beyond line count:
- **Correctness becomes local.** As written, the merge is safe only because
sub-AQEs complete via `waitForSubqueries` -> `awaitResult` before
`finalPlanUpdate` reads their trackers, and because `finalPlanUpdate` is a
`lazy val` (runs once). That's a non-obvious chain that took manual tracing to
confirm. A lock makes the guarantee local to the one mutating method and
independent of *when* subqueries complete or *when* `finalPlanUpdate` runs — a
future change to subquery scheduling or plan finalization can't silently
reintroduce a race.
- **New recording paths are safe for free.** Any future code that records a
rule from another thread is automatically covered by the lock; with per-node
trackers each new path has to wire up its own tracker + merge.
- **Cost is negligible.** Recording is one `HashMap` put per rule
invocation; an uncontended monitor on the single-threaded analyzer/optimizer
path costs tens of nanoseconds against millisecond-scale planning, and the
lock is contended only by the rare concurrent subquery-planning threads.
- **Immediate visibility.** A shared tracker reflects AQE rules as they run
rather than only after `finalPlanUpdate` at execution time.
This is essentially @dongjoon-hyun's earlier suggestion to wrap
`recordRuleInvocation` in `synchronized` — applied to the reworked design, that
single change subsumes the per-node/merge approach instead of sitting beside
it. `PhysicalRuleExecutor` and the `applyPhysicalRules` signature cleanup are
good and orthogonal; keep those. This is non-blocking: the current code is
correct, and the change is purely a simplification.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]