GitHub user tejasapatil opened a pull request:

    https://github.com/apache/spark/pull/19001

    [SPARK-19256][SQL] Hive bucketing support

    ## What changes were proposed in this pull request?
    
    This PR implements both the read-side and write-side changes for supporting Hive bucketing in Spark. I had initially created a PR for just the write-side changes (https://github.com/apache/spark/pull/18954) for simplicity. If reviewers want to review the reader- and writer-side changes separately, I am happy to wait for the writer-side PR to get merged and then send a new PR for the reader-side changes.
    
    ### Semantics for read:
    - `outputPartitioning` while scanning a Hive table would be the set of bucketing columns (regardless of whether the table is partitioned, and whether a single partition or multiple partitions are being read).
    - `outputOrdering` would be the sort columns (more precisely, the prefix subset of the table's `sort columns` that is being read).
    - When reading multiple Hive partitions of the table, there would be multiple files per bucket, so there is no global sort order across buckets. In that case the sort information has to be ignored.
    - See the documentation in `HiveTableScanExec`, where `outputPartitioning` and `outputOrdering` are populated, for the nitty-gritty details (a rough sketch follows this list).
    
    ### Semantics for write:
    - If the Hive table is bucketed, then the INSERT node expects the child distribution to be based on the hash of the bucket columns; otherwise it is empty. (Compare with Spark native bucketing: the required distribution is not enforced regardless of whether the table is bucketed, which saves a shuffle compared to Hive. See the sketch after this list.)
    - Sort ordering for an INSERT node over a Hive bucketed table is determined as follows:
    
    | Insert type   | Normal table | Bucketed table |
    | ------------- | ------------- | ------------- |
    | non-partitioned insert  | Nil | sort columns |
    | static partition   | Nil | sort columns |
    | dynamic partitions   | partition columns | (partition columns + bucketId + sort columns) |
    
    Just to compare how sort ordering is expressed for Spark native bucketing:
    
    | Table type   | Normal table | Bucketed table |
    | ------------- | ------------- | ------------- |
    |  sort ordering | partition columns | (partition columns + bucketId + sort columns) |
    
    Why is there a difference? With Hive, since bucketed insertions need a shuffle anyway, the sort ordering can be relaxed for both the non-partitioned and static-partition cases: every RDD partition gets rows corresponding to a single bucket, so they can be written to the corresponding output file after sorting. In the dynamic-partition case, the rows need to be routed to the appropriate partition, which makes it similar to Spark's constraints.
    
    - Only `Overwrite` mode is allowed for Hive bucketed tables, as any other mode would break the bucketing guarantees of the table. This is a difference w.r.t. how Spark bucketing works.
    - With this PR, if no files are created for empty buckets, the query will fail. Creation of empty files will be supported in a coming iteration. This is a difference w.r.t. how Spark bucketing works, as Spark does NOT need files for empty buckets.
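    
    To illustrate the write-side rules, here is a hedged sketch (hypothetical helpers, not the exact `InsertIntoHiveTable` code) of the required child distribution and of the sort ordering from the first table above:
    
    ```scala
    import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, SortOrder}
    import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, UnspecifiedDistribution}
    
    // Hypothetical helpers, only to illustrate the write-side semantics above.
    object HiveBucketedWriteSketch {
    
      // A bucketed Hive table requires the child to be clustered on the bucket
      // columns (which is what forces the extra shuffle compared to Spark
      // native bucketing); otherwise no distribution is required.
      def requiredDistribution(bucketCols: Seq[Expression]): Distribution =
        if (bucketCols.nonEmpty) ClusteredDistribution(bucketCols)
        else UnspecifiedDistribution
    
      // Required sort ordering, mirroring the first table above.
      def requiredOrdering(
          dynamicPartitionCols: Seq[Attribute],  // empty for non-partitioned / static-partition inserts
          bucketIdExpr: Option[Expression],      // e.g. an expression computing the bucket id
          sortCols: Seq[Attribute],
          bucketed: Boolean): Seq[SortOrder] = {
        val exprs: Seq[Expression] = (dynamicPartitionCols.nonEmpty, bucketed) match {
          case (false, false) => Nil                                                // normal table, no dynamic partitions
          case (false, true)  => sortCols                                           // bucketed, no dynamic partitions
          case (true, false)  => dynamicPartitionCols                               // normal table, dynamic partitions
          case (true, true)   => dynamicPartitionCols ++ bucketIdExpr ++ sortCols   // bucketed, dynamic partitions
        }
        exprs.map(SortOrder(_, Ascending))
      }
    }
    ```
    
    As with the read side, the actual PR also threads the Hive hashing function through `ClusteredDistribution`, which this sketch leaves out.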
    
    ### Summary of changes done:
    - `ClusteredDistribution` and `HashPartitioning` are modified to store the hashing function used.
    - `RunnableCommand`s can now express their required distribution and ordering. This is used by `ExecutedCommandExec`, which runs these commands (a sketch of the shape of this change follows the list).
      - A nice side effect is that the logic for enforcing sort ordering inside `FileFormatWriter`, which felt out of place, could be removed. Ideally, this kind of insertion of physical nodes should be done within the planner, which is what happens with this PR.
    - `InsertIntoHiveTable` enforces both the distribution and the sort ordering.
    - `InsertIntoHadoopFsRelationCommand` enforces the sort ordering ONLY (and not the distribution).
    - Fixed a bug due to which any ALTER command on a bucketed table (e.g. updating stats) would wipe out the bucketing spec from the metastore. This made insertion into a bucketed table a non-idempotent operation.
    - `HiveTableScanExec` populates `outputPartitioning` and `outputOrdering` based on the table's metadata, configs and the query.
    - `HadoopTableReader` now uses `BucketizedSparkInputFormat` for bucketed reads.
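    
    For illustration, here is a rough sketch of the shape of the `RunnableCommand` change described above. The trait name and exact signatures are assumptions (in particular `requiredDistribution`); `requiredOrdering` is the method mentioned in the commits below.
    
    ```scala
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.catalyst.expressions.SortOrder
    import org.apache.spark.sql.catalyst.plans.physical.Distribution
    
    // Sketch: a command declares the distribution/ordering it needs for its input,
    // and the planner (rather than FileFormatWriter) inserts the corresponding
    // exchange/sort nodes before ExecutedCommandExec runs the command.
    trait RunnableCommandSketch {
      // No requirement by default; InsertIntoHiveTable would override both,
      // InsertIntoHadoopFsRelationCommand would override only the ordering.
      def requiredDistribution: Option[Distribution] = None
      def requiredOrdering: Seq[SortOrder] = Nil
    
      def run(sparkSession: SparkSession): Seq[Row]
    }
    ```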
    
    ## How was this patch tested?
    
    - Added new unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tejasapatil/spark bucket_read

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19001.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19001
    
----
commit a4a7ac941f9d53a5aeff1e9b9a19dbf611e54ac2
Author: Tejas Patil <[email protected]>
Date:   2017-08-03T22:57:54Z

    bucketed writer implementation

commit 5aee02b1754eaf7535bcd121eee4d31cb61e65d5
Author: Tejas Patil <[email protected]>
Date:   2017-08-15T23:27:06Z

    Move `requiredOrdering` into RunnableCommand instead of `FileFormatWriter`

commit 9b8f0842eb5b61e6ae1a9fc76aebe9ff88c2a39b
Author: Tejas Patil <[email protected]>
Date:   2017-08-16T23:54:48Z

    print only the files names in error message instead of entire FileStatus 
object

commit 02d87119f60db4db3e141b2f72365b09b45d9647
Author: Tejas Patil <[email protected]>
Date:   2017-08-16T18:33:13Z

    Reader side changes for hive bucketing

----

