GitHub user tejasapatil opened a pull request:
https://github.com/apache/spark/pull/15600
[SPARK-17698] [SQL] Join predicates should not contain filter clauses
## What changes were proposed in this pull request?
This is a backport of https://github.com/apache/spark/pull/15272 to the 2.0 branch.
Jira : https://issues.apache.org/jira/browse/SPARK-17698
`ExtractEquiJoinKeys` is incorrectly using filter predicates as the join
condition for joins. `canEvaluate` [0] checks whether an `Expression` can
be evaluated using the output of a given `Plan`. In the case of a filter predicate
(e.g. `a.id='1'`), the `Expression` passed for the right-hand side (i.e. `'1'`) is a
`Literal` which has no attribute references. Thus `expr.references`
is an empty set, which is trivially a subset of any set. This leads to
`canEvaluate` returning `true`, and `a.id='1'` is treated as a join predicate.
While this does not produce incorrect results, in the case of bucketed + sorted
tables we may miss the chance to avoid an unnecessary shuffle + sort. See the
example below:
[0]:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L91
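For reference, the check boils down to roughly the following (a simplified sketch of the logic at [0], not a verbatim copy of the source):
```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Simplified sketch of the check at [0]: an expression is considered
// evaluable on a plan if every attribute it references is produced by that plan.
def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
  expr.references.subsetOf(plan.outputSet)

// For a filter predicate like `a.id = '1'`, the right-hand side is Literal("1").
// Its `references` set is empty, and an empty set is a subset of any set, so
// canEvaluate(Literal("1"), rightPlan) returns true and the whole predicate is
// (incorrectly) picked up as an equi-join key.
```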
For example:
```
// In spark-shell (Spark 2.0+): `spark` is the SparkSession; toDF comes from spark.implicits._
val df = (1 until 10).toDF("id").coalesce(1)

// Create two bucketed + sorted tables with identical bucketing on `id`
spark.sql("DROP TABLE IF EXISTS table1").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table1")
spark.sql("DROP TABLE IF EXISTS table2").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table2")

spark.sql("""
  SELECT a.id, b.id
  FROM table1 a
  FULL OUTER JOIN table2 b
  ON a.id = b.id AND a.id='1' AND b.id='1'
""").explain(true)
```
BEFORE: This plan does a shuffle + sort over the table scan outputs, which is not
needed since both tables are bucketed and sorted on the same columns and have the
same number of buckets. This should be a single-stage job.
```
SortMergeJoin [id#38, cast(id#38 as double), 1.0], [id#39, 1.0, cast(id#39 as double)], FullOuter
:- *Sort [id#38 ASC NULLS FIRST, cast(id#38 as double) ASC NULLS FIRST, 1.0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#38, cast(id#38 as double), 1.0, 200)
:     +- *FileScan parquet default.table1[id#38] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *Sort [id#39 ASC NULLS FIRST, 1.0 ASC NULLS FIRST, cast(id#39 as double) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#39, 1.0, cast(id#39 as double), 200)
      +- *FileScan parquet default.table2[id#39] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```
AFTER: The join keys now contain only `id`, and the filter clauses become part of
the join condition, so no shuffle or sort is needed.
```
SortMergeJoin [id#32], [id#33], FullOuter, ((cast(id#32 as double) = 1.0) && (cast(id#33 as double) = 1.0))
:- *FileScan parquet default.table1[id#32] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *FileScan parquet default.table2[id#33] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```
## How was this patch tested?
- Added a new test case for this scenario: `SPARK-17698 Join predicates should not contain filter clauses`
- Ran all the tests in `BucketedReadSuite`
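As a rough illustration (a hedged sketch, not the actual assertion in `BucketedReadSuite`), the fix can be sanity-checked by confirming that the physical plan for the query above no longer contains a shuffle, assuming Spark 2.0 class names such as `ShuffleExchange`:
```scala
import org.apache.spark.sql.execution.exchange.ShuffleExchange

// Hypothetical check: with the fix, both sides are already bucketed and
// sorted on `id`, so the executed plan should contain no Exchange (shuffle).
val plan = spark.sql("""
  SELECT a.id, b.id
  FROM table1 a
  FULL OUTER JOIN table2 b
  ON a.id = b.id AND a.id='1' AND b.id='1'
""").queryExecution.executedPlan

assert(plan.collect { case e: ShuffleExchange => e }.isEmpty)
```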
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tejasapatil/spark SPARK-17698_2.0_backport
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/15600.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #15600
----
commit df5083894198e1a85fb17544fc596a3869a9e1b6
Author: Tejas Patil <[email protected]>
Date: 2016-10-22T20:16:40Z
Backport to 2.0 : [SPARK-17698] [SQL] Join predicates should not contain filter clauses
----