cloud-fan commented on code in PR #49232:
URL: https://github.com/apache/spark/pull/49232#discussion_r1900118080


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala:
##########
@@ -247,58 +260,130 @@ object CTESubstitution extends Rule[LogicalPlan] {
         // NOTE: we must call `traverseAndSubstituteCTE` before 
`substituteCTE`, as the relations
         // in the inner CTE have higher priority over the relations in the 
outer CTE when resolving
         // inner CTE relations. For example:
-        // WITH t1 AS (SELECT 1)
-        // t2 AS (
-        //   WITH t1 AS (SELECT 2)
-        //   WITH t3 AS (SELECT * FROM t1)
-        // )
-        // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
-        traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations, 
cteDefs)._1
+        // WITH
+        //   t1 AS (SELECT 1),
+        //   t2 AS (
+        //     WITH
+        //       t1 AS (SELECT 2),
+        //       t3 AS (SELECT * FROM t1)
+        //     SELECT * FROM t1
+        //   )
+        // SELECT * FROM t2
+        // t3 should resolve the t1 to `SELECT 2` ("inner" t1) instead of 
`SELECT 1`.
+        //
+        // When recursion allowed (RECURSIVE keyword used):
+        // Consider following example:
+        //  WITH
+        //    t1 AS (SELECT 1),
+        //    t2 AS (
+        //      WITH RECURSIVE
+        //        t1 AS (
+        //          SELECT 1 AS level
+        //          UNION (
+        //            WITH t3 AS (SELECT level + 1 FROM t1 WHERE level < 10)
+        //            SELECT * FROM t3
+        //          )
+        //        )
+        //      SELECT * FROM t1
+        //    )
+        //  SELECT * FROM t2
+        // t1 reference within t3 would initially resolve to outer `t1` 
(SELECT 1), as the inner t1
+        // is not yet known. Therefore, we need to remove definitions that 
conflict with current
+        // relation `name` from the list of `outerCTEDefs` entering 
`traverseAndSubstituteCTE()`.
+        // NOTE: It will be recognized later in the code that this is actually 
a self-reference
+        // (reference to the inner t1).
+        val nonConflictingCTERelations = if (allowRecursion) {
+          resolvedCTERelations.filterNot {
+            case (cteName, cteDef) => cteDef.conf.resolver(cteName, name)
+          }
+        } else {
+          resolvedCTERelations
+        }
+        traverseAndSubstituteCTE(relation, forceInline, 
nonConflictingCTERelations, cteDefs)._1
       }
-      // CTE definition can reference a previous one
-      val substituted = substituteCTE(innerCTEResolved, alwaysInline, 
resolvedCTERelations)
+
+      // If recursion is allowed (RECURSIVE keyword specified)
+      // then it has higher priority than outer or previous relations so
+      // construct a not yet substituted but recursive `CTERelationDef`, that 
we will prepend to
+      // `resolvedCTERelations`. Prepending is done within `substituteCTE` 
method.
+      val recursiveCTERelation = if (allowRecursion) {
+        Some(name -> CTERelationDef(relation))
+      } else {
+        None
+      }
+      // CTE definition can reference a previous one or itself if recursion 
allowed.
+      val substituted = substituteCTE(innerCTEResolved, alwaysInline,
+        resolvedCTERelations, recursiveCTERelation)
       val cteRelation = CTERelationDef(substituted)
       if (!alwaysInline) {
         cteDefs += cteRelation
       }
+
       // Prepending new CTEs makes sure that those have higher priority over 
outer ones.
       resolvedCTERelations +:= (name -> cteRelation)
     }
     resolvedCTERelations
   }
 
+  /**
+   * This function is called from `substituteCTE` to actually substitute 
unresolved relations
+   * with CTE references.
+   */
   private def resolveWithCTERelations(
       table: String,
       alwaysInline: Boolean,
       cteRelations: Seq[(String, CTERelationDef)],
+      recursiveCTERelation: Option[(String, CTERelationDef)],
       unresolvedRelation: UnresolvedRelation): LogicalPlan = {
-    cteRelations
-      .find(r => conf.resolver(r._1, table))
-      .map {
+    if (recursiveCTERelation.isDefined && 
conf.resolver(recursiveCTERelation.get._1, table)) {
+      // self-reference is found
+      recursiveCTERelation.map {
         case (_, d) =>
           if (alwaysInline) {
             d.child
           } else {
-            // Add a `SubqueryAlias` for hint-resolving rules to match 
relation names.
-            SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, 
d.isStreaming))
+            SubqueryAlias(table,
+              CTERelationRef(d.id, d.resolved, d.output, d.isStreaming, 
recursive = true))
           }
-      }
-      .getOrElse(unresolvedRelation)
+      }.get
+    }
+    else {

Review Comment:
   ```suggestion
       } else {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to