cloud-fan commented on code in PR #49351:
URL: https://github.com/apache/spark/pull/49351#discussion_r1917869246
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -41,16 +47,113 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
plan: LogicalPlan,
cteDefMap: mutable.HashMap[Long, CTERelationDef]): LogicalPlan = {
plan.resolveOperatorsDownWithPruning(_.containsAllPatterns(CTE)) {
- case w @ WithCTE(_, cteDefs) =>
- cteDefs.foreach { cteDef =>
- if (cteDef.resolved) {
- cteDefMap.put(cteDef.id, cteDef)
- }
+ case withCTE @ WithCTE(_, cteDefs) =>
+ val newCTEDefs = cteDefs.map {
+ // `cteDef.recursive` means "presence of a recursive CTERelationRef
under cteDef". The
+ // side effect of node substitution below is that after
CTERelationRef substitution
+ // its cteDef is no longer considered `recursive`. This code path is
common for `cteDef`
+ // that were non-recursive from the get-go, as well as those that
are no longer recursive
+ // due to node substitution.
+ case cteDef if !cteDef.recursive =>
+ if (cteDef.resolved) {
+ cteDefMap.put(cteDef.id, cteDef)
+ }
+ cteDef
+ case cteDef =>
+ cteDef.child match {
+ // If it is a supported recursive CTE query pattern (4 so far),
extract the anchor and
+ // recursive plans from the Union and rewrite Union with
UnionLoop. The recursive CTE
+ // references inside UnionLoop's recursive plan will be
rewritten as UnionLoopRef,
+ // using the output of the resolved anchor plan. The side effect
of recursive
+ // CTERelationRef->UnionLoopRef substitution is that `cteDef`
that was originally
+ // considered `recursive` is no longer so in the context of the
`cteDef.recursive` method
+ // definition.
+ //
+ // Simple case of duplicating (UNION ALL) clause.
+ case alias @ SubqueryAlias(_, Union(Seq(anchor, recursion),
false, false)) =>
+ if (!anchor.resolved) {
+ cteDef
+ } else {
+ val loop = UnionLoop(
+ cteDef.id,
+ anchor,
+ rewriteRecursiveCTERefs(recursion, anchor, cteDef.id,
None))
+ cteDef.copy(child = alias.copy(child = loop))
+ }
+
+ // The case of CTE name followed by a parenthesized list of
column name(s), e.g.
+ // WITH RECURSIVE t(n).
+ case alias @ SubqueryAlias(_,
+ columnAlias @ UnresolvedSubqueryColumnAliases(
+ colNames,
+ Union(Seq(anchor, recursion), false, false)
+ )) =>
+ if (!anchor.resolved) {
+ cteDef
+ } else {
+ val loop = UnionLoop(
+ cteDef.id,
+ anchor,
+ rewriteRecursiveCTERefs(recursion, anchor, cteDef.id,
Some(colNames)))
+ cteDef.copy(child = alias.copy(child =
columnAlias.copy(child = loop)))
+ }
+
+ // If the recursion is described with a UNION (deduplicating)
clause then the
+ // recursive term should not return those rows that have been
calculated previously,
+ // and we exclude those rows from the current iteration result.
+ case alias @ SubqueryAlias(_,
+ Distinct(Union(Seq(anchor, recursion), false, false))) =>
+ if (!anchor.resolved) {
+ cteDef
+ } else {
+ val loop = UnionLoop(
+ cteDef.id,
+ Distinct(anchor),
+ Except(
+ rewriteRecursiveCTERefs(recursion, anchor, cteDef.id,
None),
+ UnionLoopRef(cteDef.id, anchor.output, true),
+ isAll = false
+ )
+ )
+ cteDef.copy(child = alias.copy(child = loop))
+ }
+
+ // The case of CTE name followed by a parenthesized list of
column name(s).
+ case alias @ SubqueryAlias(_,
+ columnAlias@UnresolvedSubqueryColumnAliases(
+ colNames,
+ Distinct(Union(Seq(anchor, recursion), false, false))
+ )) =>
Review Comment:
```suggestion
columnAlias@UnresolvedSubqueryColumnAliases(
colNames,
Distinct(Union(Seq(anchor, recursion), false, false))
)) =>
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]