dtenedor commented on code in PR #49518:
URL: https://github.com/apache/spark/pull/49518#discussion_r1924249528
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -183,4 +184,52 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
columnNames.map(UnresolvedSubqueryColumnAliases(_, ref)).getOrElse(ref)
}
}
+
+ /**
+ * Checks if data types of anchor and recursive terms of a recursive CTE
definition match.
+ */
+ def checkDataTypesAnchorAndRecursiveTerm(unionLoop: UnionLoop): Unit = {
+ val anchorOutputDatatypes = unionLoop.anchor.output.map(_.dataType)
+ val recursiveTermOutputDatatypes =
unionLoop.recursion.output.map(_.dataType)
+
+ if (!anchorOutputDatatypes.zip(recursiveTermOutputDatatypes).forall {
+ case (anchorDT, recursionDT) => DataType.equalsStructurally(anchorDT,
recursionDT, true)
Review Comment:
It's a bit confusing to have the pattern match inline within the `if` condition like
this. Let's pull it out into a helper method.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveWithCTE.scala:
##########
@@ -183,4 +184,52 @@ object ResolveWithCTE extends Rule[LogicalPlan] {
columnNames.map(UnresolvedSubqueryColumnAliases(_, ref)).getOrElse(ref)
}
}
+
+ /**
+ * Checks if data types of anchor and recursive terms of a recursive CTE
definition match.
+ */
+ def checkDataTypesAnchorAndRecursiveTerm(unionLoop: UnionLoop): Unit = {
+ val anchorOutputDatatypes = unionLoop.anchor.output.map(_.dataType)
+ val recursiveTermOutputDatatypes =
unionLoop.recursion.output.map(_.dataType)
+
+ if (!anchorOutputDatatypes.zip(recursiveTermOutputDatatypes).forall {
+ case (anchorDT, recursionDT) => DataType.equalsStructurally(anchorDT,
recursionDT, true)
+ }) {
+ throw new AnalysisException(
+ errorClass = "INVALID_RECURSIVE_REFERENCE.DATA_TYPE",
+ messageParameters = Map.empty)
+ }
+ }
+
+ /**
+ * Throws error if self-reference is placed in places which are not allowed:
+ * right side of left outer/semi/anti joins, left side of right outer joins,
+ * in full outer joins and in aggregates
+ */
+ def checkIfSelfReferenceIsPlacedCorrectly(unionLoop: UnionLoop): Unit = {
+ def unionLoopRefNotAllowedUnderCurrentNode(currentNode: LogicalPlan) :
Unit =
+ currentNode.foreach {
+ case UnionLoopRef(unionLoop.id, _, _) =>
+ throw new AnalysisException(
+ errorClass = "INVALID_RECURSIVE_REFERENCE.PLACE",
+ messageParameters = Map.empty)
+ case other => ()
+ }
+ unionLoop.foreach {
+ case Join(left, right, LeftOuter, _, _) =>
+ unionLoopRefNotAllowedUnderCurrentNode(right)
+ case Join(left, right, RightOuter, _, _) =>
+ unionLoopRefNotAllowedUnderCurrentNode(left)
+ case Join(left, right, LeftSemi, _, _) =>
+ unionLoopRefNotAllowedUnderCurrentNode(right)
+ case Join(left, right, LeftAnti, _, _) =>
+ unionLoopRefNotAllowedUnderCurrentNode(right)
+ case Join(left, right, _, _, _) =>
+ unionLoopRefNotAllowedUnderCurrentNode(left)
+ unionLoopRefNotAllowedUnderCurrentNode(right)
+ case Aggregate(_, _, child, _) =>
+ unionLoopRefNotAllowedUnderCurrentNode(child)
+ case other => ()
Review Comment:
You don't need the `()` here: the function returns `Unit`, so a `case` with an empty
body is enough, and you can just remove it. The same applies elsewhere.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala:
##########
@@ -423,4 +429,20 @@ object CTESubstitution extends Rule[LogicalPlan] {
case _ => WithCTE(p, cteDefs)
}
}
+
+ /**
+ * Counts number of self-references in a recursive CTE definition and throws
an error
+ * if that number is bigger than 1.
+ */
+ private def checkNumberOfSelfReferences(cteDef: CTERelationDef): Unit = {
+ val numOfSelfRef = cteDef.map[Boolean] {
+ case CTERelationRef(cteDef.id, _, _, _, _, true) => true
+ case other => false
+ }.count(_ == true)
Review Comment:
I think you can drop the `== true` comparison, since the elements are already `Boolean`
(e.g. `.count(identity)`), and it will have the same result with simpler code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]