cloud-fan commented on code in PR #56275:
URL: https://github.com/apache/spark/pull/56275#discussion_r3364026179


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -85,6 +86,13 @@ case class AdaptiveSparkPlanExec(
   @transient private val optimizer = new AQEOptimizer(conf,
     context.session.sessionState.adaptiveRulesHolder.runtimeOptimizerRules)
 
+  // The tracker to record physical preparation rules into. Only the outer (non-subquery)
+  // `AdaptiveSparkPlanExec` records into the shared `context.qe.tracker`; sub-AQE replanning
+  // happens on `SubqueryExec.executionContext` and writing to the outer tracker from there
+  // would break `QueryPlanningTracker`'s thread-local single-threaded contract.
+  @transient private val mainQueryTracker: Option[QueryPlanningTracker] =

Review Comment:
   Stepping back: this hand-rolls per-rule timing that `RuleExecutor` already 
provides. `RuleExecutor.execute` records `(ruleName, runTime, effective)` with 
the same `nanoTime`/`fastEquals`/`recordRuleInvocation` shape 
(`RuleExecutor.scala:247-281`) off the `QueryPlanningTracker.get` ThreadLocal, 
and `executeAndTrack` is just `execute` wrapped in `withTracker` — which is 
exactly what `reOptimize` already calls for the optimizer. Instead, the PR 
reimplements that loop in `prepareForExecution` and `applyPhysicalRules` and 
threads an `Option[QueryPlanningTracker]` through six call sites, then drops 
subquery coverage to dodge the shared-`HashMap` race.
   
   Would you consider following the existing infrastructure?
   1. Wrap preparations in a `PreparationsExecutor extends 
RuleExecutor[SparkPlan]` (one `FixedPoint(1)` batch) and call 
`executeAndTrack(plan, tracker)` — this deletes the manual timing and also gets 
`PlanChangeLogger` + plan-change validation for free.
   2. Route `applyPhysicalRules`' rule sequence through a `RuleExecutor` the 
same way.
   3. Drop the `Option[QueryPlanningTracker]` parameter from both and all six 
call sites — the tracker rides the `QueryPlanningTracker.get` ThreadLocal.
   4. Give each `AdaptiveSparkPlanExec` (main and sub) its own tracker via 
`withTracker`, and `merge` each child tracker into the parent at the 
single-threaded point where subquery/stage results are collected back. 
`RuleSummary`'s fields are additive, so `merge` is a few lines.
   
   That removes the `isSubquery` special-casing, makes subqueries observable 
now rather than in a follow-up, and eliminates the race by construction (each 
thread writes only its own map). Non-blocking, but it's a meaningful 
simplification.
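
   To make points 3 and 4 concrete, here is a minimal, Spark-free sketch of the 
per-thread-tracker-plus-merge idea. Everything in it is a hypothetical stand-in, 
not the real API: `ToyTracker` only mimics the `get`/`withTracker` ThreadLocal 
shape of `QueryPlanningTracker`, `RuleSummary` is a simplified three-counter 
copy, and `merge` is the proposed addition rather than something that exists 
today.

   ```scala
   import scala.collection.mutable

   // Hypothetical stand-in for RuleSummary: three purely additive counters.
   final case class RuleSummary(
       totalTimeNs: Long,
       numInvocations: Long,
       numEffectiveInvocations: Long) {
     def +(other: RuleSummary): RuleSummary = RuleSummary(
       totalTimeNs + other.totalTimeNs,
       numInvocations + other.numInvocations,
       numEffectiveInvocations + other.numEffectiveInvocations)
   }

   // Hypothetical toy tracker. Each instance is written by one thread only.
   final class ToyTracker {
     private val rulesMap = mutable.HashMap.empty[String, RuleSummary]

     def recordRuleInvocation(rule: String, timeNs: Long, effective: Boolean): Unit = {
       val delta = RuleSummary(timeNs, 1L, if (effective) 1L else 0L)
       rulesMap.update(rule, rulesMap.get(rule).map(_ + delta).getOrElse(delta))
     }

     // Proposed addition: fold a finished child tracker into this one.
     // Must be called only after the child's thread has completed.
     def merge(child: ToyTracker): Unit =
       child.rulesMap.foreach { case (rule, summary) =>
         rulesMap.update(rule, rulesMap.get(rule).map(_ + summary).getOrElse(summary))
       }

     def rules: Map[String, RuleSummary] = rulesMap.toMap
   }

   object ToyTracker {
     private val local = new ThreadLocal[ToyTracker]

     def get: Option[ToyTracker] = Option(local.get())

     // Install `tracker` on the current thread for the duration of `f`.
     def withTracker[T](tracker: ToyTracker)(f: => T): T = {
       val old = local.get()
       local.set(tracker)
       try f finally local.set(old)
     }
   }

   object MergeDemo {
     // Toy "rule run": records into whichever tracker this thread has installed,
     // so no tracker parameter needs to be threaded through call sites.
     def runRule(name: String, timeNs: Long, effective: Boolean): Unit =
       ToyTracker.get.foreach(_.recordRuleInvocation(name, timeNs, effective))

     def main(args: Array[String]): Unit = {
       val parent = new ToyTracker
       val child = new ToyTracker

       // The "subquery" thread writes only to its own tracker.
       val subquery = new Thread(() => ToyTracker.withTracker(child) {
         runRule("EnsureRequirements", 50L, effective = false)
         runRule("CollapseCodegenStages", 30L, effective = true)
       })
       subquery.start()

       ToyTracker.withTracker(parent) {
         runRule("EnsureRequirements", 100L, effective = true)
       }

       // Single-threaded collection point: join, then merge.
       subquery.join()
       parent.merge(child)

       assert(parent.rules("EnsureRequirements") == RuleSummary(150L, 2L, 1L))
       assert(parent.rules("CollapseCodegenStages") == RuleSummary(30L, 1L, 1L))
     }
   }
   ```

   The point of the sketch is that neither map is ever touched by two threads 
at once: the child is merged only after `join`, so no locking or concurrent map 
is needed. Where exactly the real merge point would sit in 
`AdaptiveSparkPlanExec` is left open here.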



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/QueryPlanningTrackerEndToEndSuite.scala:
##########
@@ -58,6 +59,31 @@ class QueryPlanningTrackerEndToEndSuite extends StreamTest {
       StopStream)
   }
 
+  test("SPARK-57212: Track preparation rules") {
+    val df = spark.range(1000).selectExpr("count(*)")
+    df.collect()
+    val ruleNames = df.queryExecution.tracker.rules.keySet
+    assert(ruleNames.contains(classOf[
+      org.apache.spark.sql.execution.exchange.EnsureRequirements].getName))
+    assert(ruleNames.contains(classOf[
+      org.apache.spark.sql.execution.CollapseCodegenStages].getName))
+  }

Review Comment:
   This test runs with AQE enabled (the default), so it doesn't actually 
exercise the non-AQE `prepareForExecution` path the PR primarily changes. With 
AQE on, `EnsureRequirements` and `CollapseCodegenStages` are part of AQE's 
`queryStagePreparationRules` (`AdaptiveSparkPlanExec.scala:124`, `:158`) and 
get recorded via the AQE path, so this assertion would pass even if the 
`prepareForExecution` tracker change were reverted — it gives no regression 
protection for the path it names.
   
   This is @dongjoon-hyun's earlier point: the `2a6d6ac` "fix test" commit 
wrapped the *second* test (`Track AQE-internal...`) with `AQE -> true`, but the 
request was to disable AQE *here*. Suggest wrapping this test so the asserted 
rules are recorded solely through `prepareForExecution`:
   ```suggestion
     test("SPARK-57212: Track preparation rules") {
       withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
         val df = spark.range(1000).selectExpr("count(*)")
         df.collect()
         val ruleNames = df.queryExecution.tracker.rules.keySet
         assert(ruleNames.contains(classOf[
           org.apache.spark.sql.execution.exchange.EnsureRequirements].getName))
         assert(ruleNames.contains(classOf[
           org.apache.spark.sql.execution.CollapseCodegenStages].getName))
       }
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

