mgaido91 commented on a change in pull request #23531: [SPARK-24497][SQL] Support recursive SQL query
URL: https://github.com/apache/spark/pull/23531#discussion_r321960214
 
 

 ##########
 File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
 ##########
 @@ -120,6 +125,124 @@ object CTESubstitution extends Rule[LogicalPlan] {
     }
   }
 
+  /**
+   * If recursion is allowed recursion handling starts with inserting unresolved self-references
+   * ([[UnresolvedRecursiveReference]]) to places where a reference to the CTE definition itself is
+   * found.
+   * If there is a self-reference then we need to check if structure of the query satisfies the SQL
+   * recursion rules and insert the appropriate [[RecursiveRelation]] finally.
+   */
+  private def handleRecursion(
+      ctePlan: => LogicalPlan,
+      cteName: String,
+      allowRecursion: Boolean) = {
+    if (allowRecursion) {
+      // check if there is any reference to the CTE and if there is then treat the CTE as recursive
+      val (recursiveReferencesPlan, recursiveReferenceCount) =
+        insertRecursiveReferences(ctePlan, cteName)
+      if (recursiveReferenceCount > 0) {
+        // if there is a reference then the CTE needs to follow one of these structures
+        recursiveReferencesPlan match {
+          case SubqueryAlias(_, u: Union) =>
+            insertRecursiveRelation(cteName, Seq.empty, false, u)
+          case SubqueryAlias(_, Distinct(u: Union)) =>
+            insertRecursiveRelation(cteName, Seq.empty, true, u)
+          case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(columnNames, 
u: Union)) =>
+            insertRecursiveRelation(cteName, columnNames, false, u)
+          case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(columnNames, 
Distinct(u: Union))) =>
+            insertRecursiveRelation(cteName, columnNames, true, u)
+          case _ =>
+            throw new AnalysisException(s"Recursive query ${cteName} should 
contain UNION or " +
+              s"UNION ALL statements only. This error can also be caused by 
ORDER BY or LIMIT " +
+              s"keywords used on result of UNION or UNION ALL.")
+        }
+      } else {
+        ctePlan
+      }
+    } else {
+      ctePlan
+    }
+  }
+
+  /**
+   * If we encounter a relation that matches the recursive CTE then the relation is replaced to an
+   * [[UnresolvedRecursiveReference]]. The replacement process also checks possible references in
+   * subqueries and report them as errors.
+   */
+  private def insertRecursiveReferences(
+      ctePlan: LogicalPlan,
+      cteName: String): (LogicalPlan, Int) = {
+    var recursiveReferenceCount = 0
+    val resolver = ctePlan.conf.resolver
+    val newPlan = ctePlan resolveOperators {
+      case UnresolvedRelation(Seq(table)) if (ctePlan.conf.resolver(cteName, 
table)) =>
+        recursiveReferenceCount += 1
+        UnresolvedRecursiveReference(cteName, false)
+
+      case other =>
+        other.subqueries.foreach(checkAndTraverse(_, {
+          case UnresolvedRelation(Seq(name)) if (resolver(cteName, name)) =>
+            throw new AnalysisException(s"Recursive query ${cteName} should 
not contain " +
+              "recursive references in its subquery.")
+          case _ => true
+        }))
+        other
+    }
+
+    (newPlan, recursiveReferenceCount)
+  }
+
+  private def insertRecursiveRelation(
+      cteName: String,
+      columnNames: Seq[String],
+      distinct: Boolean,
+      union: Union) = {
+    if (union.children.size != 2) {
+      throw new AnalysisException(s"Recursive query ${cteName} should contain 
one anchor term " +
+        s"and one recursive term connected with UNION or UNION ALL.")
 
 Review comment:
   ```suggestion
           "and one recursive term connected with UNION or UNION ALL.")
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to