dtenedor commented on code in PR #37840:
URL: https://github.com/apache/spark/pull/37840#discussion_r967712146
##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -1983,9 +1992,13 @@ class SubquerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
withTempView("t1", "t2") {
Seq((0, 1)).toDF("c1", "c2").createOrReplaceTempView("t1")
Seq((1, 2), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
- assert(intercept[AnalysisException] {
+ val msg1 = intercept[AnalysisException] {
Review Comment:
Done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -923,7 +966,11 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
// Make sure a plan's subtree does not contain outer references
def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
if (hasOuterReferences(p)) {
- failAnalysis(s"Accessing outer query column is not allowed in:\n$p")
+ throw new AnalysisException(
+ errorClass = "INVALID_SUBQUERY_EXPRESSION",
+ errorSubClass = "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES",
+ origin = p.origin,
+ messageParameters = Array.empty[String])
Review Comment:
I reviewed this with Gengliang and Serge. We can add a SQLConf that toggles
whether the plan `toString` output is included in the error messages. This
keeps the option available for debugging purposes if needed.
##########
core/src/main/resources/error/error-classes.json:
##########
@@ -327,6 +327,83 @@
],
"sqlState" : "42000"
},
+ "INVALID_SUBQUERY_EXPRESSION" : {
+ "message" : [
+ "Invalid subquery:"
+ ],
+ "subClass" : {
+ "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Accessing outer query column is not allowed in this location"
+ ]
+ },
+ "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES" : {
+ "message" : [
+ "Found an aggregate function in a correlated predicate that has both
outer and local references, which is not supported: <function>"
+ ]
+ },
+ "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE" : {
+ "message" : [
+ "Correlated column is not allowed in predicate"
+ ]
+ },
+
"CORRELATED_SCALAR_SUBQUERIES_IN_GROUP_BY_MUST_BE_IN_AGGREGATE_EXPRESSIONS" : {
Review Comment:
Sounds good, replaced.
##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -523,30 +523,39 @@ class SubquerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
val errMsg = intercept[AnalysisException] {
sql("select (select sum(-1) from t t2 where t1.c2 = t2.c1 group by
t2.c2) sum from t t1")
}
- assert(errMsg.getMessage.contains(
- "A GROUP BY clause in a scalar correlated subquery cannot contain
non-correlated columns:"))
+ checkError(
+ errMsg,
+ errorClass = "INVALID_SUBQUERY_EXPRESSION",
+ errorSubClass = Some("NON_CORRELATED_COLUMNS_IN_GROUP_BY"),
+ parameters = Map("value" -> "c2"))
}
}
test("non-aggregated correlated scalar subquery") {
val msg1 = intercept[AnalysisException] {
Review Comment:
Done
##########
core/src/main/resources/error/error-classes.json:
##########
@@ -327,6 +327,78 @@
],
"sqlState" : "42000"
},
+ "INVALID_SUBQUERY_EXPRESSION" : {
+ "message" : [
+ "Invalid subquery:"
+ ],
+ "subClass" : {
+ "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Accessing outer query column is not allowed in this location<plan>"
+ ]
+ },
+ "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES" : {
+ "message" : [
+ "Found an aggregate function in a correlated predicate that has both
outer and local references, which is not supported: <function>"
+ ]
+ },
+ "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE" : {
+ "message" : [
+ "Correlated column is not allowed in predicate<plan>"
+ ]
+ },
+ "CORRELATED_SCALAR_SUBQUERIES_ONLY_IN_CERTAIN_PLACES" : {
+ "message" : [
+ "Correlated scalar subqueries can only be used in filters,
aggregations, projections, and UPDATE/MERGE/DELETE commands<plan>"
+ ]
+ },
+ "IN_EXISTS_SUBQUERIES_ONLY_CERTAIN_PLACES" : {
+ "message" : [
+ "IN/EXISTS predicate subqueries can only be used in filters, joins,
aggregations, window functions, projections, and UPDATE/MERGE/DELETE
commands<plan>"
+ ]
+ },
+ "LATERAL_JOIN_CONDITION_NON_DETERMINISTIC" : {
+ "message" : [
+ "Lateral join condition cannot be non-deterministic: <condition>"
+ ]
+ },
+ "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY" : {
+ "message" : [
+ "Correlated scalar subqueries in the GROUP BY clause must also be in
the aggregate expressions<plan>"
+ ]
+ },
+ "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY_OUTPUT" : {
+ "message" : [
+ "The output of a correlated scalar subquery must be aggregated"
+ ]
+ },
+ "NON_CORRELATED_COLUMNS_IN_GROUP_BY" : {
+ "message" : [
+ "A GROUP BY clause in a scalar correlated subquery cannot contain
non-correlated columns: <value>"
+ ]
+ },
+ "NON_DETERMINISTIC_LATERAL_SUBQUERIES" : {
+ "message" : [
+ "Non-deterministic lateral subqueries are not supported when joining
with outer relations that produce more than one row<plan>"
+ ]
+ },
+ "OUTER_ATTRIBUTE_NOT_FOUND" : {
+ "message" : [
+ "Outer attribute not found: <value>"
+ ]
+ },
+ "OUTER_REFERENCES_ONLY_ALLOWED_IN_WHERE_HAVING" : {
+ "message" : [
+ "Expressions referencing the outer query are not supported outside
of WHERE/HAVING clauses<plan>"
+ ]
+ },
+ "SCALAR_SUBQUERY_RETURN_MORE_THAN_ONE_OUTPUT_COLUMN" : {
Review Comment:
👍 I renamed the latter error class to
`UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY` and now only this one remains in
`INVALID_SUBQUERY_EXPRESSION`.
##########
core/src/main/resources/error/error-classes.json:
##########
@@ -327,6 +327,78 @@
],
"sqlState" : "42000"
},
+ "INVALID_SUBQUERY_EXPRESSION" : {
+ "message" : [
+ "Invalid subquery:"
+ ],
+ "subClass" : {
+ "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Accessing outer query column is not allowed in this location<plan>"
+ ]
+ },
+ "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES" : {
+ "message" : [
+ "Found an aggregate function in a correlated predicate that has both
outer and local references, which is not supported: <function>"
+ ]
+ },
+ "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE" : {
+ "message" : [
+ "Correlated column is not allowed in predicate<plan>"
+ ]
+ },
+ "CORRELATED_SCALAR_SUBQUERIES_ONLY_IN_CERTAIN_PLACES" : {
+ "message" : [
+ "Correlated scalar subqueries can only be used in filters,
aggregations, projections, and UPDATE/MERGE/DELETE commands<plan>"
+ ]
+ },
+ "IN_EXISTS_SUBQUERIES_ONLY_CERTAIN_PLACES" : {
+ "message" : [
+ "IN/EXISTS predicate subqueries can only be used in filters, joins,
aggregations, window functions, projections, and UPDATE/MERGE/DELETE
commands<plan>"
+ ]
+ },
+ "LATERAL_JOIN_CONDITION_NON_DETERMINISTIC" : {
+ "message" : [
+ "Lateral join condition cannot be non-deterministic: <condition>"
+ ]
+ },
+ "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY" : {
+ "message" : [
+ "Correlated scalar subqueries in the GROUP BY clause must also be in
the aggregate expressions<plan>"
+ ]
+ },
+ "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY_OUTPUT" : {
+ "message" : [
+ "The output of a correlated scalar subquery must be aggregated"
+ ]
+ },
+ "NON_CORRELATED_COLUMNS_IN_GROUP_BY" : {
+ "message" : [
+ "A GROUP BY clause in a scalar correlated subquery cannot contain
non-correlated columns: <value>"
+ ]
+ },
+ "NON_DETERMINISTIC_LATERAL_SUBQUERIES" : {
+ "message" : [
+ "Non-deterministic lateral subqueries are not supported when joining
with outer relations that produce more than one row<plan>"
+ ]
+ },
+ "OUTER_ATTRIBUTE_NOT_FOUND" : {
Review Comment:
It's an internal error. I changed this to `CORRELATED_COLUMN_NOT_FOUND` and
updated the message to `A correlated outer name reference within a subquery
expression body was not found in the enclosing query: <value>`. Hopefully this
is better.
##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -523,30 +523,39 @@ class SubquerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
val errMsg = intercept[AnalysisException] {
sql("select (select sum(-1) from t t2 where t1.c2 = t2.c1 group by
t2.c2) sum from t t1")
}
- assert(errMsg.getMessage.contains(
- "A GROUP BY clause in a scalar correlated subquery cannot contain
non-correlated columns:"))
+ checkError(
+ errMsg,
+ errorClass = "INVALID_SUBQUERY_EXPRESSION",
+ errorSubClass = Some("NON_CORRELATED_COLUMNS_IN_GROUP_BY"),
+ parameters = Map("value" -> "c2"))
}
}
test("non-aggregated correlated scalar subquery") {
val msg1 = intercept[AnalysisException] {
sql("select a, (select b from l l2 where l2.a = l1.a) sum_b from l l1")
}
- assert(msg1.getMessage.contains("Correlated scalar subqueries must be
aggregated"))
-
+ checkError(
+ msg1,
+ errorClass = "INVALID_SUBQUERY_EXPRESSION",
+ errorSubClass = Some("MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY"))
val msg2 = intercept[AnalysisException] {
sql("select a, (select b from l l2 where l2.a = l1.a group by 1) sum_b
from l l1")
}
- assert(msg2.getMessage.contains(
- "The output of a correlated scalar subquery must be aggregated"))
+ checkError(
+ msg2,
+ errorClass = "INVALID_SUBQUERY_EXPRESSION",
+ errorSubClass = Some("MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY_OUTPUT"))
}
test("non-equal correlated scalar subquery") {
val msg1 = intercept[AnalysisException] {
Review Comment:
Done
##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -2005,13 +2018,17 @@ class SubquerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
|FROM (SELECT CAST(c1 AS STRING) a FROM t1)
|""".stripMargin),
Row(5) :: Row(null) :: Nil)
- assert(intercept[AnalysisException] {
+ val msg1 = intercept[AnalysisException] {
Review Comment:
Thanks, updated.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -743,7 +743,11 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
case a: AggregateExpression => a
})
if (aggregates.isEmpty) {
- failAnalysis("The output of a correlated scalar subquery must be
aggregated")
+ throw new AnalysisException(
Review Comment:
I originally tried this, but after speaking with Gengliang, I found that we
need to call `failAnalysis` as a method of the expression or operator of
interest in order to propagate the parse location information. I switched to
doing that instead. As a bonus, this PR now has fewer diffs in `CheckAnalysis`,
since these were all calls to `failAnalysis` before, just to a different
overload.
##########
sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala:
##########
@@ -523,30 +523,39 @@ class SubquerySuite extends QueryTest with
SharedSparkSession with AdaptiveSpark
val errMsg = intercept[AnalysisException] {
Review Comment:
Done
##########
core/src/main/resources/error/error-classes.json:
##########
@@ -327,6 +327,78 @@
],
"sqlState" : "42000"
},
+ "INVALID_SUBQUERY_EXPRESSION" : {
+ "message" : [
+ "Invalid subquery:"
+ ],
+ "subClass" : {
+ "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Accessing outer query column is not allowed in this location<plan>"
+ ]
+ },
+ "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES" : {
+ "message" : [
+ "Found an aggregate function in a correlated predicate that has both
outer and local references, which is not supported: <function>"
+ ]
+ },
+ "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE" : {
+ "message" : [
+ "Correlated column is not allowed in predicate<plan>"
+ ]
+ },
+ "CORRELATED_SCALAR_SUBQUERIES_ONLY_IN_CERTAIN_PLACES" : {
Review Comment:
Sounds good, done.
##########
core/src/main/resources/error/error-classes.json:
##########
@@ -327,6 +327,78 @@
],
"sqlState" : "42000"
},
+ "INVALID_SUBQUERY_EXPRESSION" : {
+ "message" : [
+ "Invalid subquery:"
+ ],
+ "subClass" : {
+ "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Accessing outer query column is not allowed in this location<plan>"
Review Comment:
Good points. I spoke with Gengliang: with his PR
https://github.com/apache/spark/pull/37841, the `origin` provided to all these
errors now includes the query context. Allison and Xiao also mentioned that we
would like to keep these plan strings as they are for internal debugging
purposes [1]. This `<plan>` string and all the others in this PR are now
covered by the new SQLConf `spark.sql.error.includePlans`. If we want to turn
it off by default for DBR later, we can, and the option will still be
available for our own debugging.
[1]
https://databricks.slack.com/archives/G9S3V8SE5/p1662958012512619?thread_ts=1662745162.116989&cid=G9S3V8SE5
##########
core/src/main/resources/error/error-classes.json:
##########
@@ -327,6 +327,78 @@
],
"sqlState" : "42000"
},
+ "INVALID_SUBQUERY_EXPRESSION" : {
+ "message" : [
+ "Invalid subquery:"
+ ],
+ "subClass" : {
+ "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Accessing outer query column is not allowed in this location<plan>"
+ ]
+ },
+ "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES" : {
+ "message" : [
+ "Found an aggregate function in a correlated predicate that has both
outer and local references, which is not supported: <function>"
+ ]
+ },
+ "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE" : {
+ "message" : [
+ "Correlated column is not allowed in predicate<plan>"
+ ]
+ },
+ "CORRELATED_SCALAR_SUBQUERIES_ONLY_IN_CERTAIN_PLACES" : {
+ "message" : [
+ "Correlated scalar subqueries can only be used in filters,
aggregations, projections, and UPDATE/MERGE/DELETE commands<plan>"
+ ]
+ },
+ "IN_EXISTS_SUBQUERIES_ONLY_CERTAIN_PLACES" : {
+ "message" : [
+ "IN/EXISTS predicate subqueries can only be used in filters, joins,
aggregations, window functions, projections, and UPDATE/MERGE/DELETE
commands<plan>"
+ ]
+ },
+ "LATERAL_JOIN_CONDITION_NON_DETERMINISTIC" : {
+ "message" : [
+ "Lateral join condition cannot be non-deterministic: <condition>"
+ ]
+ },
+ "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY" : {
+ "message" : [
+ "Correlated scalar subqueries in the GROUP BY clause must also be in
the aggregate expressions<plan>"
+ ]
+ },
+ "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY_OUTPUT" : {
+ "message" : [
+ "The output of a correlated scalar subquery must be aggregated"
+ ]
+ },
+ "NON_CORRELATED_COLUMNS_IN_GROUP_BY" : {
+ "message" : [
+ "A GROUP BY clause in a scalar correlated subquery cannot contain
non-correlated columns: <value>"
+ ]
+ },
+ "NON_DETERMINISTIC_LATERAL_SUBQUERIES" : {
+ "message" : [
+ "Non-deterministic lateral subqueries are not supported when joining
with outer relations that produce more than one row<plan>"
+ ]
+ },
+ "OUTER_ATTRIBUTE_NOT_FOUND" : {
+ "message" : [
+ "Outer attribute not found: <value>"
+ ]
+ },
+ "OUTER_REFERENCES_ONLY_ALLOWED_IN_WHERE_HAVING" : {
Review Comment:
👍 this is better
##########
core/src/main/resources/error/error-classes.json:
##########
@@ -327,6 +327,78 @@
],
"sqlState" : "42000"
},
+ "INVALID_SUBQUERY_EXPRESSION" : {
+ "message" : [
+ "Invalid subquery:"
+ ],
+ "subClass" : {
+ "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Accessing outer query column is not allowed in this location<plan>"
+ ]
+ },
+ "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES" : {
+ "message" : [
+ "Found an aggregate function in a correlated predicate that has both
outer and local references, which is not supported: <function>"
+ ]
+ },
+ "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE" : {
+ "message" : [
+ "Correlated column is not allowed in predicate<plan>"
+ ]
+ },
+ "CORRELATED_SCALAR_SUBQUERIES_ONLY_IN_CERTAIN_PLACES" : {
+ "message" : [
+ "Correlated scalar subqueries can only be used in filters,
aggregations, projections, and UPDATE/MERGE/DELETE commands<plan>"
+ ]
+ },
+ "IN_EXISTS_SUBQUERIES_ONLY_CERTAIN_PLACES" : {
Review Comment:
👍
##########
core/src/main/resources/error/error-classes.json:
##########
@@ -327,6 +327,78 @@
],
"sqlState" : "42000"
},
+ "INVALID_SUBQUERY_EXPRESSION" : {
+ "message" : [
+ "Invalid subquery:"
+ ],
+ "subClass" : {
+ "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED" : {
+ "message" : [
+ "Accessing outer query column is not allowed in this location<plan>"
+ ]
+ },
+ "AGGREGATE_FUNCTION_MIXED_OUTER_LOCAL_REFERENCES" : {
+ "message" : [
+ "Found an aggregate function in a correlated predicate that has both
outer and local references, which is not supported: <function>"
+ ]
+ },
+ "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE" : {
+ "message" : [
+ "Correlated column is not allowed in predicate<plan>"
Review Comment:
Same as above: all these errors now include the origin/context, and this
string is toggled by the new `spark.sql.error.includePlans` SQLConf. We can turn
it off by default later if we want, and the option would still be available
for internal debugging purposes.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -756,10 +756,8 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
val invalidCols = groupByCols -- correlatedCols
// GROUP BY columns must be a subset of columns in the predicates
if (invalidCols.nonEmpty) {
- failAnalysis(
- "A GROUP BY clause in a scalar correlated subquery " +
- "cannot contain non-correlated columns: " +
- invalidCols.mkString(","))
+ throw QueryCompilationErrors.nonCorrelatedColumnsInGroupBy(
+ invalidCols.map(_.name).mkString(","), expr.origin)
Review Comment:
👍 We spoke: we need to call `failAnalysis` as a method on the expression or
operator of interest, and then Gengliang's recent PR will handle propagating
the query context from the `origin` to where it needs to go.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##########
@@ -1543,10 +1543,108 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase {
new AnalysisException(s"'$operation' does not support partitioning")
}
- def mixedRefsInAggFunc(funcStr: String): Throwable = {
- val msg = "Found an aggregate function in a correlated predicate that has
both " +
- "outer and local references, which is not supported: " + funcStr
- new AnalysisException(msg)
+ def accessingOuterQueryColumnIsNotAllowed(plan: String, origin: Origin):
Throwable = {
+ new AnalysisException(
+ errorClass = "INVALID_SUBQUERY_EXPRESSION",
+ errorSubClass = "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+ origin = origin,
+ messageParameters = Array(if (SQLConf.get.includePlansInErrors) s":
$plan" else ""))
Review Comment:
I agree in general. In this particular API, the message parameters are passed
as arrays through many helper functions, and this file already uses that
pattern in many other places. I would prefer to move everything over to Maps
in a separate refactoring-only PR, since that change would be large and
keeping it separate avoids mixing refactoring with behavior changes.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala:
##########
@@ -756,10 +756,8 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
val invalidCols = groupByCols -- correlatedCols
// GROUP BY columns must be a subset of columns in the predicates
if (invalidCols.nonEmpty) {
- failAnalysis(
- "A GROUP BY clause in a scalar correlated subquery " +
- "cannot contain non-correlated columns: " +
- invalidCols.mkString(","))
+ throw QueryCompilationErrors.nonCorrelatedColumnsInGroupBy(
+ invalidCols.map(_.name).mkString(","), expr.origin)
Review Comment:
Sure, this is done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]