GitHub user tejasapatil opened a pull request:
https://github.com/apache/spark/pull/13231
[SPARK-15453] [SQL] SMB Join for datasource
## What changes were proposed in this pull request?
Currently, for tables that are bucketed and sorted, SORT MERGE JOIN does not
use the table metadata to avoid unnecessary operations (e.g. Exchange and
Sort). This PR adds that support:
- Populated the sort ordering in `*DataSourceScanExec` for bucketed tables (a sketch follows this list)
- Fixed a bug in how orderings are compared in `EnsureRequirements` (see the sketch after the plans below)
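A rough sketch of the first change (illustrative only, not the literal diff
from this PR; `scanOutputOrdering` is a hypothetical helper, with
`sortColumnNames` standing for the bucket spec's sort columns and `output` for
the scan's output attributes):
```
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}

// Files within a bucket are sorted by the bucket spec's sort columns, so a
// bucketed scan can expose that ordering instead of reporting no ordering.
def scanOutputOrdering(
    sortColumnNames: Seq[String],
    output: Seq[Attribute]): Seq[SortOrder] = {
  sortColumnNames.flatMap { col =>
    output.find(_.name == col).map(attr => SortOrder(attr, Ascending))
  }
}
```
Once the scan reports this ordering (along with hash partitioning over the
bucket columns), `EnsureRequirements` can see that the join's requirements are
already met.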
```
val df1 = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k")
df1.write.format("orc").partitionBy("i").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table7")
df1.write.format("orc").partitionBy("i").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
hc.sql("SELECT * FROM table7 a JOIN table8 b ON a.j=b.j AND a.k=b.k AND a.i=b.i AND a.i=2 AND b.i=2").explain(true)
```
Before (note the Exchange and Sort on each side of the join):
```
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, Some((((('a.j = 'b.j) && ('a.k = 'b.k)) && ('a.i = 'b.i)) && (('a.i = 2) && ('b.i = 2))))
:- 'UnresolvedRelation `table7`, Some(a)
+- 'UnresolvedRelation `table8`, Some(b)
== Analyzed Logical Plan ==
j: int, k: string, i: int, j: int, k: string, i: int
Project [j#20,k#21,i#22,j#23,k#24,i#25]
+- Join Inner, Some(((((j#20 = j#23) && (k#21 = k#24)) && (i#22 = i#25)) && ((i#22 = 2) && (i#25 = 2))))
:- SubqueryAlias a
: +- SubqueryAlias table7
: +- Relation[j#20,k#21,i#22] orc
+- SubqueryAlias b
+- SubqueryAlias table8
+- Relation[j#23,k#24,i#25] orc
== Optimized Logical Plan ==
Join Inner, Some((((j#20 = j#23) && (k#21 = k#24)) && (i#22 = i#25)))
:- Filter (((isnotnull(k#21) && isnotnull(j#20)) && isnotnull(i#22)) && (i#22 = 2))
: +- Relation[j#20,k#21,i#22] orc
+- Filter (((isnotnull(k#24) && isnotnull(j#23)) && isnotnull(i#25)) && (i#25 = 2))
+- Relation[j#23,k#24,i#25] orc
== Physical Plan ==
WholeStageCodegen
: +- SortMergeJoin [j#20,k#21,i#22], [j#23,k#24,i#25], Inner, None
: :- INPUT
: +- INPUT
:- WholeStageCodegen
: : +- Sort [j#20 ASC,k#21 ASC,i#22 ASC], false, 0
: : +- INPUT
: +- Exchange hashpartitioning(j#20, k#21, i#22, 200), None
: +- WholeStageCodegen
: : +- Project [j#20,k#21,i#22]
: : +- Filter (isnotnull(k#21) && isnotnull(j#20))
: : +- Scan orc default.table7[j#20,k#21,i#22] Format: ORC, InputPaths: file:/XXXX/table7, PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<j:int,k:string>
+- WholeStageCodegen
: +- Sort [j#23 ASC,k#24 ASC,i#25 ASC], false, 0
: +- INPUT
+- Exchange hashpartitioning(j#23, k#24, i#25, 200), None
+- WholeStageCodegen
: +- Project [j#23,k#24,i#25]
: +- Filter (isnotnull(k#24) && isnotnull(j#23))
: +- Scan orc default.table8[j#23,k#24,i#25] Format: ORC, InputPaths: file:/XXXX/table8, PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<j:int,k:string>
```
After (no Exchange or Sort: the bucketed scans already provide the required partitioning and ordering):
```
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, Some((((('a.j = 'b.j) && ('a.k = 'b.k)) && ('a.i = 'b.i)) && (('a.i = 2) && ('b.i = 2))))
:- 'UnresolvedRelation `table7`, Some(a)
+- 'UnresolvedRelation `table8`, Some(b)
== Analyzed Logical Plan ==
j: int, k: string, i: int, j: int, k: string, i: int
Project [j#139,k#140,i#141,j#142,k#143,i#144]
+- Join Inner, Some(((((j#139 = j#142) && (k#140 = k#143)) && (i#141 = i#144)) && ((i#141 = 2) && (i#144 = 2))))
:- SubqueryAlias a
: +- SubqueryAlias table7
: +- Relation[j#139,k#140,i#141] orc
+- SubqueryAlias b
+- SubqueryAlias table8
+- Relation[j#142,k#143,i#144] orc
== Optimized Logical Plan ==
Join Inner, Some((((j#139 = j#142) && (k#140 = k#143)) && (i#141 = i#144)))
:- Filter (((isnotnull(k#140) && isnotnull(j#139)) && isnotnull(i#141)) && (i#141 = 2))
: +- Relation[j#139,k#140,i#141] orc
+- Filter (((isnotnull(k#143) && isnotnull(j#142)) && isnotnull(i#144)) && (i#144 = 2))
+- Relation[j#142,k#143,i#144] orc
== Physical Plan ==
WholeStageCodegen
: +- SortMergeJoin [j#139,k#140,i#141], [j#142,k#143,i#144], Inner, None
: :- INPUT
: +- INPUT
:- WholeStageCodegen
: : +- Project [j#139,k#140,i#141]
: : +- Filter (isnotnull(k#140) && isnotnull(j#139))
: : +- Scan orc default.table7[j#139,k#140,i#141] Format: ORC, InputPaths: file:/XXXX/table7, PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<j:int,k:string>
+- WholeStageCodegen
: +- Project [j#142,k#143,i#144]
: +- Filter (isnotnull(k#143) && isnotnull(j#142))
: +- Scan orc default.table8[j#142,k#143,i#144] Format: ORC, InputPaths: file:/XXXX/table8, PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<j:int,k:string>
```
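On the second change: the "After" plan is only possible if `EnsureRequirements`
accepts a child ordering that semantically satisfies the required ordering as a
prefix, rather than demanding exact equality. A minimal sketch of such a check
(`orderingSatisfied` is a hypothetical helper name, not necessarily what the
PR uses):
```
import org.apache.spark.sql.catalyst.expressions.SortOrder

// The requirement is met when it is a semantic prefix of what the child
// already produces; semanticEquals ignores cosmetic attribute differences.
def orderingSatisfied(required: Seq[SortOrder], actual: Seq[SortOrder]): Boolean =
  required.isEmpty ||
    (actual.length >= required.length &&
      required.zip(actual).forall { case (req, out) => req.semanticEquals(out) })
```
When this holds for both join children, the planner can drop the Sort nodes,
which is exactly the difference between the two physical plans above.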
## How was this patch tested?
I have tested for correctness with small data. A test case is in progress.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tejasapatil/spark smb_join
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/13231.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #13231
----
commit 9e2bb18d4c23b94bd7f18737a1eb0330f706e3cd
Author: Tejas Patil <[email protected]>
Date: 2016-05-20T19:27:08Z
SMB Join for datasource
----