GitHub user tejasapatil opened a pull request:
https://github.com/apache/spark/pull/15272
[SPARK-17698] [SQL] Join predicates should not contain filter clauses
## What changes were proposed in this pull request?
Jira : https://issues.apache.org/jira/browse/SPARK-17698
`ExtractEquiJoinKeys` incorrectly treats filter predicates as join
conditions. `canEvaluate` [0] checks whether an `Expression` can be evaluated
using the output of a given `Plan`. For a filter predicate (e.g. `a.id='1'`),
the `Expression` passed for the right-hand side (i.e. `'1'`) is a `Literal`,
which has no attribute references. Thus `expr.references` is an empty set,
which is trivially a subset of any set. As a result, `canEvaluate` returns
`true` and `a.id='1'` is treated as a join predicate.
This does not produce incorrect results, but for bucketed + sorted tables it
can prevent the planner from avoiding an unnecessary shuffle + sort. See the
example below:
[0] :
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L91
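The vacuous-subset behaviour described above can be reproduced without Spark. The following is a minimal sketch using simplified stand-ins: `Expr`, `Attr`, `Lit`, and `isJoinKeySide` are hypothetical names invented for illustration and are not Catalyst classes. The guard shown at the end is one possible way to reject reference-free sides, not necessarily what this patch implements.

```scala
object CanEvaluateSketch {
  // Simplified stand-ins for Catalyst expressions (hypothetical, not Spark classes).
  sealed trait Expr { def references: Set[String] }
  case class Attr(name: String) extends Expr {
    def references: Set[String] = Set(name)
  }
  case class Lit(value: String) extends Expr {
    // A literal refers to no attributes at all.
    def references: Set[String] = Set.empty
  }

  // Models the check described above: every referenced attribute must be
  // produced by the plan.
  def canEvaluate(expr: Expr, planOutput: Set[String]): Boolean =
    expr.references.subsetOf(planOutput)

  // One possible guard (an assumption for illustration): a side with no
  // references cannot be a join key.
  def isJoinKeySide(expr: Expr, planOutput: Set[String]): Boolean =
    expr.references.nonEmpty && canEvaluate(expr, planOutput)

  val rightOutput: Set[String] = Set("b.id")

  // The empty set is a subset of anything, so the literal passes vacuously.
  val literalPasses: Boolean = canEvaluate(Lit("1"), rightOutput)
  // A real attribute of the right child passes for the intended reason.
  val attributePasses: Boolean = canEvaluate(Attr("b.id"), rightOutput)
  // With the guard, the literal is no longer accepted as a join key side.
  val literalAcceptedAsKey: Boolean = isJoinKeySide(Lit("1"), rightOutput)
}
```

In this model `literalPasses` is `true` even though `'1'` has nothing to do with the right child, which is how `a.id='1'` ends up among the join keys.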
Example:
```
val df = (1 until 10).toDF("id").coalesce(1)
hc.sql("DROP TABLE IF EXISTS table1").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table1")
hc.sql("DROP TABLE IF EXISTS table2").collect
df.write.bucketBy(8, "id").sortBy("id").saveAsTable("table2")
sqlContext.sql("""
SELECT a.id, b.id
FROM table1 a
FULL OUTER JOIN table2 b
ON a.id = b.id AND a.id='1' AND b.id='1'
""").explain(true)
```
BEFORE: The plan shuffles and sorts the output of both table scans, which is
unnecessary because both tables are bucketed and sorted on the same column and
have the same number of buckets. This should be a single-stage job.
```
SortMergeJoin [id#38, cast(id#38 as double), 1.0], [id#39, 1.0, cast(id#39 as double)], FullOuter
:- *Sort [id#38 ASC NULLS FIRST, cast(id#38 as double) ASC NULLS FIRST, 1.0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#38, cast(id#38 as double), 1.0, 200)
:     +- *FileScan parquet default.table1[id#38] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *Sort [id#39 ASC NULLS FIRST, 1.0 ASC NULLS FIRST, cast(id#39 as double) ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#39, 1.0, cast(id#39 as double), 200)
      +- *FileScan parquet default.table2[id#39] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```
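AFTER: The join keys contain only `id`, the filter clauses become the join's
extra condition, and no `Exchange` or `Sort` is planned.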
```
SortMergeJoin [id#32], [id#33], FullOuter, ((cast(id#32 as double) = 1.0) && (cast(id#33 as double) = 1.0))
:- *FileScan parquet default.table1[id#32] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table1, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
+- *FileScan parquet default.table2[id#33] Batched: true, Format: ParquetFormat, InputPaths: file:spark-warehouse/table2, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:int>
```
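The difference between the two plans comes down to whether `Exchange` and `Sort` operators appear. As a rough way to check that on plan text, here is a small sketch in plain Scala; `PlanTextCheck` and `countOperator` are hypothetical helpers written for this illustration and are not part of Spark or of this patch. It assumes one operator per line, as printed by `explain`.

```scala
object PlanTextCheck {
  // Counts plan lines whose operator name equals `operator`.
  // Tree markers (':', '+', '-', spaces) and the '*' codegen prefix are
  // stripped first; the trailing space keeps "Sort" from matching
  // "SortMergeJoin".
  def countOperator(planText: String, operator: String): Int =
    planText.linesIterator
      .map(_.dropWhile(c => " :+-*".contains(c)))
      .count(_.startsWith(operator + " "))
}
```

Applied to the BEFORE plan this would count two `Exchange` and two `Sort` operators, and none of either for the AFTER plan.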
## How was this patch tested?
- Added a new test case for this scenario: `SPARK-17698 Join predicates should not contain filter clauses`
- Ran all the tests in `BucketedReadSuite`
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tejasapatil/spark SPARK-17698_join_predicate_filter_clause
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/15272.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #15272
----
commit b65c9263da58e703785356bc5c3a64d3e3ecbc0e
Author: Tejas Patil <[email protected]>
Date: 2016-09-28T01:24:46Z
[SPARK-17698] [SQL] Join predicates should not contain filter clauses
----