YannisSismanis commented on code in PR #39759:
URL: https://github.com/apache/spark/pull/39759#discussion_r1107580340


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -254,24 +255,104 @@ object DecorrelateInnerQuery extends PredicateHelper {
   /**
    * Rewrites a domain join cond so that it can be pushed to the right side of a
    * union/intersect/except operator.
+   *
+   * Example: Take a query like:
+   * select * from t0 join lateral (
+   *   select a from t1 where b < t0.x
+   *   union all
+   *   select b from t2 where c < t0.y)
+   *
+   * We have outer table with attributes [x#1, y#2] and after introducing DomainJoins the subquery

Review Comment:
   nit: mention that x#1, y#2 are from t0?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -254,24 +255,104 @@ object DecorrelateInnerQuery extends PredicateHelper {
   /**
    * Rewrites a domain join cond so that it can be pushed to the right side of a
    * union/intersect/except operator.
+   *
+   * Example: Take a query like:
+   * select * from t0 join lateral (
+   *   select a from t1 where b < t0.x
+   *   union all
+   *   select b from t2 where c < t0.y)
+   *
+   * We have outer table with attributes [x#1, y#2] and after introducing DomainJoins the subquery
+   * is a Union where the left side has output [a#3, x#4, y#5] with DomainJoin [x#4, y#5]
+   * and the right side has output [b#6, x#7, y#8] with DomainJoin [x#7, y#8].
+   * The domain join conditions are x#4 <=> x#1 and y#5 <=> y#2, because the output of a set op
+   * uses the attribute names from the left side. For the left side,
+   * those remain unchanged, while for the right side they are remapped to
+   * x#7 <=> x#1 and y#8 <=> y#2.
    */
-  def pushConditionsThroughUnion(
+  def pushDomainConditionsThroughSetOperation(
       conditions: Seq[Expression],
-      union: Union,
+      setOp: LogicalPlan, // Union or SetOperation
       child: LogicalPlan): Seq[Expression] = {
     // The output attributes are always equal to the left child's output
-    assert(union.output.size == child.output.size)
-    val map = AttributeMap(union.output.zip(child.output))
-    conditions.map {
+    assert(setOp.output.size == child.output.size)
+    val map = AttributeMap(setOp.output.zip(child.output))
+    conditions.collect {
       // The left hand side is the domain attribute used in the inner query and the right hand side
       // is the attribute from the outer query. (See comment above in buildDomainAttrMap.)
       // We need to remap the attribute names used in the inner query (left hand side) to account
       // for the different names in each union child. We should not remap the attribute names used
       // in the outer query.
+      //
+      // Note: the reason we can't just use the original joinCond from when the DomainJoin was
+      // constructed is that constructing the DomainJoins happens much earlier than rewriting the
+      // DomainJoins into actual joins, with many optimization steps in
+      // between, which could change the attributes involved (e.g. CollapseProject).
       case EqualNullSafe(left: Attribute, right: Expression) =>
         EqualNullSafe(map.getOrElse(left, left), right)
-      case EqualTo(left: Attribute, right: Expression) =>
-        EqualTo(map.getOrElse(left, left), right)
+    }
+  }
+
+  /**
+   * This is to handle INTERSECT/EXCEPT DISTINCT which are rewritten to left semi/anti join in
+   * ReplaceIntersectWithSemiJoin and ReplaceExceptWithAntiJoin.
+   *
+   * To rewrite the domain join on the right side, we need to remap the attributes in the domain
+   * join cond, using the mapping between left and right sides in the semi/anti join cond.
+   *
+   * After DecorrelateInnerQuery, the domain join conds reference the output names of the
+   * INTERSECT/EXCEPT, which come from the left side. When rewriting the DomainJoin in the
+   * right child, we need to remap the domain attribute names to account for the different
+   * names in the left vs right child, similar to pushDomainConditionsThroughSetOperation.
+   * But after the rewrite to semi/anti join is performed, we instead need to do the remapping
+   * based on the semi/anti join cond which contains equi-joins between the left and right
+   * outputs.
+   *
+   * The domain conds will be like leftInner <=> outer, and the semi/anti join cond will be like
+   * leftInner <=> rightInner. We add additional domain conds rightInner <=> outer which are used
+   * to rewrite the right-side DomainJoin.
+   *
+   * Example: Take a query like:
+   * select * from t0 join lateral (
+   *   select a from t1 where b < x
+   *   intersect distinct
+   *   select b from t2 where c < y)
+   *
+   * Step 1:
+   * We have outer table with attributes [x#1, y#2] and after introducing DomainJoins the subquery

Review Comment:
   nit: maybe point out explicitly which table (t0, t1, t2) each attribute 
comes from, just to make the comment a bit easier to read.
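
For readers following the union example in this hunk, here is a minimal standalone sketch of the positional remapping that the doc comment describes. It does not use Catalyst: `Attr` and `NullSafeEq` are hypothetical stand-ins for `Attribute` and `EqualNullSafe`, and `pushThroughSetOp` only mirrors the shape of `pushDomainConditionsThroughSetOperation`, so treat it as an illustration under those assumptions rather than the PR's implementation.

```scala
object RemapSketch {
  // Hypothetical stand-ins for Catalyst's Attribute and EqualNullSafe (not Spark classes).
  final case class Attr(name: String, id: Int)
  // `inner` is the domain attribute used in the subquery, `outer` comes from t0.
  final case class NullSafeEq(inner: Attr, outer: Attr)

  // Remap the inner side of each condition from the set operation's output
  // to the positionally matching attribute of the given child's output.
  // The outer side is left untouched.
  def pushThroughSetOp(
      conditions: Seq[NullSafeEq],
      setOpOutput: Seq[Attr],
      childOutput: Seq[Attr]): Seq[NullSafeEq] = {
    require(setOpOutput.size == childOutput.size)
    val mapping = setOpOutput.zip(childOutput).toMap
    conditions.map(c => c.copy(inner = mapping.getOrElse(c.inner, c.inner)))
  }

  // Attribute ids taken from the example in the doc comment.
  val leftOutput = Seq(Attr("a", 3), Attr("x", 4), Attr("y", 5))  // t1 side
  val rightOutput = Seq(Attr("b", 6), Attr("x", 7), Attr("y", 8)) // t2 side
  val conditions = Seq(
    NullSafeEq(Attr("x", 4), Attr("x", 1)),
    NullSafeEq(Attr("y", 5), Attr("y", 2)))

  // The set operation's output equals the left child's output.
  val leftConditions = pushThroughSetOp(conditions, leftOutput, leftOutput)
  val rightConditions = pushThroughSetOp(conditions, leftOutput, rightOutput)
}
```

In this sketch the left-side conditions come back unchanged, while the right-side ones become x#7 <=> x#1 and y#8 <=> y#2, matching the remapping stated in the comment.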



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -254,24 +255,104 @@ object DecorrelateInnerQuery extends PredicateHelper {
   /**
    * Rewrites a domain join cond so that it can be pushed to the right side of a
    * union/intersect/except operator.
+   *
+   * Example: Take a query like:
+   * select * from t0 join lateral (
+   *   select a from t1 where b < t0.x
+   *   union all
+   *   select b from t2 where c < t0.y)
+   *
+   * We have outer table with attributes [x#1, y#2] and after introducing DomainJoins the subquery
+   * is a Union where the left side has output [a#3, x#4, y#5] with DomainJoin [x#4, y#5]

Review Comment:
   I assume x#4, y#5 are from t1? Similarly, x#7, y#8 below are from t2?
   
   If yes, why is there a DomainJoin [x#4, y#5] for t1? I thought only the 
references to the outer query are needed (i.e. I expected to see just x#4 for 
t1). Similarly for t2.
   
   Is this because of the set op (union all in this case) and 
PullupCorrelatedPredicates? I cannot immediately see where the DomainJoin uses 
both legs of the union.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -254,24 +255,104 @@ object DecorrelateInnerQuery extends PredicateHelper {
   /**
    * Rewrites a domain join cond so that it can be pushed to the right side of a
    * union/intersect/except operator.
+   *
+   * Example: Take a query like:
+   * select * from t0 join lateral (
+   *   select a from t1 where b < t0.x
+   *   union all
+   *   select b from t2 where c < t0.y)
+   *
+   * We have outer table with attributes [x#1, y#2] and after introducing DomainJoins the subquery
+   * is a Union where the left side has output [a#3, x#4, y#5] with DomainJoin [x#4, y#5]
+   * and the right side has output [b#6, x#7, y#8] with DomainJoin [x#7, y#8].
+   * The domain join conditions are x#4 <=> x#1 and y#5 <=> y#2, because the output of a set op
+   * uses the attribute names from the left side. For the left side,
+   * those remain unchanged, while for the right side they are remapped to
+   * x#7 <=> x#1 and y#8 <=> y#2.
    */
-  def pushConditionsThroughUnion(
+  def pushDomainConditionsThroughSetOperation(
       conditions: Seq[Expression],
-      union: Union,
+      setOp: LogicalPlan, // Union or SetOperation
       child: LogicalPlan): Seq[Expression] = {
     // The output attributes are always equal to the left child's output
-    assert(union.output.size == child.output.size)
-    val map = AttributeMap(union.output.zip(child.output))
-    conditions.map {
+    assert(setOp.output.size == child.output.size)
+    val map = AttributeMap(setOp.output.zip(child.output))
+    conditions.collect {
       // The left hand side is the domain attribute used in the inner query and the right hand side
       // is the attribute from the outer query. (See comment above in buildDomainAttrMap.)
       // We need to remap the attribute names used in the inner query (left hand side) to account
       // for the different names in each union child. We should not remap the attribute names used
       // in the outer query.
+      //
+      // Note: the reason we can't just use the original joinCond from when the DomainJoin was
+      // constructed is that constructing the DomainJoins happens much earlier than rewriting the
+      // DomainJoins into actual joins, with many optimization steps in
+      // between, which could change the attributes involved (e.g. CollapseProject).
       case EqualNullSafe(left: Attribute, right: Expression) =>
         EqualNullSafe(map.getOrElse(left, left), right)
-      case EqualTo(left: Attribute, right: Expression) =>
-        EqualTo(map.getOrElse(left, left), right)
+    }
+  }
+
+  /**
+   * This is to handle INTERSECT/EXCEPT DISTINCT which are rewritten to left semi/anti join in
+   * ReplaceIntersectWithSemiJoin and ReplaceExceptWithAntiJoin.
+   *
+   * To rewrite the domain join on the right side, we need to remap the attributes in the domain
+   * join cond, using the mapping between left and right sides in the semi/anti join cond.
+   *
+   * After DecorrelateInnerQuery, the domain join conds reference the output names of the
+   * INTERSECT/EXCEPT, which come from the left side. When rewriting the DomainJoin in the
+   * right child, we need to remap the domain attribute names to account for the different
+   * names in the left vs right child, similar to pushDomainConditionsThroughSetOperation.
+   * But after the rewrite to semi/anti join is performed, we instead need to do the remapping
+   * based on the semi/anti join cond which contains equi-joins between the left and right
+   * outputs.
+   *
+   * The domain conds will be like leftInner <=> outer, and the semi/anti join cond will be like
+   * leftInner <=> rightInner. We add additional domain conds rightInner <=> outer which are used
+   * to rewrite the right-side DomainJoin.
+   *
+   * Example: Take a query like:
+   * select * from t0 join lateral (
+   *   select a from t1 where b < x
+   *   intersect distinct
+   *   select b from t2 where c < y)
+   *
+   * Step 1:
+   * We have outer table with attributes [x#1, y#2] and after introducing DomainJoins the subquery
+   * is a Intersect where the left side has output [a#3, x#4, y#5] with DomainJoin [x#4, y#5]
+   * and the right side has output [b#6, x#7, y#8] with DomainJoin [x#7, y#8].
+   * The domain join conditions are x#4 <=> x#1 and y#5 <=> y#2, because the output of a set op
+   * uses the attribute names from the left side.
+   *
+   * Step 2:
+   * ReplaceIntersectWithSemiJoin runs and transforms the Intersect to
+   * Join LeftSemi, (((a#3 <=> b#6) AND (x#4 <=> x#7)) AND (y#5 <=> y#8))

Review Comment:
   Does this join between t1 and t2 happen for both intersect distinct and 
intersect all? It seems so, so maybe update the comment?
   
   Also, maybe mention that Intersect(isAll=?) takes care of distinct/all?
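
To make Step 2 of the quoted example concrete, here is a small standalone sketch of deriving the extra right-side domain conditions from the semi join condition. It does not use Catalyst: `Attr` and `NullSafeEq` are hypothetical stand-ins for `Attribute` and `EqualNullSafe`, and `rightSideDomainConds` is an illustrative helper name, not a function from the PR. Skipping domain conditions that have no matching join equality is an assumption of this sketch.

```scala
object SemiJoinRemapSketch {
  // Hypothetical stand-ins for Catalyst's Attribute and EqualNullSafe (not Spark classes).
  final case class Attr(name: String, id: Int)
  final case class NullSafeEq(left: Attr, right: Attr)

  // Domain conds look like leftInner <=> outer; join conds look like
  // leftInner <=> rightInner. For each domain cond whose leftInner appears
  // in a join cond, produce rightInner <=> outer.
  // Assumption: domain conds without a matching join equality are skipped.
  def rightSideDomainConds(
      domainConds: Seq[NullSafeEq],
      joinConds: Seq[NullSafeEq]): Seq[NullSafeEq] = {
    val leftToRight = joinConds.map(c => c.left -> c.right).toMap
    domainConds.flatMap { c =>
      leftToRight.get(c.left).map(rightInner => NullSafeEq(rightInner, c.right))
    }
  }

  // Attribute ids taken from Step 1 and Step 2 of the doc comment.
  val domainConds = Seq(
    NullSafeEq(Attr("x", 4), Attr("x", 1)),
    NullSafeEq(Attr("y", 5), Attr("y", 2)))
  val joinConds = Seq(
    NullSafeEq(Attr("a", 3), Attr("b", 6)),
    NullSafeEq(Attr("x", 4), Attr("x", 7)),
    NullSafeEq(Attr("y", 5), Attr("y", 8)))

  val rightConds = rightSideDomainConds(domainConds, joinConds)
}
```

With the ids from the example, the derived conditions are x#7 <=> x#1 and y#8 <=> y#2. The a#3 <=> b#6 equality contributes nothing because a#3 is not a domain attribute.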




