cloud-fan commented on code in PR #49232:
URL: https://github.com/apache/spark/pull/49232#discussion_r1898301531
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala:
##########
@@ -247,58 +260,130 @@ object CTESubstitution extends Rule[LogicalPlan] {
// NOTE: we must call `traverseAndSubstituteCTE` before
`substituteCTE`, as the relations
// in the inner CTE have higher priority over the relations in the
outer CTE when resolving
// inner CTE relations. For example:
- // WITH t1 AS (SELECT 1)
- // t2 AS (
- // WITH t1 AS (SELECT 2)
- // WITH t3 AS (SELECT * FROM t1)
- // )
- // t3 should resolve the t1 to `SELECT 2` instead of `SELECT 1`.
- traverseAndSubstituteCTE(relation, forceInline, resolvedCTERelations,
cteDefs)._1
+ // WITH
+ // t1 AS (SELECT 1),
+ // t2 AS (
+ // WITH
+ // t1 AS (SELECT 2),
+ // t3 AS (SELECT * FROM t1)
+ // SELECT * FROM t1
+ // )
+ // SELECT * FROM t2
+ // t3 should resolve the t1 to `SELECT 2` ("inner" t1) instead of
`SELECT 1`.
+ //
+ // When recursion allowed (RECURSIVE keyword used):
+ // Consider following example:
+ // WITH
+ // t1 AS (SELECT 1),
+ // t2 AS (
+ // WITH RECURSIVE
+ // t1 AS (
+ // SELECT 1 AS level
+ // UNION (
+ // WITH t3 AS (SELECT level + 1 FROM t1 WHERE level < 10)
+ // SELECT * FROM t3
+ // )
+ // )
+ // SELECT * FROM t1
+ // )
+ // SELECT * FROM t2
+ // t1 reference within t3 would initially resolve to outer `t1`
(SELECT 1), as the inner t1
+ // is not yet known. Therefore, we need to remove definitions that
conflict with current
+ // relation `name` from the list of `outerCTEDefs` entering
`traverseAndSubstituteCTE()`.
+ // NOTE: It will be recognized later in the code that this is actually
a self-reference
+ // (reference to the inner t1).
+ val nonConflictingCTERelations = if (allowRecursion) {
+ resolvedCTERelations.filterNot {
+ case (cteName, cteDef) => cteDef.conf.resolver(cteName, name)
+ }
+ } else {
+ resolvedCTERelations
+ }
+ traverseAndSubstituteCTE(relation, forceInline,
nonConflictingCTERelations, cteDefs)._1
}
- // CTE definition can reference a previous one
- val substituted = substituteCTE(innerCTEResolved, alwaysInline,
resolvedCTERelations)
- val cteRelation = CTERelationDef(substituted)
+
+ // If recursion is allowed (RECURSIVE keyword specified)
+ // then it has higher priority than outer or previous relations so
+ // construct a not yet substituted but recursive `CTERelationDef`, that
we will prepend to
+ // `resolvedCTERelations`. Prepending is done within `substituteCTE`
method.
+ val recursiveCTERelation = if (allowRecursion) {
+ Some(name -> CTERelationDef(relation, recursive = true))
+ } else {
+ None
+ }
+ // CTE definition can reference a previous one or itself if recursion
allowed.
+ val (substituted, recursionFound) = substituteCTE(innerCTEResolved,
alwaysInline,
+ resolvedCTERelations, recursiveCTERelation)
+ val cteRelation = recursiveCTERelation
+ .map(_._2.copy(child = substituted, recursive = recursionFound))
+ .getOrElse(CTERelationDef(substituted))
if (!alwaysInline) {
cteDefs += cteRelation
}
+
+ // From this point any reference to the definition is non-recursive.
+ val nonRecursiveCTERelation = if (cteRelation.recursive) {
+ cteRelation.copy(recursive = false)
+ } else {
+ cteRelation
+ }
+
// Prepending new CTEs makes sure that those have higher priority over
outer ones.
- resolvedCTERelations +:= (name -> cteRelation)
+ resolvedCTERelations +:= (name -> nonRecursiveCTERelation)
}
resolvedCTERelations
}
+ /**
+ * @return tuple of the logical plan and recursionFound boolean.
+ */
private def resolveWithCTERelations(
table: String,
alwaysInline: Boolean,
cteRelations: Seq[(String, CTERelationDef)],
- unresolvedRelation: UnresolvedRelation): LogicalPlan = {
- cteRelations
+ unresolvedRelation: UnresolvedRelation): (LogicalPlan, Boolean) = {
+ var recursionFound = false
+ val cteRelationsOut = cteRelations
.find(r => conf.resolver(r._1, table))
.map {
case (_, d) =>
if (alwaysInline) {
d.child
} else {
+ if (d.recursive) {
+ // self-reference is found
+ recursionFound = true
+ }
// Add a `SubqueryAlias` for hint-resolving rules to match
relation names.
- SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output,
d.isStreaming))
+ SubqueryAlias(table,
+ CTERelationRef(d.id, d.resolved, d.output, d.isStreaming,
recursive = d.recursive))
}
}
.getOrElse(unresolvedRelation)
+ (cteRelationsOut, recursionFound)
}
+ /**
+ * @return tuple of the logical plan and recursionFound boolean.
+ */
private def substituteCTE(
plan: LogicalPlan,
alwaysInline: Boolean,
- cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = {
- plan.resolveOperatorsUpWithPruning(
+ cteRelations: Seq[(String, CTERelationDef)],
+ recursiveCTERelation: Option[(String, CTERelationDef)]): (LogicalPlan,
Boolean) = {
+ var recursionFound = false
+ val substituted = plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION,
PLAN_EXPRESSION,
UNRESOLVED_IDENTIFIER)) {
case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _)
if cteRelations.exists(r => plan.conf.resolver(r._1, table)) =>
throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(table))
case u @ UnresolvedRelation(Seq(table), _, _) =>
- resolveWithCTERelations(table, alwaysInline, cteRelations, u)
+ val resolved = resolveWithCTERelations(table, alwaysInline,
+ (recursiveCTERelation ++ cteRelations).toSeq, u)
Review Comment:
Why not also make `resolveWithCTERelations` take an optional parameter
`recursiveCTERelation: Option[(String, CTERelationDef)]`, just as `substituteCTE`
does? Then `CTERelationDef#recursive` would no longer be needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]