allisonwang-db commented on code in PR #39375:
URL: https://github.com/apache/spark/pull/39375#discussion_r1084480287
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1056,6 +1056,11 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog with QueryErrorsB
sub: LogicalPlan,
isScalar: Boolean = false,
isLateral: Boolean = false): Unit = {
+ // Some query shapes are only supported with the DecorrelateInnerQuery
framework.
+ // Currently we only use this new framework for scalar and lateral
subqueries.
+ val usingDecorrelateInnerQueryFramework =
+ (isScalar || isLateral) && SQLConf.get.decorrelateInnerQueryEnabled
Review Comment:
We should change IN/EXISTS subqueries to use this and unify the
decorrelation framework in the future.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3167,6 +3167,14 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val DECORRELATE_SET_OPS_ENABLED =
+ buildConf("spark.sql.optimizer.decorrelateSetOps.enabled")
+ .internal()
+ .doc("Decorrelate subqueries with correlation under UNIONs.")
Review Comment:
```suggestion
.doc("Decorrelate subqueries with correlation under set operators.")
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -1209,6 +1213,14 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog with QueryErrorsB
case p @ (_: ResolvedHint | _: LeafNode | _: Repartition | _:
SubqueryAlias) =>
p.children.foreach(child => checkPlan(child, aggregated,
canContainOuter))
+ case p @ (_ : Union) =>
+ // Set operations (e.g. UNION) containing correlated values are only
supported
+ // with DecorrelateInnerQuery framework.
+ val childCanContainOuter = (canContainOuter
+ && usingDecorrelateInnerQueryFramework
+ && SQLConf.get.getConf(SQLConf.DECORRELATE_SET_OPS_ENABLED))
Review Comment:
Thanks for adding this conf!
##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -938,6 +938,74 @@ class SubquerySuite extends QueryTest
}
}
+ test("SPARK-36124: Correlated subqueries with union") {
+ withTempView("t0", "t1", "t2") {
+ Seq((1, 1), (2, 0)).toDF("t0a", "t0b").createOrReplaceTempView("t0")
+ Seq((1, 1, 3)).toDF("t1a", "t1b", "t1c").createOrReplaceTempView("t1")
+ Seq((1, 1, 5), (2, 2, 7)).toDF("t2a", "t2b",
"t2c").createOrReplaceTempView("t2")
+
+ // Union with different outer refs
+ val query =
+ """
+ | SELECT t0a, (SELECT sum(t1c) FROM
+ | (SELECT t1c
+ | FROM t1
+ | WHERE t1a = t0a
+ | UNION ALL
+ | SELECT t2c
+ | FROM t2
+ | WHERE t2b = t0b)
+ | )
+ | FROM t0""".stripMargin
+
+ {
+ val df = sql(query)
+ checkAnswer(df,
+ Row(1, 8) :: Row(2, null) :: Nil)
+
+ val optimizedPlan = df.queryExecution.optimizedPlan
+ val aggregate = optimizedPlan.collectFirst { case a: Aggregate => a
}.get
+ assert(aggregate.groupingExpressions.size == 2)
+ val union = optimizedPlan.collectFirst { case u: Union => u }.get
+ assert(union.output.size == 3)
+ assert(optimizedPlan.resolved)
+ }
+ withSQLConf(SQLConf.DECORRELATE_INNER_QUERY_ENABLED.key -> "false") {
+ val error = intercept[AnalysisException] { sql(query) }
+ assert(error.message.contains("Accessing outer query column is not
allowed"))
Review Comment:
Can we check the error class instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]