GitHub user ioana-delaney opened a pull request:
https://github.com/apache/spark/pull/13570
[SPARK-15832][SQL] Embedded IN/EXISTS predicate subquery throws
TreeNodeException
## What changes were proposed in this pull request?
Queries with embedded existential sub-query predicates throw an exception
when building the physical plan.
Example failing query:
```SQL
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
scala> Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
scala> sql("select c1 from t1 where (case when c2 in (select c2 from t2) then 2 else 3 end) IN (select c2 from t1)").show()
Binding attribute, tree: c2#239
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:50)
  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
  ...
  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
  at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
  at org.apache.spark.sql.execution.joins.HashJoin$$anonfun$4.apply(HashJoin.scala:66)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.execution.joins.HashJoin$class.org$apache$spark$sql$execution$joins$HashJoin$$x$8(HashJoin.scala:66)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8$lzycompute(BroadcastHashJoinExec.scala:38)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.org$apache$spark$sql$execution$joins$HashJoin$$x$8(BroadcastHashJoinExec.scala:38)
  at org.apache.spark.sql.execution.joins.HashJoin$class.buildKeys(HashJoin.scala:63)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys$lzycompute(BroadcastHashJoinExec.scala:38)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.buildKeys(BroadcastHashJoinExec.scala:38)
  at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.requiredChildDistribution(BroadcastHashJoinExec.scala:52)
```
**Problem description:**
When the left-hand side expression of an existential sub-query predicate
contains another embedded sub-query predicate, the RewritePredicateSubquery
optimizer rule does not rewrite the embedded sub-query expressions into
existential joins. For example, the above query has the following optimized
plan, which fails when the physical plan is built.
```SQL
== Optimized Logical Plan ==
Project [_1#224 AS c1#227]
+- Join LeftSemi, (CASE WHEN predicate-subquery#255 [(_2#225 = c2#239)] THEN 2 ELSE 3 END = c2#228#262)
   :  +- SubqueryAlias predicate-subquery#255 [(_2#225 = c2#239)]
   :     +- LocalRelation [c2#239]
   :- LocalRelation [_1#224, _2#225]
   +- LocalRelation [c2#228#262]

== Physical Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c2#239
```
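One way to read the failure: the join key still mentions `c2#239`, but neither join child outputs that attribute, so reference binding has nothing to resolve it against. The sketch below is a toy model of that lookup only. `BindingSketch` and `bindAll` are hypothetical names for illustration and are not Spark's `BindReferences` implementation.

```scala
// Toy model only: illustrates why an attribute that is missing from the
// child output cannot be bound. Not Spark's actual binding code.
object BindingSketch {
  // Resolve each referenced attribute to its ordinal in the child output,
  // failing when the child does not produce that attribute.
  def bindAll(references: Seq[String], childOutput: Seq[String]): Seq[Int] =
    references.map { ref =>
      val ordinal = childOutput.indexOf(ref)
      if (ordinal < 0) {
        throw new IllegalStateException(s"Binding attribute, tree: $ref")
      }
      ordinal
    }
}
```

In the plan above, the left child outputs `[_1#224, _2#225]`, so `_2#225` binds but `c2#239` does not.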
**Solution:**
In RewritePredicateSubquery, before rewriting the outermost predicate
sub-query, rewrite any embedded existential sub-queries into existence joins.
The optimized plan for the above query after the changes is shown below.
```SQL
== Optimized Logical Plan ==
Project [_1#224 AS c1#227]
+- Join LeftSemi, (CASE WHEN exists#285 THEN 2 ELSE 3 END = c2#228#284)
   :- Join ExistenceJoin(exists#285), (_2#225 = c2#239)
   :  :- LocalRelation [_1#224, _2#225]
   :  +- LocalRelation [c2#239]
   +- LocalRelation [c2#228#284]

== Physical Plan ==
*Project [_1#224 AS c1#227]
+- *BroadcastHashJoin [CASE WHEN exists#285 THEN 2 ELSE 3 END], [c2#228#284], LeftSemi, BuildRight
   :- *BroadcastHashJoin [_2#225], [c2#239], ExistenceJoin(exists#285), BuildRight
   :  :- LocalTableScan [_1#224, _2#225]
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
   :     +- LocalTableScan [c2#239]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- LocalTableScan [c2#228#284]
+- LocalTableScan [c222#36], [[111],[222]]
```
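The ordering described in the solution can be sketched with a small toy model. This is not the patch itself and does not use Catalyst classes: every type and function name below (`Expr`, `Plan`, `rewriteEmbedded`, `rewriteOuter`, and so on) is hypothetical, and the model ignores correlation, NOT IN, and null semantics. It only shows the idea of replacing each embedded sub-query predicate with an `exists` flag produced by an existence join before the outer predicate becomes a left semi join.

```scala
// Toy model only: hypothetical types, NOT Spark's Catalyst classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class CaseWhen(cond: Expr, thenValue: Expr, elseValue: Expr) extends Expr
// Models `value IN (SELECT column FROM plan)`.
case class InSubquery(value: Expr, plan: Plan, column: Attr) extends Expr

sealed trait Plan
case class Relation(name: String) extends Plan
case class ExistenceJoin(left: Plan, right: Plan, cond: Expr, exists: Attr) extends Plan
case class LeftSemiJoin(left: Plan, right: Plan, cond: Expr) extends Plan

object EmbeddedSubqueryRewrite {
  private var counter = 0
  private def freshExists(): Attr = { counter += 1; Attr(s"exists#$counter") }

  // Replace every sub-query predicate inside `e` with a boolean flag attribute
  // and stack the existence join that produces the flag on top of `plan`.
  def rewriteEmbedded(e: Expr, plan: Plan): (Expr, Plan) = e match {
    case InSubquery(v, sub, col) =>
      val (v2, p2) = rewriteEmbedded(v, plan)
      val flag = freshExists()
      (flag, ExistenceJoin(p2, sub, EqualTo(v2, col), flag))
    case CaseWhen(c, t, f) =>
      val (c2, p1) = rewriteEmbedded(c, plan)
      val (t2, p2) = rewriteEmbedded(t, p1)
      val (f2, p3) = rewriteEmbedded(f, p2)
      (CaseWhen(c2, t2, f2), p3)
    case EqualTo(l, r) =>
      val (l2, p1) = rewriteEmbedded(l, plan)
      val (r2, p2) = rewriteEmbedded(r, p1)
      (EqualTo(l2, r2), p2)
    case leaf => (leaf, plan) // Attr and Lit contain no sub-queries
  }

  // Rewrite the outermost IN predicate into a left semi join, after first
  // resolving the sub-queries embedded in its left-hand side.
  def rewriteOuter(pred: InSubquery, child: Plan): Plan = {
    val (lhs, newChild) = rewriteEmbedded(pred.value, child)
    LeftSemiJoin(newChild, pred.plan, EqualTo(lhs, pred.column))
  }
}

object EmbeddedSubqueryRewriteDemo {
  // Mirrors the shape of the failing query; relation names are illustrative.
  def example(): Plan = {
    val inner = InSubquery(Attr("t1.c2"), Relation("t2"), Attr("t2.c2"))
    val outer = InSubquery(CaseWhen(inner, Lit(2), Lit(3)), Relation("t1_sub"), Attr("t1_sub.c2"))
    EmbeddedSubqueryRewrite.rewriteOuter(outer, Relation("t1"))
  }
}
```

Calling `EmbeddedSubqueryRewriteDemo.example()` yields a left semi join whose left child is an existence join, which is the same shape as the optimized plan above.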
## How was this patch tested?
Added new test cases to SubquerySuite.scala.
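For reference, a standalone check of the example query might look like the sketch below. It is not one of the tests added to SubquerySuite.scala. The object name `EmbeddedSubqueryCheck` and the local-mode session settings are assumptions, and it needs `spark-sql` on the classpath. Both rows of `t1` should qualify, because the CASE expression evaluates to 2 for each row and 2 is a value of `t1.c2`.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone check, not the SubquerySuite test from this patch.
object EmbeddedSubqueryCheck {
  def run(): Seq[Int] = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-15832-check")
      .getOrCreate()
    import spark.implicits._
    try {
      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t1")
      Seq((1, 1), (2, 2)).toDF("c1", "c2").createOrReplaceTempView("t2")
      spark.sql(
        "select c1 from t1 where " +
          "(case when c2 in (select c2 from t2) then 2 else 3 end) " +
          "IN (select c2 from t1)")
        .collect()
        .map(_.getInt(0)) // c1 is an Int column
        .sorted
        .toSeq
    } finally {
      spark.stop()
    }
  }
}
```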
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ioana-delaney/spark fixEmbedSubPredV1
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/13570.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #13570
----
commit eea703aa673aab5f56d6a97ad86860422cd563a3
Author: Ioana Delaney <[email protected]>
Date: 2016-06-08T22:49:14Z
[SPARK-15832] Embedded IN/EXISTS predicate subquery throws
TreeNodeException.
----