mgaido91 commented on a change in pull request #23531: [SPARK-24497][SQL]
Support recursive SQL query
URL: https://github.com/apache/spark/pull/23531#discussion_r321960214
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
##########
@@ -120,6 +125,124 @@ object CTESubstitution extends Rule[LogicalPlan] {
}
}
+ /**
+ * If recursion is allowed, recursion handling starts by inserting
unresolved self-references
+ * ([[UnresolvedRecursiveReference]]) to places where a reference to the CTE
definition itself is
+ * found.
+ * If there is a self-reference, then we need to check whether the structure of the
query satisfies the SQL
+ * recursion rules and finally insert the appropriate [[RecursiveRelation]].
+ */
+ private def handleRecursion(
+ ctePlan: => LogicalPlan,
+ cteName: String,
+ allowRecursion: Boolean) = {
+ if (allowRecursion) {
+ // check if there is any reference to the CTE and if there is then treat
the CTE as recursive
+ val (recursiveReferencesPlan, recursiveReferenceCount) =
+ insertRecursiveReferences(ctePlan, cteName)
+ if (recursiveReferenceCount > 0) {
+ // if there is a reference then the CTE needs to follow one of these
structures
+ recursiveReferencesPlan match {
+ case SubqueryAlias(_, u: Union) =>
+ insertRecursiveRelation(cteName, Seq.empty, false, u)
+ case SubqueryAlias(_, Distinct(u: Union)) =>
+ insertRecursiveRelation(cteName, Seq.empty, true, u)
+ case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(columnNames,
u: Union)) =>
+ insertRecursiveRelation(cteName, columnNames, false, u)
+ case SubqueryAlias(_, UnresolvedSubqueryColumnAliases(columnNames,
Distinct(u: Union))) =>
+ insertRecursiveRelation(cteName, columnNames, true, u)
+ case _ =>
+ throw new AnalysisException(s"Recursive query ${cteName} should
contain UNION or " +
+ s"UNION ALL statements only. This error can also be caused by
ORDER BY or LIMIT " +
+ s"keywords used on result of UNION or UNION ALL.")
+ }
+ } else {
+ ctePlan
+ }
+ } else {
+ ctePlan
+ }
+ }
+
+ /**
+ * If we encounter a relation that matches the recursive CTE, then the
relation is replaced with an
+ * [[UnresolvedRecursiveReference]]. The replacement process also checks
possible references in
+ * subqueries and reports them as errors.
+ */
+ private def insertRecursiveReferences(
+ ctePlan: LogicalPlan,
+ cteName: String): (LogicalPlan, Int) = {
+ var recursiveReferenceCount = 0
+ val resolver = ctePlan.conf.resolver
+ val newPlan = ctePlan resolveOperators {
+ case UnresolvedRelation(Seq(table)) if (ctePlan.conf.resolver(cteName,
table)) =>
+ recursiveReferenceCount += 1
+ UnresolvedRecursiveReference(cteName, false)
+
+ case other =>
+ other.subqueries.foreach(checkAndTraverse(_, {
+ case UnresolvedRelation(Seq(name)) if (resolver(cteName, name)) =>
+ throw new AnalysisException(s"Recursive query ${cteName} should
not contain " +
+ "recursive references in its subquery.")
+ case _ => true
+ }))
+ other
+ }
+
+ (newPlan, recursiveReferenceCount)
+ }
+
+ private def insertRecursiveRelation(
+ cteName: String,
+ columnNames: Seq[String],
+ distinct: Boolean,
+ union: Union) = {
+ if (union.children.size != 2) {
+ throw new AnalysisException(s"Recursive query ${cteName} should contain
one anchor term " +
+ s"and one recursive term connected with UNION or UNION ALL.")
Review comment:
```suggestion
"and one recursive term connected with UNION or UNION ALL.")
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]