maropu commented on a change in pull request #32179:
URL: https://github.com/apache/spark/pull/32179#discussion_r615479930
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -899,14 +899,72 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
// +- SubqueryAlias t1, `t1`
// +- Project [_1#73 AS c1#76, _2#74 AS c2#77]
// +- LocalRelation [_1#73, _2#74]
- def failOnNonEqualCorrelatedPredicate(found: Boolean, p: LogicalPlan):
Unit = {
- if (found) {
+ // SPARK-35080: The same issue can happen to correlated equality
predicates when
+ // they do not guarantee one-to-one mapping between inner and outer
attributes.
+ // For example:
+ // Table:
+ // t1(a, b): [(0, 6), (1, 5), (2, 4)]
+ // t2(c): [(6)]
+ //
+ // Query:
+ // SELECT c, (SELECT COUNT(*) FROM t1 WHERE a + b = c) FROM t2
+ //
+ // Original subquery plan:
+ // Aggregate [count(1)]
+ // +- Filter ((a + b) = outer(c))
+ // +- LocalRelation [a, b]
+ //
+ // Plan after pulling up correlated predicates:
+ // Aggregate [a, b] [count(1), a, b]
+ // +- LocalRelation [a, b]
+ //
+ // Plan after rewrite:
+ // Project [c1, count(1)]
+ // +- Join LeftOuter ((a + b) = c)
+ // :- LocalRelation [c]
+ // +- Aggregate [a, b] [count(1), a, b]
+ // +- LocalRelation [a, b]
+ //
+ // The right hand side of the join transformed from the subquery will
output
+ // count(1) | a | b
+ // 1 | 0 | 6
+ // 1 | 1 | 5
+ // 1 | 2 | 4
+ // and the rewritten plan will produce incorrect results for the original
query.
+ def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p:
LogicalPlan): Unit = {
+ if (predicates.nonEmpty) {
// Report a non-supported case as an exception
- failAnalysis(s"Correlated column is not allowed in a non-equality
predicate:\n$p")
+ failAnalysis(s"Correlated column is not allowed in predicate " +
Review comment:
nit: drop `s`
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -899,14 +899,72 @@ trait CheckAnalysis extends PredicateHelper with
LookupCatalog {
// +- SubqueryAlias t1, `t1`
// +- Project [_1#73 AS c1#76, _2#74 AS c2#77]
// +- LocalRelation [_1#73, _2#74]
- def failOnNonEqualCorrelatedPredicate(found: Boolean, p: LogicalPlan):
Unit = {
- if (found) {
+ // SPARK-35080: The same issue can happen to correlated equality
predicates when
+ // they do not guarantee one-to-one mapping between inner and outer
attributes.
+ // For example:
+ // Table:
+ // t1(a, b): [(0, 6), (1, 5), (2, 4)]
+ // t2(c): [(6)]
+ //
+ // Query:
+ // SELECT c, (SELECT COUNT(*) FROM t1 WHERE a + b = c) FROM t2
+ //
+ // Original subquery plan:
+ // Aggregate [count(1)]
+ // +- Filter ((a + b) = outer(c))
+ // +- LocalRelation [a, b]
+ //
+ // Plan after pulling up correlated predicates:
+ // Aggregate [a, b] [count(1), a, b]
+ // +- LocalRelation [a, b]
+ //
+ // Plan after rewrite:
+ // Project [c1, count(1)]
+ // +- Join LeftOuter ((a + b) = c)
+ // :- LocalRelation [c]
+ // +- Aggregate [a, b] [count(1), a, b]
+ // +- LocalRelation [a, b]
+ //
+ // The right hand side of the join transformed from the subquery will
output
+ // count(1) | a | b
+ // 1 | 0 | 6
+ // 1 | 1 | 5
+ // 1 | 2 | 4
+ // and the rewritten plan will produce incorrect results for the original
query.
+ def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p:
LogicalPlan): Unit = {
+ if (predicates.nonEmpty) {
// Report a non-supported case as an exception
- failAnalysis(s"Correlated column is not allowed in a non-equality
predicate:\n$p")
+ failAnalysis(s"Correlated column is not allowed in predicate " +
+ s"${predicates.map(_.sql).mkString}:\n$p")
}
}
- var foundNonEqualCorrelatedPred: Boolean = false
+ def containsAttribute(e: Expression): Boolean = {
+ e.find(_.isInstanceOf[Attribute]).isDefined
+ }
+
+ // Given a correlated predicate, check if it is either a non-equality
predicate or
+ // an equality predicate that does not guarantee one-to-one mapping between
inner and
+ // outer attributes. When the correlated predicate does not contain any
attribute
+ // (i.e. only has outer references), it is supported and should return
false. E.g.:
+ // (a = outer(c)) -> false
+ // (outer(c) = outer(d)) -> false
+ // (a > outer(c)) -> true
+ // (a + b = outer(c)) -> true
+ // The last one is true because there can be multiple combinations of (a,
b) that
+ // satisfy the equality condition. For example, if outer(c) = 0, then both
(0, 0)
+ // and (-1, 1) can make the predicate evaluate to true.
+ def isUnsupportedPredicate(condition: Expression): Boolean = condition
match {
+ // Only allow equality condition with one side being an attribute and
the other
+ // side being an expression without attributes from the inner query. Note
+ // OuterReference is a leaf node and will not be found here.
+ case Equality(_: Attribute, b) => containsAttribute(b)
+ case Equality(a, _: Attribute) => containsAttribute(a)
Review comment:
Do we need the two entries above? It seems the entry in L963 can cover
them, too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]