c21 opened a new pull request #29625:
URL: https://github.com/apache/spark/pull/29625
### What changes were proposed in this pull request?
This PR adds support for reading sorted bucketed tables efficiently in the data source v1 read path. Previously, in `FileSourceScanExec` -> `FileScanRDD`, input files were read sequentially, one by one. For a sorted bucketed table, which can have multiple sorted files per bucket, the per-bucket sort ordering is not preserved when the sorted files are read one after another. This PR adds support for reading all sorted files of a bucket together in a sort-merge fashion, so that the ordering of each bucket is preserved.
Specifically, the code changes are:
* Add a `ScanMode` parameter to `FileScanRDD`, which can be `RowMode` (read row by row, non-vectorized), `BatchMode` (read batch by batch, vectorized), or `SortedBucketMode` (read rows in a sort-merge fashion, for the sorted bucketed file case). `FileScanRDD.compute()` decides which iterator to use based on this `ScanMode`.
* Extract the existing iterator logic in `FileScanRDD` into `BaseFileScanIterator`, which holds the common file scan logic. `BaseFileScanIterator` has three subclasses: `FileRowScanIterator` reads row by row for each file (`RowMode`), `FileBatchScanIterator` reads batch by batch for each file (`BatchMode`), and `FileSortedBucketScanIterator` reads rows in sorted order across multiple files (`SortedBucketMode`).
* `FileSortedBucketScanIterator` uses a priority queue to output rows from multiple sorted files, ordered by the table's sort columns (a standalone sketch of this merge idea follows below).
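
For illustration only, here is a minimal, self-contained Scala sketch of the sort-merge idea behind `FileSortedBucketScanIterator`. The object and method names below are made up, and this is not the PR's actual implementation: each per-file iterator stays internally sorted, and a priority queue repeatedly emits the smallest remaining head element across the files.

```scala
import scala.collection.mutable

object SortedBucketMergeSketch {

  /** Merge already-sorted iterators into one sorted iterator; `ord` compares on the sort key. */
  def mergeSortedFileIterators[T](
      fileIterators: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] =
    new Iterator[T] {
      // Each queue entry is (current head element, the iterator it came from).
      // PriorityQueue dequeues the maximum, so reverse the ordering to get a min-heap.
      private val queue = mutable.PriorityQueue.empty[(T, Iterator[T])](
        Ordering.by[(T, Iterator[T]), T](_._1)(ord.reverse))

      fileIterators.foreach { it =>
        if (it.hasNext) queue.enqueue((it.next(), it))
      }

      override def hasNext: Boolean = queue.nonEmpty

      override def next(): T = {
        val (head, it) = queue.dequeue()
        // Refill from the same file so its next (still sorted) element
        // competes with the heads of the other files.
        if (it.hasNext) queue.enqueue((it.next(), it))
        head
      }
    }

  def main(args: Array[String]): Unit = {
    // Three "files" in one bucket, each sorted ascending on the sort column.
    val files = Seq(Iterator(1, 4, 7), Iterator(2, 5, 8), Iterator(3, 6, 9))
    // Prints 1, 2, 3, 4, 5, 6, 7, 8, 9 -- the per-bucket ordering is preserved.
    println(mergeSortedFileIterators(files).mkString(", "))
  }
}
```

In the actual PR the comparison would be on the table's sort columns of each row; this sketch simply orders whole elements with `Ordering[T]`.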
The PR also separates the logic for the existing row scan and batch scan, so we no longer need the per-element `if (nextElement.isInstanceOf[ColumnarBatch])` check at line 100 of `FileScanRDD`, which was a long-standing TODO.
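
To make that TODO concrete, here is a simplified, hypothetical Scala sketch (not the actual `FileScanRDD` code; `Row` and `Batch` stand in for `InternalRow` and `ColumnarBatch`) of the per-element type dispatch that goes away once the scan mode is fixed up front:

```scala
// Hypothetical sketch only, not Spark code.
object ScanDispatchSketch {
  final case class Row(value: Int)
  final case class Batch(rows: Array[Row])

  // Before: a single iterator yields Any (either a row or a batch), so every
  // element pays a runtime type check and a cast.
  def numOutputRowsMixed(elements: Iterator[Any]): Long =
    elements.map { nextElement =>
      if (nextElement.isInstanceOf[Batch]) {
        nextElement.asInstanceOf[Batch].rows.length.toLong
      } else {
        1L
      }
    }.sum

  // After: the mode is chosen once (e.g. in compute()), and each iterator
  // produces exactly one element kind, so no per-element dispatch is needed.
  def numOutputRows(rows: Iterator[Row]): Long = rows.size.toLong
  def numOutputRowsBatched(batches: Iterator[Batch]): Long =
    batches.map(_.rows.length.toLong).sum
}
```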
The whole feature is controlled by a new config, `spark.sql.sources.bucketing.sortedScan.enabled`, which is disabled by default, as there is a risk of reading more data per task and causing OOM (especially with vectorization, where one batch from every file must be kept in memory at the same time). In addition, `InputFileBlockHolder` needs to be set for each row (as the next output row can come from any file), which can be costly.
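
As a usage sketch (assuming this PR's proposed config and behavior; table and column names are made up for illustration), the feature would be exercised roughly like this:

```scala
import org.apache.spark.sql.SparkSession

// Write two bucketed+sorted tables, enable the proposed config, and join on the
// bucket/sort columns so the scan can preserve per-bucket ordering and the
// planner can drop the Sort before the sort-merge join.
object SortedBucketScanExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("sorted-bucket-scan-example").getOrCreate()
    import spark.implicits._

    // Proposed config from this PR; disabled by default.
    spark.conf.set("spark.sql.sources.bucketing.sortedScan.enabled", "true")
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // force sort-merge join

    val df = (0 until 1000).map(i => (i % 13, i % 7, s"v$i")).toDF("i", "j", "k")
    df.write.bucketBy(8, "i", "j").sortBy("i", "j")
      .mode("overwrite").saveAsTable("bucketed_table1")
    df.write.bucketBy(8, "i", "j").sortBy("i", "j")
      .mode("overwrite").saveAsTable("bucketed_table2")

    val joined = spark.table("bucketed_table1")
      .join(spark.table("bucketed_table2"), Seq("i", "j"))

    // With the config on, the plan should show ScanMode: SortedBucketMode
    // and no Sort nodes under the SortMergeJoin.
    joined.explain()
    joined.count()
    spark.stop()
  }
}
```

With the config off (the default), plans and behavior are unchanged.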
### Why are the changes needed?
Right now, for a sorted bucketed table, even though each individual file is sorted, the current file scan approach does not preserve the sort ordering across files within a bucket, so a sort is still needed before a sort-merge join or sort aggregate. If the table is large, an external sort can kick in and cause extra CPU and IO overhead. This PR introduces a code path that reads multiple sorted files in a sort-merge fashion, avoiding the later sort before the join or aggregate and saving CPU and IO resources.
In our internal fork, we added similar support
(https://www.youtube.com/watch?v=brzInUisshY&feature=youtu.be&t=910), though
the code path there is different (we read/write Hive tables).
### Does this PR introduce _any_ user-facing change?
Yes. A new user-facing config, `spark.sql.sources.bucketing.sortedScan.enabled`, is introduced to allow users to read sorted bucketed tables efficiently (e.g. no sort before a sort-merge join / sort aggregate on bucketed columns). In addition, the query plan output of the `FileSourceScanExec` physical operator changes slightly: instead of printing `Batched`, it now prints `ScanMode`.
Example query plan with `ScanMode` in `FileScan`, and no `Sort` before
`SortMergeJoin`:
```
SortMergeJoin [i#38, j#39], [i#44, j#45], Inner
:- Filter (isnotnull(i#38) AND isnotnull(j#39))
:  +- FileScan parquet default.bucketed_table1[i#38,j#39,k#40] DataFilters: [isnotnull(i#38), isnotnull(j#39)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.sources..., PartitionFilters: [], PushedFilters: [IsNotNull(i), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>, ScanMode: SortedBucketMode, SelectedBucketsCount: 8 out of 8
+- Filter (isnotnull(i#44) AND isnotnull(j#45))
   +- FileScan parquet default.bucketed_table2[i#44,j#45,k#46] DataFilters: [isnotnull(i#44), isnotnull(j#45)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.sources..., PartitionFilters: [], PushedFilters: [IsNotNull(i), IsNotNull(j)], ReadSchema: struct<i:int,j:int,k:string>, ScanMode: SortedBucketMode, SelectedBucketsCount: 8 out of 8
```
### How was this patch tested?
Added unit tests in `BucketedReadSuite.scala` and `ExplainSuite.scala`.