rdtr commented on PR #56083:
URL: https://github.com/apache/spark/pull/56083#issuecomment-4539090000

   @cloud-fan I do not think #55985 fixes this. They address different symptoms 
of the same root cause.
   
   #55985 fixes double-mapping of nested CTE refs: `canonicalizeCTE` crosses 
`WithCTE` boundaries while the shared mutable map accumulates ids from sibling 
scopes, causing refs to be transformed twice (e.g., 0→1→0).
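   
   A second remap is harmful because the id map is a renumbering. Once it holds both 0→1 and 1→0 (ids assigned in encounter order across sibling scopes), applying it twice sends a ref back to its original id. Below is a minimal sketch that does not depend on Spark. `DoubleMapDemo`, `remap`, and `remapTwice` are hypothetical names, and the map contents are assumed for illustration:
   
   ```scala
   object DoubleMapDemo {
     // Hypothetical id map accumulated across two sibling scopes: the def with
     // original id 1 was numbered first (1 -> 0), then the def with id 0 (0 -> 1).
     val idMap: Map[Long, Long] = Map(1L -> 0L, 0L -> 1L)

     // Remaps one ref id; ids with no entry are left unchanged.
     def remap(id: Long): Long = idMap.getOrElse(id, id)

     // A ref reached by two overlapping rewrites is remapped twice.
     def remapTwice(id: Long): Long = remap(remap(id))
   }
   ```
   
   `remap(0L)` yields `1L`, but `remapTwice(0L)` yields `0L`, which is the new id of the *other* CTE. Because the map is not idempotent, each ref has to be remapped exactly once.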
   
   This PR fixes a different issue: orphan `CTERelationRef` nodes that end up outside any `WithCTE` after `InlineCTE` drops the wrapper. `NormalizeCTEIds` only transforms refs inside the `WithCTE` case handler (via `canonicalizeCTE`), so these orphans are never reached. They keep their original ids, which breaks plan caching.
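   
   Here is a toy model of the miss. If refs are rewritten only while the walk is beneath a `With` node, a ref whose wrapper has been dropped is never touched. This is a sketch, not Spark code. `OrphanRefDemo`, `Node`, `Ref`, `With`, `Join`, and `remapUnderWith` are hypothetical stand-ins, and the mapping for id 5 is assumed to be known:
   
   ```scala
   object OrphanRefDemo {
     // Hypothetical stand-ins for plan nodes; not Catalyst classes.
     sealed trait Node
     final case class Ref(cteId: Long) extends Node
     final case class With(child: Node, defIds: Seq[Long]) extends Node
     final case class Join(left: Node, right: Node) extends Node

     // Rewrites ids only from a With handler: a ref is remapped only when the
     // walk is somewhere beneath a With node.
     def remapUnderWith(n: Node, m: Map[Long, Long], inWith: Boolean = false): Node =
       n match {
         case Ref(id) => if (inWith) Ref(m.getOrElse(id, id)) else n
         case With(c, ids) =>
           With(remapUnderWith(c, m, inWith = true), ids.map(i => m.getOrElse(i, i)))
         case Join(l, r) => Join(remapUnderWith(l, m, inWith), remapUnderWith(r, m, inWith))
       }

     // The right-hand Ref(5) is not beneath any With: an orphan.
     val plan: Node = Join(With(Ref(5L), Seq(5L)), Ref(5L))
   }
   ```
   
   `remapUnderWith(plan, Map(5L -> 0L))` rewrites the def id and the ref inside the `With` to 0, but it leaves the right-hand ref at 5. Two otherwise identical plans that differ only in their original ids therefore still differ after normalization.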
   
   Both stem from the same design issue: `NormalizeCTEIds` interleaves building 
the id mapping with applying it, and only applies it within `WithCTE` handlers.
   
   One concern with the current approach: the second-pass `transformDownWithSubqueries` won't reach orphan refs inside `CacheTableAsSelect.plan`. `CacheTableAsSelect` extends `Command`, whose `children` returns `Nil`, so `plan` is hidden from tree traversal. Those refs keep their original ids, which can lead to a plan-cache miss.
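   
   The traversal gap can be reproduced with a toy tree in which one node keeps a subplan in a field but reports no `children`. This is a sketch under assumptions: `HiddenPlanDemo`, `CacheAs`, and the other names are hypothetical stand-ins for `CacheTableAsSelect` and related nodes, not Catalyst APIs:
   
   ```scala
   object HiddenPlanDemo {
     // Hypothetical stand-ins for plan nodes; not Catalyst classes.
     sealed trait Node { def children: Seq[Node] }
     final case class Ref(cteId: Long) extends Node { def children: Seq[Node] = Nil }
     final case class Project(child: Node) extends Node { def children: Seq[Node] = Seq(child) }
     // Like a command that stores its query in a field but reports no children.
     final case class CacheAs(plan: Node) extends Node { def children: Seq[Node] = Nil }

     // Generic walk that, like a tree-node traversal, only follows `children`.
     def collectRefIds(n: Node): Seq[Long] = n match {
       case Ref(id) => Seq(id)
       case other => other.children.flatMap(collectRefIds)
     }

     // Same walk with an explicit case that descends into the hidden field.
     def collectRefIdsWithCache(n: Node): Seq[Long] = n match {
       case Ref(id) => Seq(id)
       case CacheAs(p) => collectRefIdsWithCache(p)
       case other => other.children.flatMap(collectRefIdsWithCache)
     }

     val plan: Node = CacheAs(Project(Ref(3L)))
   }
   ```
   
   `collectRefIds(plan)` finds nothing, while `collectRefIdsWithCache(plan)` finds id 3. The extra case mirrors the explicit `CacheTableAsSelect` match in the proposal.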
   
   A potentially simpler unified fix would be to separate the two concerns: 
collect all CTE def ids first, then apply the mapping globally (2-pass):
   
   ```scala
   import java.util.concurrent.atomic.AtomicLong

   import scala.collection.mutable

   import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
   import org.apache.spark.sql.catalyst.plans.logical._
   import org.apache.spark.sql.catalyst.rules.Rule

   object NormalizeCTEIds extends Rule[LogicalPlan] {
     override def apply(plan: LogicalPlan): LogicalPlan = {
       val curId = new AtomicLong()
       val cteIdToNewId = mutable.Map.empty[Long, Long]
       // Pass 1: assign a new id to every CTE def; nothing is rewritten yet.
       collectCTEIds(plan, curId, cteIdToNewId)
       // Pass 2: rewrite every def and ref once, wherever it sits in the plan.
       if (cteIdToNewId.isEmpty) plan else applyMapping(plan, cteIdToNewId)
     }

     private def collectCTEIds(
         plan: LogicalPlan, curId: AtomicLong, map: mutable.Map[Long, Long]): Unit = {
       plan match {
         // CacheTableAsSelect hides `plan` from `children`, so descend explicitly.
         case ctas: CacheTableAsSelect => collectCTEIds(ctas.plan, curId, map)
         case WithCTE(_, cteDefs) =>
           // getOrElseUpdate evaluates its default (bumping curId) only for unseen ids.
           cteDefs.foreach(d => map.getOrElseUpdate(d.id, curId.getAndIncrement()))
         case _ =>
       }
       plan.children.foreach(collectCTEIds(_, curId, map))
       // Subquery plans live in expressions, not in `children`.
       plan.expressions.foreach(_.foreachUp {
         case s: SubqueryExpression => collectCTEIds(s.plan, curId, map)
         case _ =>
       })
     }

     private def applyMapping(
         plan: LogicalPlan, map: mutable.Map[Long, Long]): LogicalPlan = {
       plan.transformDownWithSubqueries {
         case ctas: CacheTableAsSelect =>
           ctas.copy(plan = applyMapping(ctas.plan, map))
         case cteDef: CTERelationDef if map.contains(cteDef.id) =>
           cteDef.copy(id = map(cteDef.id))
         case ref: CTERelationRef if map.contains(ref.cteId) =>
           ref.copy(cteId = map(ref.cteId))
         case unionLoop: UnionLoop if map.contains(unionLoop.id) =>
           unionLoop.copy(id = map(unionLoop.id))
         case unionLoopRef: UnionLoopRef if map.contains(unionLoopRef.loopId) =>
           unionLoopRef.copy(loopId = map(unionLoopRef.loopId))
       }
     }
   }
   ```
   
   This fixes both issues. There is no double-mapping, because each node is visited once in Pass 2. There are no orphan misses, because Pass 2 transforms all refs globally. CTAS is also handled explicitly in both passes, and `canonicalizeCTE` is eliminated entirely.
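   
   The two passes can be exercised end to end on a toy tree without Spark. This is a sketch under stated assumptions. `TwoPassDemo`, `Node`, `Ref`, `Def`, `With`, `Join`, and `Leaf` are hypothetical stand-ins, not Catalyst classes. The example combines a nested scope that produces the 1→0 / 0→1 numbering with an orphan ref:
   
   ```scala
   import java.util.concurrent.atomic.AtomicLong
   import scala.collection.mutable

   object TwoPassDemo {
     // Hypothetical stand-ins for plan nodes; not Catalyst classes.
     sealed trait Node
     final case class Ref(cteId: Long) extends Node
     final case class Def(id: Long, child: Node) extends Node
     final case class With(child: Node, defs: Seq[Def]) extends Node
     final case class Join(left: Node, right: Node) extends Node
     case object Leaf extends Node

     // Pass 1: number every def in traversal order; nothing is rewritten here.
     def collectIds(n: Node, next: AtomicLong, m: mutable.Map[Long, Long]): Unit = n match {
       case With(child, defs) =>
         defs.foreach(d => m.getOrElseUpdate(d.id, next.getAndIncrement()))
         defs.foreach(d => collectIds(d, next, m))
         collectIds(child, next, m)
       case Def(_, child) => collectIds(child, next, m)
       case Join(l, r) => collectIds(l, next, m); collectIds(r, next, m)
       case Ref(_) | Leaf => ()
     }

     // Pass 2: rewrite each def and ref exactly once, wherever it sits.
     def applyMapping(n: Node, m: mutable.Map[Long, Long]): Node = n match {
       case Ref(id) => Ref(m.getOrElse(id, id))
       case Def(id, child) => Def(m.getOrElse(id, id), applyMapping(child, m))
       case With(child, defs) =>
         With(applyMapping(child, m),
           defs.map(d => Def(m.getOrElse(d.id, d.id), applyMapping(d.child, m))))
       case Join(l, r) => Join(applyMapping(l, m), applyMapping(r, m))
       case Leaf => Leaf
     }

     def normalize(plan: Node): Node = {
       val m = mutable.Map.empty[Long, Long]
       collectIds(plan, new AtomicLong(), m)
       if (m.isEmpty) plan else applyMapping(plan, m)
     }

     // The outer With declares id 1, the nested With declares id 0, and the
     // final Ref(1L) sits outside any With (an orphan).
     val example: Node = Join(
       With(With(Join(Ref(1L), Ref(0L)), Seq(Def(0L, Leaf))), Seq(Def(1L, Leaf))),
       Ref(1L))
   }
   ```
   
   `normalize(example)` renumbers the outer def to 0 and the nested def to 1. It rewrites every ref to original id 1, including the orphan, to 0 exactly once. Running `normalize` again leaves the result unchanged. A copy of `example` with every id shifted by 10 normalizes to the same tree, which is the property plan caching depends on. The `AtomicLong` only mirrors the proposal's counter; this single-threaded toy would work equally well with a plain `var`.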
   
   Happy to help with a PR if this direction is interesting.

