GitHub user tejasapatil opened a pull request:

    https://github.com/apache/spark/pull/14864

    [SPARK-15453] [SQL] FileSourceScanExec to extract `outputOrdering` information

    ## What changes were proposed in this pull request?
    
    This change extracts sort ordering information in `FileSourceScanExec` so that the
planner can make use of it. My motivation was to bring Sort Merge join on par with
Hive's Sort-Merge-Bucket join when the source tables are bucketed and sorted.
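
    At a high level, the scan node just needs to report the sort order implied by the
table's bucket spec. A minimal sketch of that derivation (illustrative only, not the
exact patch; `outputOrderingFor` is a hypothetical helper name):

    ```
    import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, SortOrder}

    // Hypothetical helper: map the bucket spec's sort column names onto the
    // scan's output attributes and expose them as SortOrder expressions, so
    // the planner can see that data is already sorted within each bucket.
    def outputOrderingFor(
        sortColumnNames: Seq[String],
        output: Seq[Attribute]): Seq[SortOrder] =
      sortColumnNames.flatMap { name =>
        output.find(_.name == name).map(attr => SortOrder(attr, Ascending))
      }
    ```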
    
    Query:
    
    ```
    val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
    df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
    df.write.bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")
    context.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
    ```
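
    Both tables here are written with 8 buckets and sorted on the join keys (`j`, `k`),
which is exactly the layout the extracted `outputOrdering` describes.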
    
    Before:
    
    ```
    == Physical Plan ==
    *SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
    :- *Sort [j#120 ASC, k#121 ASC], false, 0
    :  +- *Project [i#119, j#120, k#121]
    :     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
    :        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    +- *Sort [j#123 ASC, k#124 ASC], false, 0
       +- *Project [i#122, j#123, k#124]
          +- *Filter (isnotnull(k#124) && isnotnull(j#123))
             +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    ```
    
    After (note that the `Sort` steps are no longer present):
    
    ```
    == Physical Plan ==
    *SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
    :- *Project [i#48, j#49, k#50]
    :  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
    :     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    +- *Project [i#51, j#52, k#53]
       +- *Filter (isnotnull(j#52) && isnotnull(k#53))
          +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
    ```
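
    For context, the planner (`EnsureRequirements`) only inserts a `Sort` when a child's
output ordering does not already satisfy the ordering the parent operator requires. A
simplified model of that check (illustrative names, not Spark's internal API):

    ```
    // Simplified model: a required ordering is satisfied when it is a
    // prefix of the ordering the child operator already guarantees.
    case class Key(column: String, ascending: Boolean = true)

    def orderingSatisfied(required: Seq[Key], childProvides: Seq[Key]): Boolean =
      required.length <= childProvides.length &&
        required.zip(childProvides).forall { case (r, c) => r == c }

    // With this change the bucketed scan reports Seq(Key("j"), Key("k")),
    // which satisfies SortMergeJoin's requirement, so no Sort is added.
    ```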
    
    ## How was this patch tested?
    
    Added a test case in `JoinSuite` and ran all the other tests in `JoinSuite`.
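
    For illustration, the kind of assertion such a test can make (a hypothetical sketch,
assuming `spark` is the active `SparkSession`; the actual test in `JoinSuite` may be
structured differently):

    ```
    import org.apache.spark.sql.execution.SortExec
    import org.apache.spark.sql.execution.joins.SortMergeJoinExec

    val plan = spark.sql(
        "SELECT * FROM table8 a JOIN table9 b ON a.j = b.j AND a.k = b.k")
      .queryExecution.executedPlan

    // The join should still be planned as a sort-merge join...
    assert(plan.collect { case j: SortMergeJoinExec => j }.nonEmpty)
    // ...but no Sort should be needed above the bucketed, sorted scans.
    assert(plan.collect { case s: SortExec => s }.isEmpty)
    ```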

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tejasapatil/spark SPARK-15453_smb_optimization

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14864.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14864
    
----
commit 07196a8acbf6f0a68f29f96d1eeea74f53bbeb8a
Author: Tejas Patil <[email protected]>
Date:   2016-08-26T07:00:35Z

    [SPARK-15453] [SQL] Sort Merge Join to use bucketing metadata to optimize query plan
    
    BEFORE
    
    ```
    val df = (0 until 16).map(i => (i % 8, i * 2, i.toString)).toDF("i", "j", "k").coalesce(1)
    hc.sql("DROP TABLE table8").collect
    df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table8")
    hc.sql("DROP TABLE table9").collect
    df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("table9")

    hc.sql("SELECT * FROM table8 a JOIN table9 b ON a.j=b.j AND a.k=b.k").explain(true)
    
    == Parsed Logical Plan ==
    'Project [*]
    +- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
       :- 'UnresolvedRelation table8, a
       +- 'UnresolvedRelation table9, b
    
    == Analyzed Logical Plan ==
    i: int, j: int, k: string, i: int, j: int, k: string
    Project [i#119, j#120, k#121, i#122, j#123, k#124]
    +- Join Inner, ((j#120 = j#123) && (k#121 = k#124))
       :- SubqueryAlias a
       :  +- SubqueryAlias table8
       :     +- Relation[i#119,j#120,k#121] orc
       +- SubqueryAlias b
          +- SubqueryAlias table9
             +- Relation[i#122,j#123,k#124] orc
    
    == Optimized Logical Plan ==
    Join Inner, ((j#120 = j#123) && (k#121 = k#124))
    :- Filter (isnotnull(k#121) && isnotnull(j#120))
    :  +- Relation[i#119,j#120,k#121] orc
    +- Filter (isnotnull(k#124) && isnotnull(j#123))
       +- Relation[i#122,j#123,k#124] orc
    
    == Physical Plan ==
    *SortMergeJoin [j#120, k#121], [j#123, k#124], Inner
    :- *Sort [j#120 ASC, k#121 ASC], false, 0
    :  +- *Project [i#119, j#120, k#121]
    :     +- *Filter (isnotnull(k#121) && isnotnull(j#120))
    :        +- *FileScan orc default.table8[i#119,j#120,k#121] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    +- *Sort [j#123 ASC, k#124 ASC], false, 0
       +- *Project [i#122, j#123, k#124]
          +- *Filter (isnotnull(k#124) && isnotnull(j#123))
             +- *FileScan orc default.table9[i#122,j#123,k#124] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    ```
    
    AFTER
    
    ```
    == Parsed Logical Plan ==
    'Project [*]
    +- 'Join Inner, (('a.j = 'b.j) && ('a.k = 'b.k))
       :- 'UnresolvedRelation `table8`, a
       +- 'UnresolvedRelation `table9`, b
    
    == Analyzed Logical Plan ==
    i: int, j: int, k: string, i: int, j: int, k: string
    Project [i#48, j#49, k#50, i#51, j#52, k#53]
    +- Join Inner, ((j#49 = j#52) && (k#50 = k#53))
       :- SubqueryAlias a
       :  +- SubqueryAlias table8
       :     +- Relation[i#48,j#49,k#50] orc
       +- SubqueryAlias b
          +- SubqueryAlias table9
             +- Relation[i#51,j#52,k#53] orc
    
    == Optimized Logical Plan ==
    Join Inner, ((j#49 = j#52) && (k#50 = k#53))
    :- Filter (isnotnull(k#50) && isnotnull(j#49))
    :  +- Relation[i#48,j#49,k#50] orc
    +- Filter (isnotnull(j#52) && isnotnull(k#53))
       +- Relation[i#51,j#52,k#53] orc
    
    == Physical Plan ==
    *SortMergeJoin [j#49, k#50], [j#52, k#53], Inner
    :- *Project [i#48, j#49, k#50]
    :  +- *Filter (isnotnull(k#50) && isnotnull(j#49))
    :     +- *FileScan orc default.table8[i#48,j#49,k#50] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table8, PartitionFilters: [], PushedFilters: [IsNotNull(k), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>
    +- *Project [i#51, j#52, k#53]
       +- *Filter (isnotnull(j#52) && isnotnull(k#53))
          +- *FileScan orc default.table9[i#51,j#52,k#53] Batched: false, Format: ORC, InputPaths: file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/table9, PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(k)], ReadSchema: struct<i:int,j:int,k:string>
    ```

----

