allisonwang-db commented on code in PR #39375:
URL: https://github.com/apache/spark/pull/39375#discussion_r1084480287


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1056,6 +1056,11 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
       sub: LogicalPlan,
       isScalar: Boolean = false,
       isLateral: Boolean = false): Unit = {
+    // Some query shapes are only supported with the DecorrelateInnerQuery 
framework.
+    // Currently we only use this new framework for scalar and lateral 
subqueries.
+    val usingDecorrelateInnerQueryFramework =
+      (isScalar || isLateral) && SQLConf.get.decorrelateInnerQueryEnabled

Review Comment:
   We should change IN/EXISTS subqueries to use this and unify the 
decorrelation framework in the future.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3167,6 +3167,14 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val DECORRELATE_SET_OPS_ENABLED =
+    buildConf("spark.sql.optimizer.decorrelateSetOps.enabled")
+      .internal()
+      .doc("Decorrelate subqueries with correlation under UNIONs.")

Review Comment:
   ```suggestion
         .doc("Decorrelate subqueries with correlation under set operators.")
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1209,6 +1213,14 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
         case p @ (_: ResolvedHint | _: LeafNode | _: Repartition | _: 
SubqueryAlias) =>
           p.children.foreach(child => checkPlan(child, aggregated, 
canContainOuter))
 
+        case p @ (_ : Union) =>
+          // Set operations (e.g. UNION) containing correlated values are only 
supported
+          // with DecorrelateInnerQuery framework.
+          val childCanContainOuter = (canContainOuter
+            && usingDecorrelateInnerQueryFramework
+            && SQLConf.get.getConf(SQLConf.DECORRELATE_SET_OPS_ENABLED))

Review Comment:
   Thanks for adding this conf!



##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -938,6 +938,74 @@ class SubquerySuite extends QueryTest
     }
   }
 
+  test("SPARK-36124: Correlated subqueries with union") {
+    withTempView("t0", "t1", "t2") {
+      Seq((1, 1), (2, 0)).toDF("t0a", "t0b").createOrReplaceTempView("t0")
+      Seq((1, 1, 3)).toDF("t1a", "t1b", "t1c").createOrReplaceTempView("t1")
+      Seq((1, 1, 5), (2, 2, 7)).toDF("t2a", "t2b", 
"t2c").createOrReplaceTempView("t2")
+
+      // Union with different outer refs
+      val query =
+        """
+          | SELECT t0a, (SELECT sum(t1c) FROM
+          |   (SELECT t1c
+          |   FROM   t1
+          |   WHERE  t1a = t0a
+          |   UNION ALL
+          |   SELECT t2c
+          |   FROM   t2
+          |   WHERE  t2b = t0b)
+          | )
+          | FROM t0""".stripMargin
+
+      {
+        val df = sql(query)
+        checkAnswer(df,
+          Row(1, 8) :: Row(2, null) :: Nil)
+
+        val optimizedPlan = df.queryExecution.optimizedPlan
+        val aggregate = optimizedPlan.collectFirst { case a: Aggregate => a 
}.get
+        assert(aggregate.groupingExpressions.size == 2)
+        val union = optimizedPlan.collectFirst { case u: Union => u }.get
+        assert(union.output.size == 3)
+        assert(optimizedPlan.resolved)
+      }
+      withSQLConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED.key -> "false") {
+        val error = intercept[AnalysisException] { sql(query) }
+        assert(error.message.contains("Accessing outer query column is not 
allowed"))

Review Comment:
   Can we check the error class instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to