peter-toth opened a new pull request, #45552: URL: https://github.com/apache/spark/pull/45552
### What changes were proposed in this pull request?
There seems to be a regression from Spark 3.5 to 4.0, caused by https://issues.apache.org/jira/browse/SPARK-43838 / https://github.com/apache/spark/pull/41347, as the following code no longer succeeds:
```
val df = Seq((1, 2)).toDF("a", "b")
val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
df3.show()
```
Please note that if we dig deeper, it turns out that if we omit the extra project from `df`, then the following is actually a bug that came in with the very first version of `DeduplicateRelations`, which deduplicated `MultiInstanceRelation`s only:
```
val schema = StructType.fromDDL("a int, b int")
val rows = Seq(Row(1, 2))
val rdd = sparkContext.parallelize(rows)
val df = spark.createDataFrame(rdd, schema)
val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
val df3 = df2.join(df, df2("bb") === df("b")).select(df2("aa"), df("a"))
df3.show()
```
The root cause seems to be `DeduplicateRelations`, as it changes `df("a")` (`a#7`) coming from the right side when it runs on the join:
```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===
!'Join Inner, '`=`(bb#12, b#8)                  'Join Inner, '`=`(bb#12, b#18)
 :- Project [a#7 AS aa#11, b#8 AS bb#12]        :- Project [a#7 AS aa#11, b#8 AS bb#12]
 :  +- Project [_1#2 AS a#7, _2#3 AS b#8]       :  +- Project [_1#2 AS a#7, _2#3 AS b#8]
 :     +- LocalRelation [_1#2, _2#3]            :     +- LocalRelation [_1#2, _2#3]
!+- Project [_1#2 AS a#7, _2#3 AS b#8]          +- Project [_1#15 AS a#17, _2#16 AS b#18]
!   +- LocalRelation [_1#2, _2#3]                  +- LocalRelation [_1#15, _2#16]
```
and then, when the `.select()` API adds a `Project` node containing `df("a")` above the join, the attribute can't be resolved. This is because `DeduplicateRelations` always keeps the attributes of the first occurrence of a node (`Project [_1#2 AS a#7, _2#3 AS b#8]` in this case) and creates new instances for the other occurrences.
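The re-instancing behaviour can be illustrated with a small, self-contained sketch (hypothetical names such as `DedupSketch`, `newInstance` and `resolves`; this is a simplified model of the mechanism described above, not Spark's actual implementation):

```scala
// Toy model of DeduplicateRelations re-instancing: when the same relation
// appears twice in a join, the first occurrence keeps its exprIds and the
// second gets fresh ones, so a column reference captured from the original
// Dataset (here a#7) no longer matches the re-instanced side's output.
object DedupSketch {
  final case class Attribute(name: String, exprId: Long)

  private var nextId = 100L
  private def freshId(): Long = { nextId += 1; nextId }

  // Re-instance a duplicated relation: same names, brand-new exprIds.
  def newInstance(attrs: Seq[Attribute]): Seq[Attribute] =
    attrs.map(a => a.copy(exprId = freshId()))

  // A resolved reference matches a plan only if its exprId is in the output.
  def resolves(ref: Attribute, output: Seq[Attribute]): Boolean =
    output.exists(_.exprId == ref.exprId)

  def main(args: Array[String]): Unit = {
    val dfOutput = Seq(Attribute("a", 7L), Attribute("b", 8L))
    val dfA      = dfOutput.head                 // corresponds to df("a"), i.e. a#7

    // df occurs on both sides of the join; the right side gets re-instanced.
    val rightOutput = newInstance(dfOutput)

    println(resolves(dfA, dfOutput))    // left side: still resolvable
    println(resolves(dfA, rightOutput)) // right side: a#7 is gone
  }
}
```

In this model, as in the analyzer diff above, the reference stays valid against the first occurrence but dangles against the re-instanced copy, which is exactly why the `Project` added above the join fails to resolve.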
The rule doesn't (and can't) take into account whether a top-level attribute can actually come from a node or not. If `spark.sql.analyzer.failAmbiguousSelfJoin` is enabled, then `DetectAmbiguousSelfJoin` catches the issue as:
```
Column a#7 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via Dataset.as before joining them, and specify the column using qualified name, e.g. df.as("a").join(df.as("b"), $"a.id" > $"b.id"). You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
```
If it is not enabled, then an "`a#7` can't be resolved" error is thrown.

To solve the above regression, this PR:
- Assigns `LogicalPlan.PLAN_ID_TAG`s to the logical plans of `Dataset`s that don't have any id yet. (The Connect planner already does this.)
- Changes resolved `AttributeReference`s to `UnresolvedAttribute`s in certain `Dataset` APIs if an attribute doesn't seem valid based on the output of the underlying logical plan. The `UnresolvedAttribute`s get the necessary tags to get resolved later by the `ResolveReferences` rule (`ColumnResolutionHelper.tryResolveDataFrameColumns()`).

### Why are the changes needed?
To fix the regression.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the regression.

### How was this patch tested?
New and existing UTs.

### Was this patch authored or co-authored using generative AI tooling?
No.
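The two-step fix can be sketched as follows (a simplified, self-contained model under assumed names such as `FixSketch`, `toColumnRef` and `resolve`; not the PR's actual code): a captured reference whose exprId is missing from the plan's output is downgraded to a name-based, unresolved reference that carries the originating plan's id, so the analyzer can later bind it against the plan with that id, even after re-instancing.

```scala
// Toy model of the fix: fall back to an unresolved, name-based column
// reference tagged with a plan id (playing the role of PLAN_ID_TAG) when
// the captured exprId is no longer present in the plan's output.
object FixSketch {
  final case class Attribute(name: String, exprId: Long)

  sealed trait ColumnRef
  final case class Resolved(attr: Attribute) extends ColumnRef
  // Name plus the id of the plan the column was captured from.
  final case class Unresolved(name: String, planId: Long) extends ColumnRef

  // Build the reference to embed in a new node above the join.
  def toColumnRef(attr: Attribute, output: Seq[Attribute], planId: Long): ColumnRef =
    if (output.exists(_.exprId == attr.exprId)) Resolved(attr)
    else Unresolved(attr.name, planId)

  // Later, the analyzer resolves by name against the plan with that id.
  def resolve(ref: ColumnRef, planOutputs: Map[Long, Seq[Attribute]]): Option[Attribute] =
    ref match {
      case Resolved(a)          => Some(a)
      case Unresolved(name, id) => planOutputs.get(id).flatMap(_.find(_.name == name))
    }
}
```

In this model, `df("a")` (captured as `a#7`) no longer matches the re-instanced right side, so it becomes `Unresolved("a", planId)` and is then resolved by name against the right side's fresh output (`a#17`), mirroring what `ResolveReferences` does with the tagged `UnresolvedAttribute`s.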
