cloud-fan commented on a change in pull request #33070:
URL: https://github.com/apache/spark/pull/33070#discussion_r659934143



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
##########
@@ -212,14 +214,38 @@ object DecorrelateInnerQuery extends PredicateHelper {
   }
 
   /**
-   * Rewrite all [[DomainJoin]]s in the inner query to actual inner joins with the outer query.
+   * Rewrite all [[DomainJoin]]s in the inner query to actual joins with the outer query.
    */
   def rewriteDomainJoins(
       outerPlan: LogicalPlan,
       innerPlan: LogicalPlan,
       conditions: Seq[Expression]): LogicalPlan = innerPlan match {
-    case d @ DomainJoin(domainAttrs, child) =>
+    case d @ DomainJoin(domainAttrs, child, joinType, condition) =>
       val domainAttrMap = buildDomainAttrMap(conditions, domainAttrs)
+
+      val newChild = joinType match {
+        // Left outer domain joins are used to handle the COUNT bug.
+        case LeftOuter =>
+          // Replace the attributes in the domain join condition with the actual outer expressions
+          // and use the new join conditions to rewrite domain joins in its child. For example:
+          // DomainJoin [c'] LeftOuter (a = c') with domainAttrMap: { c' -> _1 }.
+          // Then the new conditions to use will be [(a = _1)].
+          val newConditions = condition.map(
+            _.transform { case a: Attribute => domainAttrMap.getOrElse(a, a)}
+          ).map(splitConjunctivePredicates).getOrElse(conditions)
+          // Recursively rewrite domain joins using the new conditions.
+          rewriteDomainJoins(outerPlan, child, newConditions)
+        case Inner =>
+          // The decorrelation framework adds domain inner joins by traversing down the plan tree
+          // recursively until it reaches a node that is not correlated with the outer query.
+          // So the child node of a domain inner join shouldn't contain another domain join.
+          assert(child.find(_.isInstanceOf[DomainJoin]).isEmpty,
+            s"Child of a domain join shouldn't contain another domain join.\n$child")
+          child
+        case o =>
+          throw QueryExecutionErrors.unexpectedDomainJoinTypeError(o)

Review comment:
       This is not a user-facing bug; let's just throw an IllegalStateException here.
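
       A rough sketch of what that could look like, using toy stand-ins
       rather than the real Catalyst classes. The `JoinType` hierarchy,
       the `DomainJoinSketch` object, `rewriteStrategy`, and the message
       text are all hypothetical and exist only to show the shape of the
       fallback branch:

```scala
// Toy stand-ins for join types; hypothetical, NOT Spark's actual classes.
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object FullOuter extends JoinType

object DomainJoinSketch {
  // Returns a label describing how each supported domain join type is handled.
  def rewriteStrategy(joinType: JoinType): String = joinType match {
    case LeftOuter => "rewrite child with new conditions"
    case Inner => "keep child as-is"
    case o =>
      // Reaching this branch means an internal invariant was violated, so a
      // plain IllegalStateException is thrown instead of a user-facing error.
      throw new IllegalStateException(s"Unexpected domain join type $o")
  }
}
```

       The idea is that a dedicated error helper is only worth having for
       conditions an end user can trigger; an unreachable-by-design branch
       can fail with a generic exception.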




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


