peter-toth commented on code in PR #46183:
URL: https://github.com/apache/spark/pull/46183#discussion_r1579492849
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala:
##########
@@ -38,28 +38,29 @@ case class RelationWrapper(cls: Class[_], outputAttrIds: Seq[Long])
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     val newPlan = renewDuplicatedRelations(mutable.HashSet.empty, plan)._1
-    if (newPlan.find(p => p.resolved && p.missingInput.nonEmpty).isDefined) {
-      // Wait for `ResolveMissingReferences` to resolve missing attributes first
-      return newPlan
-    }
+
+    def noMissingInput(p: LogicalPlan) = !p.exists(_.missingInput.nonEmpty)
+
     newPlan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
       ruleId) {
       case p: LogicalPlan if !p.childrenResolved => p
       // To resolve duplicate expression IDs for Join.
-      case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
+      case j @ Join(left, right, _, _, _) if !j.duplicateResolved && noMissingInput(right) =>
         j.copy(right = dedupRight(left, right))
       // Resolve duplicate output for LateralJoin.
-      case j @ LateralJoin(left, right, _, _) if right.resolved && !j.duplicateResolved =>
+      case j @ LateralJoin(left, right, _, _)
+          if right.resolved && !j.duplicateResolved && noMissingInput(right.plan) =>
         j.copy(right = right.withNewPlan(dedupRight(left, right.plan)))
       // Resolve duplicate output for AsOfJoin.
-      case j @ AsOfJoin(left, right, _, _, _, _, _) if !j.duplicateResolved =>
+      case j @ AsOfJoin(left, right, _, _, _, _, _)
+          if !j.duplicateResolved && noMissingInput(right) =>
         j.copy(right = dedupRight(left, right))
       // intersect/except will be rewritten to join at the beginning of optimizer. Here we need to
       // deduplicate the right side plan, so that we won't produce an invalid self-join later.
-      case i @ Intersect(left, right, _) if !i.duplicateResolved =>
+      case i @ Intersect(left, right, _) if !i.duplicateResolved && noMissingInput(right) =>
         i.copy(right = dedupRight(left, right))
-      case e @ Except(left, right, _) if !e.duplicateResolved =>
+      case e @ Except(left, right, _) if !e.duplicateResolved && noMissingInput(right) =>
         e.copy(right = dedupRight(left, right))
       // Only after we finish by-name resolution for Union
       case u: Union if !u.byName && !u.duplicateResolved =>
Review Comment:
This is because the `Union` deduplication works differently from the others: here we just insert additional `Project`s on top of the 2nd+ union legs, which doesn't alter the attributes of a leg below the inserted `Project`.
But in the other cases deduplication is done by `dedupRight()`, which can rewrite the attributes inside a leg, possibly below a node that still has a missing input...
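For context, a minimal sketch of the kind of query shape that can reach this state (the schema and queries are my assumption for illustration, not the exact test from this PR): the `HAVING` references `name`, which the `Aggregate` below it doesn't output, so the `!Filter` still has a missing input while this rule runs.
```scala
// Hypothetical setup, assumed for illustration only.
spark.sql("CREATE TABLE t (id INT, name STRING) USING parquet")
spark.sql("CREATE VIEW v1 AS SELECT id, name FROM t")

// Union shape: both legs read the same view, so DeduplicateRelations only
// inserts a Project over the 2nd leg.
spark.sql("""
  SELECT id FROM v1 l1
  UNION
  SELECT id FROM v1 GROUP BY id HAVING count(DISTINCT name) > 1
""")

// Join shape: dedupRight() would rewrite the attributes of the right leg.
spark.sql("""
  SELECT l1.id
  FROM v1 l1 JOIN (
    SELECT id FROM v1 GROUP BY id HAVING count(DISTINCT name) > 1
  ) l2 ON l1.id = l2.id
  GROUP BY l1.name, l1.id
""")
```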
Here is a `Union` deduplication where only a `Project [id#24 AS id#29]` is inserted, which has no effect on the missing `name#25`:
```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===

Before:
Distinct
+- Union false, false
   :- Project [id#24]
   :  +- SubqueryAlias l1
   :     +- SubqueryAlias spark_catalog.default.v1
   :        +- View (`default`.`v1`, [id#24,name#25])
   :           +- Project [cast(id#20 as int) AS id#24, cast(name#21 as string) AS name#25]
   :              +- Project [id#20, name#21]
   :                 +- SubqueryAlias spark_catalog.default.t
   :                    +- Relation default.t[id#20,name#21] parquet
   +- !Filter (count(distinct tempresolvedcolumn(name#25, name)) > cast(1 as bigint))
      +- Aggregate [id#24], [id#24]
         +- SubqueryAlias spark_catalog.default.v1
            +- View (`default`.`v1`, [id#24,name#25])
               +- Project [cast(id#26 as int) AS id#24, cast(name#27 as string) AS name#25]
                  +- Project [id#26, name#27]
                     +- SubqueryAlias spark_catalog.default.t
                        +- Relation default.t[id#26,name#27] parquet

After:
Distinct
+- Union false, false
   :- Project [id#24]
   :  +- SubqueryAlias l1
   :     +- SubqueryAlias spark_catalog.default.v1
   :        +- View (`default`.`v1`, [id#24,name#25])
   :           +- Project [cast(id#20 as int) AS id#24, cast(name#21 as string) AS name#25]
   :              +- Project [id#20, name#21]
   :                 +- SubqueryAlias spark_catalog.default.t
   :                    +- Relation default.t[id#20,name#21] parquet
   +- Project [id#24 AS id#29]
      +- !Filter (count(distinct tempresolvedcolumn(name#25, name)) > cast(1 as bigint))
         +- Aggregate [id#24], [id#24]
            +- SubqueryAlias spark_catalog.default.v1
               +- View (`default`.`v1`, [id#24,name#25])
                  +- Project [cast(id#26 as int) AS id#24, cast(name#27 as string) AS name#25]
                     +- Project [id#26, name#27]
                        +- SubqueryAlias spark_catalog.default.t
                           +- Relation default.t[id#26,name#27] parquet
```
And here is how a `Join` deduplication would look without the `&& noMissingInput(right)` guard. The missing `name#25` can no longer be resolved:
```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===

Before:
'Aggregate ['l1.name, 'l1.id], ['l1.id]
+- 'Join Inner, ('l1.id = 'l2.id)
   :- SubqueryAlias l1
   :  +- SubqueryAlias spark_catalog.default.v1
   :     +- View (`default`.`v1`, [id#24,name#25])
   :        +- Project [cast(id#20 as int) AS id#24, cast(name#21 as string) AS name#25]
   :           +- Project [id#20, name#21]
   :              +- SubqueryAlias spark_catalog.default.t
   :                 +- Relation default.t[id#20,name#21] parquet
   +- SubqueryAlias l2
      +- !Filter (count(distinct tempresolvedcolumn(name#25, name)) > cast(1 as bigint))
         +- Aggregate [id#24], [id#24]
            +- SubqueryAlias spark_catalog.default.v1
               +- View (`default`.`v1`, [id#24,name#25])
                  +- Project [cast(id#26 as int) AS id#24, cast(name#27 as string) AS name#25]
                     +- Project [id#26, name#27]
                        +- SubqueryAlias spark_catalog.default.t
                           +- Relation default.t[id#26,name#27] parquet

After:
'Aggregate ['l1.name, 'l1.id], ['l1.id]
+- 'Join Inner, ('l1.id = 'l2.id)
   :- SubqueryAlias l1
   :  +- SubqueryAlias spark_catalog.default.v1
   :     +- View (`default`.`v1`, [id#24,name#25])
   :        +- Project [cast(id#20 as int) AS id#24, cast(name#21 as string) AS name#25]
   :           +- Project [id#20, name#21]
   :              +- SubqueryAlias spark_catalog.default.t
   :                 +- Relation default.t[id#20,name#21] parquet
   +- SubqueryAlias l2
      +- !Filter (count(distinct tempresolvedcolumn(name#25, name)) > cast(1 as bigint))
         +- Aggregate [id#29], [id#29]
            +- SubqueryAlias spark_catalog.default.v1
               +- View (`default`.`v1`, [id#29,name#30])
                  +- Project [cast(id#26 as int) AS id#29, cast(name#27 as string) AS name#30]
                     +- Project [id#26, name#27]
                        +- SubqueryAlias spark_catalog.default.t
                           +- Relation default.t[id#26,name#27] parquet
```
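For reference, my reading of what the new guard keys on (a sketch assuming the usual `QueryPlan` semantics, not new API):
```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// `missingInput` is roughly `references -- inputSet -- producedAttributes`,
// i.e. the attributes a node needs but cannot get from its children.
// `exists` walks the whole subtree, so the guard skips deduplication while
// any node under the leg (like the !Filter above) still carries a dangling
// reference such as name#25.
def noMissingInput(p: LogicalPlan): Boolean = !p.exists(_.missingInput.nonEmpty)
```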
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]