gengliangwang commented on code in PR #56252:
URL: https://github.com/apache/spark/pull/56252#discussion_r3345776685


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala:
##########
@@ -988,16 +988,25 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan with CodegenSup
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(unionedInputRDD)
 
-  // Set in `doProduce`, read in `doConsume` during single-threaded code
-  // emission. `numOutputRowsTerm` is registered once per stage so the
-  // metric appears in `references[]` exactly once instead of once per
-  // child. `currentEmittingChild` tells `doConsume` which child's
-  // projection to bind.
-  @transient private var numOutputRowsTerm: String = _
-  @transient private var currentEmittingChild: Int = -1
+  // Per-emission codegen state, set in `doProduce` and read in `doConsume`.
+  // `numOutputRowsTerm` is registered once per stage so the metric appears in
+  // `references[]` exactly once instead of once per child; `currentEmittingChild`
+  // tells `doConsume` which child's projection to bind.
+  //
+  // A single `UnionExec` instance can have its codegen driven by more than one
+  // thread at the same time: a reused exchange/subquery stage is generated
+  // concurrently with the main plan, and async subquery / dynamic-pruning
+  // execution can overlap a driver-side `doCodeGen`. A plain field would let a
+  // racing `doProduce` reset `currentEmittingChild` to -1 while another thread
+  // is still in `doConsume`. Each `doCodeGen` pass is itself single-threaded
+  // (`produce` -> `doConsume` run inline on one thread), so a `ThreadLocal`
+  // isolates the state per pass without that cross-thread race.
+  @transient private lazy val numOutputRowsTerm = new ThreadLocal[String]

Review Comment:
   Good point -- you're right that this is per-pass state, and `ThreadLocal` is 
correct only because per-pass and per-thread coincide here (a pass runs inline 
on one thread, and passes don't nest). I've expanded the comment to spell out 
the scope and the rationale.
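
   To make the per-pass / per-thread point concrete, here is a minimal standalone sketch. It is not the actual `UnionExec` code: `ToyUnion`, `runPass`, `consume`, and `ThreadLocalPassDemo` are hypothetical names, and the "pass" is just a loop that sets the slot and reads it back inline. It only illustrates that several threads can drive the same instance without seeing each other's value, which a plain `var` would not guarantee.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Hypothetical stand-in for an operator that keeps per-pass codegen state.
// This is an illustration only, not the real UnionExec.
final class ToyUnion(numChildren: Int) {
  // One slot per thread; -1 means "no child is being emitted on this thread".
  private val currentEmittingChild = new ThreadLocal[Int] {
    override def initialValue(): Int = -1
  }

  // Simulates one single-threaded pass: the "produce" side sets the slot for
  // each child, and the "consume" side reads it inline on the same thread.
  def runPass(): Seq[Int] = {
    try {
      (0 until numChildren).map { i =>
        currentEmittingChild.set(i)
        Thread.`yield`() // give a concurrent pass a chance to interleave
        consume()
      }
    } finally {
      // Clear the slot so a pooled thread does not carry state into a later pass.
      currentEmittingChild.remove()
    }
  }

  private def consume(): Int = currentEmittingChild.get()
}

object ThreadLocalPassDemo {
  // Runs `threads` concurrent passes over one shared instance and reports
  // whether every pass observed exactly its own sequence of child indices.
  def run(threads: Int, children: Int): Boolean = {
    val node = new ToyUnion(children)
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (1 to threads).map { _ =>
        pool.submit(new Callable[Seq[Int]] {
          override def call(): Seq[Int] = node.runPass()
        })
      }
      val expected = (0 until children).toList
      futures.forall(_.get(10, TimeUnit.SECONDS).toList == expected)
    } finally {
      pool.shutdown()
    }
  }
}
```

   In this sketch, `ThreadLocalPassDemo.run(4, 8)` drives four passes over one `ToyUnion`, and each pass reads back only the indices it set itself. The `remove()` in `finally` is a choice made for the sketch so that pooled threads start each pass from the initial value.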
   
   The reason I preferred `ThreadLocal` over `ctx`: `CodegenContext` has no 
general-purpose per-pass attribute map -- `currentPartitionIndexVar` and 
friends are named fields. Threading `currentEmittingChild` / 
`numOutputRowsTerm` through `ctx` would mean adding `UnionExec`-specific fields 
to a class every operator instantiates, whereas the `ThreadLocal` keeps the 
state local to the one node that needs it. That's the tradeoff you described, 
and I landed on keeping it localized.
   
   Happy to move it onto `ctx` instead if you feel the consistency with 
`currentPartitionIndexVar` outweighs the cost of adding operator-specific 
fields to `CodegenContext` -- let me know your preference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

