gengliangwang commented on code in PR #56252:
URL: https://github.com/apache/spark/pull/56252#discussion_r3345776685
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala:
##########
@@ -988,16 +988,25 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan with CodegenSup
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(unionedInputRDD)
-  // Set in `doProduce`, read in `doConsume` during single-threaded code
-  // emission. `numOutputRowsTerm` is registered once per stage so the
-  // metric appears in `references[]` exactly once instead of once per
-  // child. `currentEmittingChild` tells `doConsume` which child's
-  // projection to bind.
-  @transient private var numOutputRowsTerm: String = _
-  @transient private var currentEmittingChild: Int = -1
+  // Per-emission codegen state, set in `doProduce` and read in `doConsume`.
+  // `numOutputRowsTerm` is registered once per stage so the metric appears in
+  // `references[]` exactly once instead of once per child; `currentEmittingChild`
+  // tells `doConsume` which child's projection to bind.
+  //
+  // A single `UnionExec` instance can have its codegen driven by more than one
+  // thread at the same time: a reused exchange/subquery stage is generated
+  // concurrently with the main plan, and async subquery / dynamic-pruning
+  // execution can overlap a driver-side `doCodeGen`. A plain field would let a
+  // racing `doProduce` reset `currentEmittingChild` to -1 while another thread
+  // is still in `doConsume`. Each `doCodeGen` pass is itself single-threaded
+  // (`produce` -> `doConsume` run inline on one thread), so a `ThreadLocal`
+  // isolates the state per pass without that cross-thread race.
+  @transient private lazy val numOutputRowsTerm = new ThreadLocal[String]

Review Comment:
Good point -- you're right that this is per-pass state, and `ThreadLocal` is correct only because per-pass and per-thread coincide here (a pass runs inline on one thread, and passes don't nest). I've expanded the comment to spell out the scope and the rationale.
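To make the "per-pass coincides with per-thread" argument concrete, here is a minimal, self-contained Scala sketch. `ToyUnion`, `produce`, `consume` and `ThreadLocalPassDemo` are hypothetical stand-ins, not Spark's `UnionExec` / `CodegenSupport` API: `produce` plays the role of `doProduce`, records the current child index in a `ThreadLocal`, and calls `consume` inline on the same thread, while several passes over the *same* node instance run concurrently on a thread pool. Like the comment in the diff, it assumes a pass never nests inside another pass on the same thread.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical, heavily simplified stand-in for a codegen node like UnionExec.
// `produce` plays the role of `doProduce` and `consume` of `doConsume`;
// neither is Spark's real API.
final class ToyUnion(numChildren: Int) {
  // Per-pass state. Because a pass runs produce -> consume inline on one
  // thread, a ThreadLocal gives each concurrent pass its own copy.
  // (The field in the diff is `@transient private lazy val`; this toy is
  // never serialized, so a plain val suffices.)
  private val currentEmittingChild: ThreadLocal[Int] = new ThreadLocal[Int] {
    override protected def initialValue(): Int = -1
  }

  def produce(out: ArrayBuffer[String]): Unit = {
    try {
      for (i <- 0 until numChildren) {
        currentEmittingChild.set(i)
        Thread.sleep(1) // widen the window in which a shared field could be clobbered
        consume(out)
      }
    } finally {
      // Resets only this thread's copy back to -1; other in-flight passes are unaffected.
      currentEmittingChild.remove()
    }
  }

  private def consume(out: ArrayBuffer[String]): Unit = {
    val child = currentEmittingChild.get()
    require(child >= 0, "consume called outside produce")
    out += s"bind child $child"
  }
}

object ThreadLocalPassDemo {
  // Runs `passes` codegen passes over the same node concurrently.
  def runPasses(node: ToyUnion, passes: Int): Seq[Seq[String]] = {
    val pool = Executors.newFixedThreadPool(passes)
    try {
      val futures = (0 until passes).map { _ =>
        pool.submit(new Callable[Seq[String]] {
          override def call(): Seq[String] = {
            val out = ArrayBuffer.empty[String]
            node.produce(out)
            out.toSeq
          }
        })
      }
      futures.map(_.get(10, TimeUnit.SECONDS))
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val results = runPasses(new ToyUnion(numChildren = 3), passes = 4)
    results.foreach(r => println(r.mkString(", ")))
  }
}
```

Every pass should bind children 0, 1, 2 in order. If the `ThreadLocal` were replaced by a plain `var`, one pass's reset to -1 could land while another pass sits between `set` and `consume`, which is the race the diff comment describes; that failure is timing-dependent, so the sketch only demonstrates the isolated case.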
The reason I preferred `ThreadLocal` over `ctx`: `CodegenContext` has no general-purpose per-pass attribute map -- `currentPartitionIndexVar` and friends are named fields. Threading `currentEmittingChild` / `numOutputRowsTerm` through `ctx` would mean adding `UnionExec`-specific fields to a class shared by every operator's codegen, whereas the `ThreadLocal` keeps the state local to the one node that needs it. That's the tradeoff you described, and I landed on keeping it localized. Happy to move it onto `ctx` instead if you feel the consistency with `currentPartitionIndexVar` outweighs the pollution -- let me know your preference.

This is an automated message from the Apache Git Service.
