mgaido91 commented on a change in pull request #23057: [SPARK-26078][SQL] Dedup self-join attributes on IN subqueries
URL: https://github.com/apache/spark/pull/23057#discussion_r242079297
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
##########
@@ -43,31 +43,53 @@ import org.apache.spark.sql.types._
* condition.
*/
object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
- private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
+
+ private def buildJoin(
+ outerPlan: LogicalPlan,
+ subplan: LogicalPlan,
+ joinType: JoinType,
+ condition: Option[Expression]): Join = {
+ // Deduplicate conflicting attributes if any.
+ val dedupSubplan = dedupSubqueryOnSelfJoin(outerPlan, subplan, None, condition)
+ Join(outerPlan, dedupSubplan, joinType, condition)
+ }
+
+ private def dedupSubqueryOnSelfJoin(
+ outerPlan: LogicalPlan,
+ subplan: LogicalPlan,
+ valuesOpt: Option[Seq[Expression]],
+ condition: Option[Expression] = None): LogicalPlan = {
// SPARK-21835: It is possibly that the two sides of the join have conflicting attributes,
// the produced join then becomes unresolved and break structural integrity. We should
- // de-duplicate conflicting attributes. We don't use transformation here because we only
- // care about the most top join converted from correlated predicate subquery.
- case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
- val duplicates = right.outputSet.intersect(left.outputSet)
- if (duplicates.nonEmpty) {
- val aliasMap = AttributeMap(duplicates.map { dup =>
- dup -> Alias(dup, dup.toString)()
- }.toSeq)
- val aliasedExpressions = right.output.map { ref =>
- aliasMap.getOrElse(ref, ref)
- }
- val newRight = Project(aliasedExpressions, right)
- val newJoinCond = joinCond.map { condExpr =>
- condExpr transform {
- case a: Attribute => aliasMap.getOrElse(a, a).toAttribute
+ // de-duplicate conflicting attributes.
+ // SPARK-26078: it may also happen that the subquery has conflicting attributes with the outer
+ // values. In this case, the resulting join would contain trivially true conditions (eg.
+ // id#3 = id#3) which cannot be de-duplicated after. In this method, if there are conflicting
+ // attributes in the join condition, the subquery's conflicting attributes are changed using
+ // a projection which aliases them and resolves the problem.
+ val outerReferences = valuesOpt.map(values =>
+ AttributeSet.fromAttributeSets(values.map(_.references))).getOrElse(AttributeSet.empty)
+ val outerRefs = outerPlan.outputSet ++ outerReferences
+ val duplicates = outerRefs.intersect(subplan.outputSet)
+ if (duplicates.nonEmpty) {
+ condition.foreach { e =>
+ val conflictingAttrs = e.references.intersect(duplicates)
+ if (conflictingAttrs.nonEmpty) {
+ throw new AnalysisException("Found conflicting attributes " +
Review comment:
This can happen when the condition is built in advance (e.g. the
correlated condition of an EXISTS subquery) and contains an attribute that was
not deduplicated. I am not sure whether this scenario can actually happen,
though, or whether the dedup logic in the previous rules guarantees that it
never will.
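To make the scenario concrete, here is a minimal, Spark-free sketch in Scala. It assumes a toy model: `Attr`, `EqCond`, `Plan` and `DedupSketch.dedupSubquery` are hypothetical names, not Catalyst classes. Attributes are identified by an `exprId`, aliasing a duplicate gives it a fresh `exprId` (as `Alias` does in Catalyst), and a condition built before deduplication that still references a duplicate is rejected, mirroring the `AnalysisException` guard in the diff. `IllegalStateException` stands in for `AnalysisException`.

```scala
// Toy model, NOT Catalyst: an attribute is identified by its exprId, so
// id#3 and id#100 are different attributes even though both are named "id".
final case class Attr(name: String, exprId: Int) {
  override def toString: String = s"$name#$exprId"
}

// A join condition modelled as a conjunction of equalities (left = right).
final case class EqCond(pairs: Seq[(Attr, Attr)]) {
  def references: Set[Attr] = pairs.flatMap { case (l, r) => Seq(l, r) }.toSet
  override def toString: String =
    pairs.map { case (l, r) => s"$l = $r" }.mkString(" AND ")
}

// A logical plan reduced to its output attributes.
final case class Plan(output: Seq[Attr])

object DedupSketch {
  // Stand-in for Catalyst's fresh ExprId generator (assumption: ids >= 100 are unused).
  private val freshIds = Iterator.from(100)

  /** Hypothetical analogue of dedupSubqueryOnSelfJoin: returns the subplan with
    * duplicated attributes re-aliased, plus the old -> new mapping. */
  def dedupSubquery(
      outer: Plan,
      sub: Plan,
      outerValues: Seq[Attr],
      prebuiltCond: Option[EqCond]): (Plan, Map[Attr, Attr]) = {
    // Attributes visible on the outer side: its output plus the IN values.
    val outerRefs = outer.output.toSet ++ outerValues
    val duplicates = outerRefs.intersect(sub.output.toSet)
    if (duplicates.isEmpty) {
      (sub, Map.empty)
    } else {
      // A condition built before dedup (e.g. a correlated EXISTS condition) is not
      // rewritten by re-aliasing the subplan, so reject it if it hits a duplicate.
      prebuiltCond.foreach { c =>
        val conflicting = c.references.intersect(duplicates)
        if (conflicting.nonEmpty) {
          throw new IllegalStateException(
            s"Found conflicting attributes ${conflicting.mkString(",")} in the condition")
        }
      }
      // Mimic Project(Alias(dup, ...)): same name, fresh exprId.
      val aliasMap = duplicates.map(d => d -> d.copy(exprId = freshIds.next())).toMap
      (Plan(sub.output.map(a => aliasMap.getOrElse(a, a))), aliasMap)
    }
  }
}

// Usage: `id IN (SELECT id FROM t)` where both sides come from the same relation.
val id3 = Attr("id", 3)
val (dedupSub, _) = DedupSketch.dedupSubquery(Plan(Seq(id3)), Plan(Seq(id3)), Seq(id3), None)
// The IN condition is built only after dedup, so it pairs id#3 with a fresh attribute
// instead of producing the trivially true id#3 = id#3.
val inCond = EqCond(Seq(id3).zip(dedupSub.output))
println(inCond)
```

Passing `Some(EqCond(Seq(id3 -> id3)))` as `prebuiltCond` in the same call throws instead, which is the situation the review comment is unsure can occur in practice.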
----------------------------------------------------------------
This is an automated message from the Apache Git Service.