cloud-fan commented on code in PR #56275:
URL: https://github.com/apache/spark/pull/56275#discussion_r3364026179
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -85,6 +86,13 @@ case class AdaptiveSparkPlanExec(
@transient private val optimizer = new AQEOptimizer(conf,
context.session.sessionState.adaptiveRulesHolder.runtimeOptimizerRules)
+ // The tracker to record physical preparation rules into. Only the outer (non-subquery)
+ // `AdaptiveSparkPlanExec` records into the shared `context.qe.tracker`; sub-AQE replanning
+ // happens on `SubqueryExec.executionContext` and writing to the outer tracker from there
+ // would break `QueryPlanningTracker`'s thread-local single-threaded contract.
+ @transient private val mainQueryTracker: Option[QueryPlanningTracker] =
Review Comment:
Stepping back: this hand-rolls per-rule timing that `RuleExecutor` already
provides. `RuleExecutor.execute` records `(ruleName, runTime, effective)` with
the same `nanoTime`/`fastEquals`/`recordRuleInvocation` shape
(`RuleExecutor.scala:247-281`) off the `QueryPlanningTracker.get` ThreadLocal,
and `executeAndTrack` is just `execute` wrapped in `withTracker` — which is
exactly what `reOptimize` already calls for the optimizer. Instead, the PR
reimplements that loop in `prepareForExecution` and `applyPhysicalRules` and
threads an `Option[QueryPlanningTracker]` through six call sites, then drops
subquery coverage to dodge the shared-`HashMap` race.
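To make the overlap concrete, here is a self-contained toy of that shape. It is a simplified stand-in, not Spark's code: `MiniTracker`, `RuleSummary`, and `MiniRuleExecutor` are hypothetical, rules are `(name, function)` pairs, and plain `!=` replaces `fastEquals`. It shows per-rule `(ruleName, runTime, effective)` recording read off a ThreadLocal tracker, and `executeAndTrack` as nothing more than `execute` inside `withTracker`.

```scala
import scala.collection.mutable

// Hypothetical per-rule statistics (field names modeled on Spark's RuleSummary); all are additive.
final case class RuleSummary(totalTimeNs: Long, numInvocations: Long, numEffectiveInvocations: Long)

// Hypothetical tracker: a plain mutable map, safe only if a single thread writes it.
class MiniTracker {
  val rules: mutable.HashMap[String, RuleSummary] = mutable.HashMap.empty

  def recordRuleInvocation(ruleName: String, timeNs: Long, effective: Boolean): Unit = {
    val old = rules.getOrElse(ruleName, RuleSummary(0L, 0L, 0L))
    rules(ruleName) = RuleSummary(
      old.totalTimeNs + timeNs,
      old.numInvocations + 1,
      old.numEffectiveInvocations + (if (effective) 1 else 0))
  }
}

object MiniTracker {
  // The current tracker lives in a ThreadLocal, so rule-running code never takes it as a parameter.
  private val current = new ThreadLocal[MiniTracker]

  def get: Option[MiniTracker] = Option(current.get())

  // Installs `tracker` for the duration of `body`, then restores whatever was installed before.
  def withTracker[T](tracker: MiniTracker)(body: => T): T = {
    val previous = current.get()
    current.set(tracker)
    try body finally current.set(previous)
  }
}

// Hypothetical executor with the same shape as RuleExecutor.execute / executeAndTrack.
abstract class MiniRuleExecutor[P] {
  def rules: Seq[(String, P => P)]

  def execute(plan: P): P = {
    val tracker = MiniTracker.get // None when no caller installed a tracker
    rules.foldLeft(plan) { case (before, (name, rule)) =>
      val start = System.nanoTime()
      val after = rule(before)
      val runTime = System.nanoTime() - start
      val effective = after != before // stand-in for !after.fastEquals(before)
      tracker.foreach(_.recordRuleInvocation(name, runTime, effective))
      after
    }
  }

  // Exactly `execute` wrapped in `withTracker`.
  def executeAndTrack(plan: P, tracker: MiniTracker): P =
    MiniTracker.withTracker(tracker)(execute(plan))
}
```

Calling `execute` with no tracker installed records nothing, so only the entry point needs to know which tracker is in use.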
Would you consider following the existing infrastructure?
1. Wrap preparations in a `PreparationsExecutor extends
RuleExecutor[SparkPlan]` (one `FixedPoint(1)` batch) and call
`executeAndTrack(plan, tracker)` — this deletes the manual timing and also gets
`PlanChangeLogger` + plan-change validation for free.
2. Route `applyPhysicalRules`' rule sequence through a `RuleExecutor` the
same way.
3. Drop the `Option[QueryPlanningTracker]` parameter from both and all six
call sites — the tracker rides the `QueryPlanningTracker.get` ThreadLocal.
4. Give each `AdaptiveSparkPlanExec` (main and sub) its own tracker via
`withTracker`, and `merge` each child tracker into the parent at the
single-threaded point where subquery/stage results are collected back.
`RuleSummary`'s fields are additive, so `merge` is a few lines.
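Steps 1 to 3 could look roughly like the toy below. Again a sketch under assumptions: `Rule`, `Tracker`, `Batch`, `SimpleRuleExecutor`, and `Planner.prepareForExecution` are hypothetical simplifications (the tracker only counts invocations), while `PreparationsExecutor` is the name suggested in step 1. The preparation rules become a single batch with `maxIterations = 1`, playing the role of `FixedPoint(1)`, and the helper takes no tracker parameter because the executor reads whichever tracker the caller installed.

```scala
import scala.collection.mutable

// Hypothetical named rule over plans of type P.
final case class Rule[P](ruleName: String, transform: P => P)

// Hypothetical tracker that only counts invocations per rule name, for brevity.
class Tracker {
  val invocations: mutable.HashMap[String, Int] = mutable.HashMap.empty
  def record(ruleName: String): Unit =
    invocations(ruleName) = invocations.getOrElse(ruleName, 0) + 1
}

object Tracker {
  private val current = new ThreadLocal[Tracker]
  def get: Option[Tracker] = Option(current.get())
  def withTracker[T](tracker: Tracker)(body: => T): T = {
    val previous = current.get()
    current.set(tracker)
    try body finally current.set(previous)
  }
}

// Hypothetical batch; maxIterations = 1 plays the role of FixedPoint(1).
final case class Batch[P](name: String, maxIterations: Int, rules: Seq[Rule[P]])

// Hypothetical minimal RuleExecutor: runs each batch until the plan stops changing
// or maxIterations is reached.
abstract class SimpleRuleExecutor[P] {
  def batches: Seq[Batch[P]]

  def execute(plan: P): P = {
    val tracker = Tracker.get // taken from the ThreadLocal, not from a parameter
    batches.foldLeft(plan) { (batchInput, batch) =>
      var current = batchInput
      var iteration = 0
      var changed = true
      while (changed && iteration < batch.maxIterations) {
        val next = batch.rules.foldLeft(current) { (p, rule) =>
          tracker.foreach(_.record(rule.ruleName))
          rule.transform(p)
        }
        changed = next != current
        current = next
        iteration += 1
      }
      current
    }
  }

  def executeAndTrack(plan: P, tracker: Tracker): P =
    Tracker.withTracker(tracker)(execute(plan))
}

// Step 1: the preparation rules become one batch that runs once.
class PreparationsExecutor[P](preparations: Seq[Rule[P]]) extends SimpleRuleExecutor[P] {
  override val batches: Seq[Batch[P]] = Seq(Batch("Preparations", 1, preparations))
}

// Step 3: call sites like this hypothetical helper no longer take an Option[Tracker].
object Planner {
  def prepareForExecution[P](preparations: Seq[Rule[P]], plan: P): P =
    new PreparationsExecutor(preparations).execute(plan)
}
```

Timing, logging, and validation would then live in one place (the executor) rather than being repeated per call site.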
That removes the `isSubquery` special-casing, makes subqueries observable
instead of a follow-up, and eliminates the race by construction (each thread
writes only its own map). Non-blocking, but it's a meaningful simplification.
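The race-freedom claim in step 4 can be sketched the same way. Hypothetical throughout: `RuleSummary.merge`, `Tracker.merge`, and `Planning.planWithSubqueries` are illustrative names, and a `Future` stands in for subquery planning on another thread. Each task installs its own tracker with `withTracker`, so no map is ever written by two threads; the parent merges child trackers only after `Await.result`, on its own thread. Because every `RuleSummary` field is a sum, merging is field-wise addition.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical summary; every field is additive, so merge is field-wise addition.
final case class RuleSummary(totalTimeNs: Long, numInvocations: Long, numEffectiveInvocations: Long) {
  def merge(other: RuleSummary): RuleSummary = RuleSummary(
    totalTimeNs + other.totalTimeNs,
    numInvocations + other.numInvocations,
    numEffectiveInvocations + other.numEffectiveInvocations)
}

// Hypothetical tracker backed by an unsynchronized map (single writer at a time).
class Tracker {
  val rules: mutable.HashMap[String, RuleSummary] = mutable.HashMap.empty

  def record(ruleName: String, timeNs: Long, effective: Boolean): Unit = {
    val delta = RuleSummary(timeNs, 1L, if (effective) 1L else 0L)
    rules(ruleName) = rules.get(ruleName).fold(delta)(_.merge(delta))
  }

  // Call only from the thread that owns `this`, after `child` is no longer being written.
  def merge(child: Tracker): Unit =
    child.rules.foreach { case (ruleName, summary) =>
      rules(ruleName) = rules.get(ruleName).fold(summary)(_.merge(summary))
    }
}

object Tracker {
  private val current = new ThreadLocal[Tracker]
  def get: Option[Tracker] = Option(current.get())
  def withTracker[T](tracker: Tracker)(body: => T): T = {
    val previous = current.get()
    current.set(tracker)
    try body finally current.set(previous)
  }
}

object Planning {
  // Hypothetical planning work: records one rule into whatever tracker is current.
  def planSomething(ruleName: String): Unit =
    Tracker.get.foreach(_.record(ruleName, 10L, effective = true))

  // Parent plans on this thread; each "subquery" plans on a pool thread with its own tracker.
  def planWithSubqueries(numSubqueries: Int)(implicit ec: ExecutionContext): Tracker = {
    val parent = new Tracker
    Tracker.withTracker(parent)(planSomething("EnsureRequirements"))
    val children = (1 to numSubqueries).map { _ =>
      Future {
        val child = new Tracker // written only by this task's thread
        Tracker.withTracker(child)(planSomething("EnsureRequirements"))
        child
      }
    }
    // Single-threaded join point: merge each child only after it has completed.
    children.foreach(f => parent.merge(Await.result(f, 10.seconds)))
    parent
  }
}
```

The design choice is to move synchronization to a single join point instead of guarding the map, so the tracker itself can stay lock-free.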
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala:
##########
@@ -58,6 +59,31 @@ class QueryPlanningTrackerEndToEndSuite extends StreamTest {
StopStream)
}
+ test("SPARK-57212: Track preparation rules") {
+ val df = spark.range(1000).selectExpr("count(*)")
+ df.collect()
+ val ruleNames = df.queryExecution.tracker.rules.keySet
+ assert(ruleNames.contains(classOf[
+ org.apache.spark.sql.execution.exchange.EnsureRequirements].getName))
+ assert(ruleNames.contains(classOf[
+ org.apache.spark.sql.execution.CollapseCodegenStages].getName))
+ }
Review Comment:
This test runs with AQE enabled (the default), so it doesn't actually
exercise the non-AQE `prepareForExecution` path the PR primarily changes. With
AQE on, `EnsureRequirements` and `CollapseCodegenStages` are part of AQE's
`queryStagePreparationRules` (`AdaptiveSparkPlanExec.scala:124`, `:158`) and
get recorded via the AQE path, so this assertion would pass even if the
`prepareForExecution` tracker change were reverted — it gives no regression
protection for the path it names.
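A toy model of the argument above, under loudly simplifying assumptions: `TestCoverageModel` is hypothetical, it assumes (as described above) that the AQE path records both asserted names whenever AQE is on, and it reduces each path to "records the names or not". A test protects a change only if it passes with the change and fails once the change is reverted. With AQE on, the model shows the assertion cannot tell the two apart.

```scala
// Hypothetical model of which code path records the two asserted rule names.
object TestCoverageModel {
  val asserted: Set[String] = Set("EnsureRequirements", "CollapseCodegenStages")

  // prepareForExecutionTracked = false models reverting the PR's prepareForExecution change.
  def recordedRules(aqeEnabled: Boolean, prepareForExecutionTracked: Boolean): Set[String] =
    if (aqeEnabled) asserted // assumed: recorded via AQE's own stage preparation rules
    else if (prepareForExecutionTracked) asserted
    else Set.empty

  // Mirrors the test: every asserted name must appear in the tracker's rule names.
  def testPasses(aqeEnabled: Boolean, prepareForExecutionTracked: Boolean): Boolean =
    asserted.subsetOf(recordedRules(aqeEnabled, prepareForExecutionTracked))

  // The test guards the change only if reverting the change makes it fail.
  def detectsRevert(aqeEnabled: Boolean): Boolean =
    testPasses(aqeEnabled, prepareForExecutionTracked = true) &&
      !testPasses(aqeEnabled, prepareForExecutionTracked = false)
}
```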
This is @dongjoon-hyun's earlier point: the `2a6d6ac` "fix test" commit
wrapped the *second* test (`Track AQE-internal...`) with `AQE -> true`, but the
request was to disable AQE *here*. Suggest wrapping this test so the asserted
rules are recorded solely through `prepareForExecution`:
```suggestion
  test("SPARK-57212: Track preparation rules") {
    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      val df = spark.range(1000).selectExpr("count(*)")
      df.collect()
      val ruleNames = df.queryExecution.tracker.rules.keySet
      assert(ruleNames.contains(classOf[
        org.apache.spark.sql.execution.exchange.EnsureRequirements].getName))
      assert(ruleNames.contains(classOf[
        org.apache.spark.sql.execution.CollapseCodegenStages].getName))
    }
  }
```
--
This is an automated message from the Apache Git Service.