cloud-fan commented on code in PR #43465:
URL: https://github.com/apache/spark/pull/43465#discussion_r1366912425


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##########
@@ -872,6 +872,68 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper with PrivateM
     assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("SPARK-45509: ambiguous column reference") {
+    val session = spark
+    import session.implicits._
+    val df1 = Seq(1 -> "a").toDF("i", "j")
+    val df1_filter = df1.filter(df1("i") > 0)
+    val df2 = Seq(2 -> "b").toDF("i", "y")
+
+    checkSameResult(
+      Seq(Row(1)),
+      // df1("i") is not ambiguous, and it's still valid in the filtered df.
+      df1_filter.select(df1("i"))
+    )
+
+    val e1 = intercept[AnalysisException] {
+      // df1("i") is not ambiguous, but it's not valid in the projected df.
+      df1.select((df1("i") + 1).as("plus")).select(df1("i")).collect()
+    }
+    
assert(e1.getMessage.contains("MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT"))
+
+    checkSameResult(
+      Seq(Row(1, "a")),
+      // All these column references are not ambiguous and are still valid 
after join.
+      df1.join(df2, df1("i") + 1 === 
df2("i")).sort(df1("i").desc).select(df1("i"), df1("j"))
+    )
+
+    val e2 = intercept[AnalysisException] {
+      // df1("i") is ambiguous as df1 appears in both join sides.
+      df1.join(df1, df1("i") === 1).collect()
+    }
+    assert(e2.getMessage.contains("AMBIGUOUS_COLUMN_REFERENCE"))
+
+    val e3 = intercept[AnalysisException] {
+      // df1("i") is ambiguous as df1 appears in both join sides.
+      df1.join(df1).select(df1("i")).collect()
+    }
+    assert(e3.getMessage.contains("AMBIGUOUS_COLUMN_REFERENCE"))
+
+    val e4 = intercept[AnalysisException] {
+      // df1("i") is ambiguous as df1 appears in both join sides (df1_filter 
contains df1).
+      df1.join(df1_filter, df1("i") === 1).collect()
+    }
+    assert(e4.getMessage.contains("AMBIGUOUS_COLUMN_REFERENCE"))
+
+    checkSameResult(
+      Seq(Row("a")),
+      // df1_filter("i") is not ambiguous as df1_filter does not exist in the 
join left side.
+      df1.join(df1_filter, df1_filter("i") === 1).select(df1_filter("j"))

Review Comment:
   Classic Spark SQL thinks this is ambiguous, as it uses `AttributeReference` 
directly and we are not able to re-resolve it. Spark Connect uses 
`UnresolvedAttribute` which is lazy binding and works fine in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to