GitHub user tejasapatil opened a pull request:
https://github.com/apache/spark/pull/14864
[SPARK-15453] [SQL] FileSourceScanExec to extract `outputOrdering` information
## What changes were proposed in this pull request?
This change extracts sort ordering information in `FileSourceScanExec` so that the
planner can make use of it. My motivation was to bring Sort-Merge join on par
with Hive's Sort-Merge-Bucket join when the source tables are bucketed and sorted.
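As a rough illustration of the extraction, here is a minimal, self-contained sketch of the idea, not the exact patch: when a relation is bucketed and its files are written sorted, the scan can report that order instead of claiming none. The `BucketSpec` case class and `outputOrdering` helper below are illustrative stand-ins for the real metadata and method.
```
// Sketch only: BucketSpec stands in for Spark's bucketing metadata, and the
// real change also has to be careful about buckets spread over multiple files,
// where a per-file sort order would not carry over to the whole bucket.
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}

case class BucketSpec(
    numBuckets: Int,
    bucketColumnNames: Seq[String],
    sortColumnNames: Seq[String])

def outputOrdering(
    bucketSpec: Option[BucketSpec],
    output: Seq[Attribute]): Seq[SortOrder] = {
  bucketSpec.toSeq.flatMap { spec =>
    // Resolve each column declared via sortBy() against the scan's output
    // attributes and expose it as an ascending SortOrder.
    spec.sortColumnNames.flatMap { col =>
      output.find(_.name == col).map(attr => SortOrder(attr, Ascending))
    }
  }
}
```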
Query:
```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
```
Before:
```
== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```
After: (note that the `Sort` steps are no longer there)
```
== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```
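For context on why the `Sort` nodes can be dropped: during physical planning, Spark's `EnsureRequirements` rule inserts a sort below an operator only when the child's reported `outputOrdering` does not already satisfy the operator's required ordering. Below is a simplified sketch of that check; the real rule uses a semantic comparison of the orderings, and `needsSort` is an illustrative name rather than the actual API.
```
import org.apache.spark.sql.catalyst.expressions.SortOrder

// Returns true when a sort would have to be inserted above the child.
def needsSort(required: Seq[SortOrder], childOrdering: Seq[SortOrder]): Boolean = {
  if (required.isEmpty) {
    false // no ordering required, nothing to enforce
  } else {
    // The child must already produce at least the required prefix, in order.
    val satisfied = required.length <= childOrdering.length &&
      required.zip(childOrdering).forall { case (req, out) => req.semanticEquals(out) }
    !satisfied
  }
}
```
With this patch, the `FileScan`'s reported ordering over `j, k` already satisfies the `SortMergeJoin`'s requirement, so no sort is added.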
## How was this patch tested?
Added a test case in `JoinSuite` and ran all the other tests in `JoinSuite`.
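As a hedged sketch of the kind of check such a test can make (`spark` is assumed to be an active `SparkSession`; the table names and assertion are illustrative, not the actual `JoinSuite` code):
```
import org.apache.spark.sql.execution.SortExec
import spark.implicits._

val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("t1")
df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("t2")

val joined = spark.sql("SELECT * FROM t1 a JOIN t2 b ON a.j = b.j AND a.k = b.k")
// With the scan reporting its ordering, no SortExec should remain in the
// executed plan for this bucketed + sorted join.
assert(joined.queryExecution.executedPlan.collect { case s: SortExec => s }.isEmpty)
```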
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tejasapatil/spark SPARK-15453_smb_optimization
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/14864.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #14864
----
commit 07196a8acbf6f0a68f29f96d1eeea74f53bbeb8a
Author: Tejas Patil <[email protected]>
Date: 2016-08-26T07:00:35Z
[SPARK-15453] [SQL] Sort Merge Join to use bucketing metadata to optimize query plan
BEFORE
```
val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
hc.sql("DROP TABLE table8").collect
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
hc.sql("DROP TABLE table9").collect
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
hc.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)

== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
   :- 'UnresolvedRelation table8, a
   +- 'UnresolvedRelation table9, b

== Analyzed Logical Plan ==
i: int, j: int, k: string, i: int, j: int, k: string
Project [i#119, j#120, k#121, i#122, j#123, k#124]
+- Join Inner, ((j#120 = j#123) && (k#121 = k#124))
   :- SubqueryAlias a
   :  +- SubqueryAlias table8
   :     +- Relation[i#119,j#120,k#121] orc
   +- SubqueryAlias b
      +- SubqueryAlias table9
         +- Relation[i#122,j#123,k#124] orc

== Optimized Logical Plan ==
Join Inner, ((j#120 = j#123) && (k#121 = k#124))
:- Filter (isnotnull(k#121) && isnotnull(j#120))
:  +- Relation[i#119,j#120,k#121] orc
+- Filter (isnotnull(k#124) && isnotnull(j#123))
   +- Relation[i#122,j#123,k#124] orc

== Physical Plan ==
*SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
:- *Sort [j#120 ASC, k#121 ASC], false, 0
:  +- *Project [i#119, j#120, k#121]
:     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
:        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Sort [j#123 ASC, k#124 ASC], false, 0
   +- *Project [i#122, j#123, k#124]
      +- *Filter (isnotnull(k#124) && isnotnull(j#123))
         +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
```
AFTER
```
== Parsed Logical Plan ==
'Project [*]
+- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
   :- 'UnresolvedRelation `table8`, a
   +- 'UnresolvedRelation `table9`, b

== Analyzed Logical Plan ==
i: int, j: int, k: string, i: int, j: int, k: string
Project [i#48, j#49, k#50, i#51, j#52, k#53]
+- Join Inner, ((j#49 = j#52) && (k#50 = k#53))
   :- SubqueryAlias a
   :  +- SubqueryAlias table8
   :     +- Relation[i#48,j#49,k#50] orc
   +- SubqueryAlias b
      +- SubqueryAlias table9
         +- Relation[i#51,j#52,k#53] orc

== Optimized Logical Plan ==
Join Inner, ((j#49 = j#52) && (k#50 = k#53))
:- Filter (isnotnull(k#50) && isnotnull(j#49))
:  +- Relation[i#48,j#49,k#50] orc
+- Filter (isnotnull(j#52) && isnotnull(k#53))
   +- Relation[i#51,j#52,k#53] orc

== Physical Plan ==
*SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
:- *Project [i#48, j#49, k#50]
:  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
:     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
+- *Project [i#51, j#52, k#53]
   +- *Filter (isnotnull(j#52) && isnotnull(k#53))
      +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
```
----