c21 opened a new pull request #30003:
URL: https://github.com/apache/spark/pull/30003
### What changes were proposed in this pull request?
The Hive ORC/Parquet write code path is the same as the data source v1 code path
(`FileFormatWriter`). This PR adds support for writing Hive ORC/Parquet
bucketed tables with hivehash. The change customizes `bucketIdExpression` to
use `HiveHash` when the table is a Hive bucketed table and the Hive version is
1.x.y or 2.x.y. Support for Hive 3 will be added in a later PR, after Hive
murmur3hash support is added to Spark.
The changes are mostly in:
* `HiveMetastoreCatalog.scala`: When converting a Hive table relation to a data
source relation, pass the bucket info (`BucketSpec`) and other Hive-related info as
`options` into `HadoopFsRelation` and `LogicalRelation`, so they can later be
accessed by `InsertIntoHadoopFsRelationCommand` and `FileFormatWriter`.
* `FileFormatWriter.scala`: Use `HiveHash` for `bucketIdExpression` when
writing to a Hive bucketed table. In addition, the Spark output file names should
follow the Hive (and Presto) bucketed file naming convention, so another
parameter `bucketFileNamePrefix` is introduced, which entails follow-up changes in
`FileCommitProtocol` and `HadoopMapReduceCommitProtocol`. (See the sketch after
this list.)
* `DataSourceScanExec.scala`: Add an extra check to `bucketedScan` that
makes sure bucketing is not enabled when reading a Hive bucketed table, since we now
propagate the bucket spec from every Hive relation (read and write) in
`HiveMetastoreCatalog.scala`.
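As a rough sketch of the `FileFormatWriter.scala` change referenced above (the helper names and the exact expression shape below are illustrative assumptions, not the code in this PR):
```scala
import org.apache.spark.sql.catalyst.expressions.{BitwiseAnd, Expression, HiveHash, Literal, Pmod}

// Hypothetical helper illustrating the idea above: when writing to a Hive
// 1.x/2.x bucketed table, derive the bucket id with Hive's hash function
// instead of Spark's murmur3-based HashPartitioning, mirroring Hive's
// `(hashCode & Integer.MAX_VALUE) % numBuckets` bucketing rule.
def hiveCompatibleBucketIdExpression(
    bucketColumns: Seq[Expression],
    numBuckets: Int): Expression = {
  val nonNegativeHash = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
  Pmod(nonNegativeHash, Literal(numBuckets))
}

// Hive and Presto expect each bucket file name to start with the zero-padded
// bucket id (e.g. "00002_..."), hence a bucketFileNamePrefix along the lines of:
def bucketFileNamePrefix(bucketId: Int): String = f"$bucketId%05d_0_"
```
The `00002_0_...` / `00005_0_...` file names in the test output below follow this prefix convention.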
### Why are the changes needed?
To make Spark write bucketed tables that are compatible with other SQL engines. Currently
Spark-written bucketed tables cannot be leveraged by other SQL engines such as Hive and
Presto, because Spark uses a different hash function (Spark murmur3hash). With
this PR, a Spark-written Hive bucketed table can be efficiently read by
Presto and Hive for bucket filter pruning, joins, group-bys, etc. This has been
blocking several companies (confirmed with Facebook, Uber, etc.) from migrating
their bucketing workloads from Hive to Spark.
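To make the hash incompatibility concrete, here is a minimal, illustrative sketch using Spark's catalyst hash expressions directly (the key and bucket count are arbitrary; the bucket-id formulas paraphrase Hive's `(hash & Integer.MAX_VALUE) % numBuckets` rule and Spark's pmod-based rule, and are not code from this PR):
```scala
import org.apache.spark.sql.catalyst.expressions.{HiveHash, Literal, Murmur3Hash}

// Hash the same integer key with both functions. Hive hashes an int to (in
// effect) the value itself, while Spark's murmur3 produces an unrelated value,
// so the two engines generally assign the same row to different buckets.
val key = Literal(2)
val numBuckets = 8

val hiveHash  = HiveHash(Seq(key)).eval().asInstanceOf[Int]
val sparkHash = new Murmur3Hash(Seq(key)).eval().asInstanceOf[Int]

val hiveBucket  = (hiveHash & Int.MaxValue) % numBuckets                // what Hive/Presto expect
val sparkBucket = ((sparkHash % numBuckets) + numBuckets) % numBuckets  // what Spark v1 bucketing writes

println(s"Hive/Presto bucket: $hiveBucket, Spark bucket: $sparkBucket")
```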
### Does this PR introduce _any_ user-facing change?
Yes. Any Hive bucketed table written by Spark with Hive 1.x/2.x is now properly
bucketed and can be efficiently processed by Presto and Hive.
### How was this patch tested?
1. Added a unit test in `BucketedWriteWithHiveSupportSuite.scala` to verify that
the bucket file names and each row in each bucket are written properly.
2. Cross-engine test (taking PrestoSQL as an example): set up a Presto server
and a Hive metastore locally on a laptop, and run the Presto and Spark queries
locally.
Create a Hive bucketed table using Presto:
```sql
CREATE TABLE hive.di.chengsu_table (
key int,
value varchar,
part varchar
)
WITH (
format = 'PARQUET',
partitioned_by = ARRAY['part'],
bucketed_by = ARRAY['key'],
bucket_count = 8
)
```
Write to the Hive bucketed table (part='part0') using Spark, then read it back
with Presto to verify that bucket pruning works.
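The exact Spark commands used for the write are not reproduced here; a minimal sketch of what the Spark-side insert could look like, assuming the table created above is visible to Spark through the shared Hive metastore (the inserted key/value pairs simply match the ones appearing in the query results below):
```scala
import org.apache.spark.sql.SparkSession

// Spark session backed by the same Hive metastore that Presto uses, so the
// table di.chengsu_table created above is visible to Spark as well.
val spark = SparkSession.builder()
  .appName("hive-bucketed-write")
  .enableHiveSupport()
  .getOrCreate()

// Static-partition insert into part='part0'. With this PR the rows are
// bucketed by HiveHash(key) into files named with the Hive convention.
spark.sql(
  """INSERT INTO di.chengsu_table PARTITION (part = 'part0')
    |VALUES (2, '2'), (5, '5'), (10, '10'), (13, '13'), (18, '18')
    |""".stripMargin)
```
The Presto read with a filter on the hidden `"$bucket"` column then confirms that bucket pruning works: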
```sql
presto:default> SELECT * FROM hive.di.chengsu_table WHERE part='part0' AND
"$bucket" in (2);
key | value | part
-----+-------+-------
2 | 2 | part0
10 | 10 | part0
18 | 18 | part0
(3 rows)
```
Underlying files in the partition directory:
```
chengsu@chengsu-mbp part=part0 % ls
00002_0_part-00000-ccb3dc48-76b2-41fb-8030-7a6601a64bb3_00002.c000.snappy.parquet
00005_0_part-00000-ccb3dc48-76b2-41fb-8030-7a6601a64bb3_00005.c000.snappy.parquet
```
Write to the Hive bucketed table using Presto (NOTE: after this, one
partition contains both Presto-written and Spark-written files):
```sql
presto:default> INSERT INTO hive.di.chengsu_table
-> VALUES
-> (0, '0', 'part0'),
-> (1, '1', 'part0'),
-> (2, '2', 'part0'),
-> (1, '1', 'part1'),
-> (2, '2', 'part1'),
-> (3, '3', 'part1');
```
Use Presto to read the partition again and verify that bucket pruning works on the mixed
data written by Spark and Presto:
```sql
presto:default> SELECT * FROM hive.di.chengsu_table WHERE part='part0' AND
"$bucket" in (2);
key | value | part
-----+-------+-------
2 | 2 | part0
10 | 10 | part0
18 | 18 | part0
2 | 2 | part0
```
Underlying files in the partition directory:
```
chengsu@chengsu-mbp part=part0 % ls
000000_0_20201010_200332_00028_tct3t
000001_0_20201010_200332_00028_tct3t
000002_0_20201010_200332_00028_tct3t
00002_0_part-00000-ccb3dc48-76b2-41fb-8030-7a6601a64bb3_00002.c000.snappy.parquet
00005_0_part-00000-ccb3dc48-76b2-41fb-8030-7a6601a64bb3_00005.c000.snappy.parquet
```
In addition, verify a join between the Spark-written bucketed table and the
Presto-written bucketed table; the result looks correct:
```sql
presto:default> SELECT t1.key, t1.value, t2.key, t2.value
-> FROM hive.di.chengsu_table t1
-> JOIN hive.di.chengsu_table t2
-> ON t1.value = t2.value
-> AND t1.part = 'part0'
-> AND t2.part = 'part1';
key | value | key | value
-----+-------+-----+-------
2 | 2 | 2 | 2
2 | 2 | 2 | 2
1 | 1 | 1 | 1
2 | 2 | 2 | 2
2 | 2 | 2 | 2
```
Aggregate on the Spark-written bucketed table; the result looks correct:
```sql
presto:default> SELECT key, COUNT(*)
-> FROM hive.di.chengsu_table
-> WHERE part = 'part0'
-> GROUP BY key;
key | _col1
-----+-------
0 | 1
5 | 1
13 | 1
1 | 1
2 | 2
10 | 1
18 | 1
```