cloud-fan commented on code in PR #49232:
URL: https://github.com/apache/spark/pull/49232#discussion_r1898305146


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala:
##########
@@ -247,58 +259,137 @@ object CTESubstitution extends Rule[LogicalPlan] {
         // NOTE: we must call `traverseAndSubstituteCTE` before `substituteCTE`, as the relations
         // in the inner CTE have higher priority over the relations in the outer CTE when resolving
         // inner CTE relations. For example:
-        // WITH t1 AS (SELECT 1)
-        // t2 AS (
-        //   WITH t1 AS (SELECT 2)
-        //   WITH t3 AS (SELECT * FROM t1)
-        // )
+        // WITH
+        //   t1 AS (SELECT 1),
+        //   t2 AS (
+        //     WITH
+        //       t1 AS (SELECT 2),
+        //       t3 AS (SELECT * FROM t1)
+        //     SELECT * FROM t1
+        //   )
+        // SELECT * FROM t2
         // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
-        traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, 
cteDefs)._1
+        //
+        // When recursion allowed:
+        // - don't add current definition to outer definitions of 
`traverseAndSubstituteCTE()` to
+        //   prevent recursion inside inner CTEs.
+        //   E.g. the following query will not resolve `t1` within `t2`:
+        //   WITH RECURSIVE
+        //     t1 AS (
+        //       SELECT 1 AS level
+        //       UNION (
+        //         WITH t2 AS (SELECT level + 1 FROM t1 WHERE level < 10)
+        //         SELECT * FROM t2
+        //       )
+        //     )
+        //   SELECT * FROM t1
+        // - remove definitions that conflict with current relation `name` 
from outer definitions of
+        //   `traverseAndSubstituteCTE()` to prevent weird resolutions.
+        //   E.g. we don't want to resolve `t1` within `t3` to `SELECT 1`:
+        //   WITH
+        //     t1 AS (SELECT 1),
+        //     t2 AS (
+        //       WITH RECURSIVE
+        //         t1 AS (
+        //           SELECT 1 AS level
+        //           UNION (
+        //             WITH t3 AS (SELECT level + 1 FROM t1 WHERE level < 10)
+        //             SELECT * FROM t3
+        //           )
+        //         )
+        //       SELECT * FROM t1
+        //     )
+        //   SELECT * FROM t2
+        val nonConflictingCTERelations = if (allowRecursion) {
+          resolvedCTERelations.filterNot {
+            case (cteName, cteDef) => cteDef.conf.resolver(cteName, name)
+          }
+        } else {
+          resolvedCTERelations
+        }
+        traverseAndSubstituteCTE(relation, forceInline, 
nonConflictingCTERelations, cteDefs)._1
       }
-      // CTE definition can reference a previous one
-      val substituted = substituteCTE(innerCTEResolved, alwaysInline, 
resolvedCTERelations)
-      val cteRelation = CTERelationDef(substituted)
+
+      // If recursion is allowed then it has higher priority than outer or 
previous relations so
+      // construct a not yet substituted but recursive `CTERelationDef`, that 
we will prepend to
+      // `resolvedCTERelations`.
+      val recursiveCTERelation = if (allowRecursion) {
+        Some(name -> CTERelationDef(relation, recursive = true))
+      } else {
+        None
+      }
+      // CTE definition can reference a previous one or itself if recursion 
allowed.
+      val (substituted, recursionFound) = substituteCTE(innerCTEResolved, 
alwaysInline,
+        resolvedCTERelations, recursiveCTERelation)
+      val cteRelation = recursiveCTERelation
+        .map(_._2.copy(child = substituted, recursive = recursionFound))
+        .getOrElse(CTERelationDef(substituted))
       if (!alwaysInline) {
         cteDefs += cteRelation
       }
+
+      // From this point any reference to the definition is non-recursive.
+      val nonRecursiveCTERelation = if (cteRelation.recursive) {
+        cteRelation.copy(recursive = false)
+      } else {
+        cteRelation
+      }
+
       // Prepending new CTEs makes sure that those have higher priority over 
outer ones.
-      resolvedCTERelations +:= (name -> cteRelation)
+      resolvedCTERelations +:= (name -> nonRecursiveCTERelation)
     }
     resolvedCTERelations
   }
 
+  /**
+   * @return tuple of the logical plan and recursionFound boolean.
+   */
   private def resolveWithCTERelations(
       table: String,
       alwaysInline: Boolean,
       cteRelations: Seq[(String, CTERelationDef)],
-      unresolvedRelation: UnresolvedRelation): LogicalPlan = {
-    cteRelations
+      unresolvedRelation: UnresolvedRelation): (LogicalPlan, Boolean) = {
+    var recursionFound = false
+    val cteRelationsOut = cteRelations
       .find(r => conf.resolver(r._1, table))
       .map {
         case (_, d) =>
           if (alwaysInline) {
             d.child
           } else {
+            if (d.recursive) {
+              recursionFound = true
+            }
             // Add a `SubqueryAlias` for hint-resolving rules to match 
relation names.
-            SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, 
d.isStreaming))
+            SubqueryAlias(table,
+              CTERelationRef(d.id, d.resolved, d.output, d.isStreaming, 
recursive = d.recursive))
           }
       }
       .getOrElse(unresolvedRelation)
+    (cteRelationsOut, recursionFound)

Review Comment:
   Looking at the whole PR, only one caller needs `recursionFound`. I
suggest we just return the logical plan and let that single caller figure out
whether recursion happens or not:
   ```
   val substituted = ...
   val isRecursive = substituted.find {
     case ref: CTERelationRef => ref.recursive
     case _ => false
   }.isDefined
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to