mgaido91 commented on a change in pull request #23057: [SPARK-26078][SQL] Dedup 
self-join attributes on IN subqueries
URL: https://github.com/apache/spark/pull/23057#discussion_r242079297
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 ##########
 @@ -43,31 +43,53 @@ import org.apache.spark.sql.types._
  *    condition.
  */
 object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper 
{
-  private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
+
+  private def buildJoin(
+      outerPlan: LogicalPlan,
+      subplan: LogicalPlan,
+      joinType: JoinType,
+      condition: Option[Expression]): Join = {
+    // Deduplicate conflicting attributes if any.
+    val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan, None, 
condition)
+    Join(outerPlan, dedupSubplan, joinType, condition)
+  }
+
+  private def dedupSubqueryOnSelfJoin(
+      outerPlan: LogicalPlan,
+      subplan: LogicalPlan,
+      valuesOpt: Option[Seq[Expression]],
+      condition: Option[Expression] = None): LogicalPlan = {
     // SPARK-21835: It is possibly that the two sides of the join have 
conflicting attributes,
     // the produced join then becomes unresolved and break structural 
integrity. We should
-    // de-duplicate conflicting attributes. We don't use transformation here 
because we only
-    // care about the most top join converted from correlated predicate 
subquery.
-    case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | 
ExistenceJoin(_)), joinCond) =>
-      val duplicates = right.outputSet.intersect(left.outputSet)
-      if (duplicates.nonEmpty) {
-        val aliasMap = AttributeMap(duplicates.map { dup =>
-          dup -> Alias(dup, dup.toString)()
-        }.toSeq)
-        val aliasedExpressions = right.output.map { ref =>
-          aliasMap.getOrElse(ref, ref)
-        }
-        val newRight = Project(aliasedExpressions, right)
-        val newJoinCond = joinCond.map { condExpr =>
-          condExpr transform {
-            case a: Attribute => aliasMap.getOrElse(a, a).toAttribute
+    // de-duplicate conflicting attributes.
+    // SPARK-26078: it may also happen that the subquery has conflicting 
attributes with the outer
+    // values. In this case, the resulting join would contain trivially true 
conditions (eg.
+    // id#3 = id#3) which cannot be de-duplicated after. In this method, if 
there are conflicting
+    // attributes in the join condition, the subquery's conflicting attributes 
are changed using
+    // a projection which aliases them and resolves the problem.
+    val outerReferences = valuesOpt.map(values =>
+      
AttributeSet.fromAttributeSets(values.map(_.references))).getOrElse(AttributeSet.empty)
+    val outerRefs = outerPlan.outputSet ++ outerReferences
+    val duplicates = outerRefs.intersect(subplan.outputSet)
+    if (duplicates.nonEmpty) {
+      condition.foreach { e =>
+          val conflictingAttrs = e.references.intersect(duplicates)
+          if (conflictingAttrs.nonEmpty) {
+            throw new AnalysisException("Found conflicting attributes " +
 
 Review comment:
   This can happen when the condition is built in advance (e.g. the 
correlated condition of EXISTS) and it contains some attribute that has not 
been deduplicated. I am not sure, though, whether this scenario can actually 
happen or whether the dedup logic in the previous rules guarantees that it 
never does.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to