mgaido91 commented on a change in pull request #23531: [SPARK-24497][SQL] 
Support recursive SQL query
URL: https://github.com/apache/spark/pull/23531#discussion_r251482557
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ##########
 @@ -47,6 +48,58 @@ case class Subquery(child: LogicalPlan) extends 
OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
+/**
+ * This node defines a table that contains one ore more 
[[RecursiveReference]]s as child nodes
+ * referring to this table. It can be used to define a recursive CTE query and 
contains an anchor
+ * and a recursive term as children. The result of the anchor and the 
repeatedly executed recursive
+ * term are combined to form the final result.
+ *
+ * @param name name of the table
+ * @param anchorTerm this child is used for initializing the query
+ * @param recursiveTerm this child is used for extending the set of results 
with new rows based on
+ *                      the results of the previous iteration (or the anchor 
in the first iteration)
+ */
+case class RecursiveTable(
+    name: String,
+    anchorTerm: LogicalPlan,
+    recursiveTerm: LogicalPlan,
+    limit: Option[Long]) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(anchorTerm, recursiveTerm)
+
+  override def output: Seq[Attribute] = 
anchorTerm.output.map(_.withNullability(true))
+
+  override lazy val resolved: Boolean = {
+    val numberOfOutputMatches =
+      childrenResolved &&
+      anchorTerm.output.length > 0 &&
+      anchorTerm.output.length == recursiveTerm.output.length
+    if (numberOfOutputMatches) {
+      val typeOfOutputMatches = 
anchorTerm.output.zip(recursiveTerm.output).forall {
+        case (l, r) => l.dataType.sameType(r.dataType)
+      }
+      if (!typeOfOutputMatches) {
+        throw new AnalysisException(s"Anchor term types 
${anchorTerm.output.map(_.dataType)} " +
+          s"and recursive term types ${recursiveTerm.output.map(_.dataType)} 
doesn't match")
+      }
+    }
+    numberOfOutputMatches
+  }
+
+  lazy val anchorResolved = anchorTerm.resolved
+}
+
+/**
+ * A This node means a reference to a recursive table in CTE definitions.
+ *
+ * @param name the name of the table it references to
+ * @param output the attributes of the recursive table
+ */
+case class RecursiveReference(name: String, output: Seq[Attribute]) extends 
LeafNode {
+  override lazy val resolved = output.forall(_.resolved)
+
+  override def computeStats(): Statistics = Statistics(0)
 
 Review comment:
   No, I am not referring to this. In the other comment I mean that this PR is 
heavily revisiting the lifecycle and handling of broadcasts, which is a very 
sensitive topic. It'd be great if we could somehow avoid doing that, but I see 
it is not easy. Just wondering if we can figure out a way that introduces fewer 
changes there.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to