GitHub user mallman opened a pull request:
https://github.com/apache/spark/pull/16578
[SPARK-4502][SQL] Parquet nested column pruning
(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)
## What changes were proposed in this pull request?
One of the hallmarks of a column-oriented data storage format is the
ability to read data from a subset of columns, efficiently skipping reads from
other columns. Spark has long had support for pruning unneeded top-level schema
fields from the scan of a parquet file. For example, consider a table,
`contacts`, backed by parquet with the following Spark SQL schema:
```
root
|-- name: struct
| |-- first: string
| |-- last: string
|-- address: string
```
Parquet stores this table's data in three physical columns: `name.first`,
`name.last` and `address`. To answer the query
```SQL
select address from contacts
```
Spark will read only from the `address` column of parquet data. However, to
answer the query
```SQL
select name.first from contacts
```
Spark will read `name.first` and `name.last` from parquet.
This PR modifies Spark SQL to support finer-grained schema pruning. With
this patch, Spark reads only the `name.first` column to answer the previous
query.
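As a concrete check, here is a minimal sketch of how one might verify the pruned read schema from the physical plan; the parquet path and session setup are illustrative assumptions, not part of this PR:
```scala
import org.apache.spark.sql.SparkSession

// Sketch only: the parquet path is hypothetical.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("nested-pruning-check")
  .getOrCreate()

spark.read.parquet("/tmp/contacts").createOrReplaceTempView("contacts")

// With this patch, the ReadSchema shown in the physical plan should mention
// only name.first rather than the full name struct.
spark.sql("select name.first from contacts").explain()
```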
### Implementation
There are three main components of this patch. First, a
`ParquetSchemaPruning` optimizer rule gathers the required schema fields
of a `PhysicalOperation` over a parquet file, constructs a new schema based
on those required fields, and rewrites the plan in terms of that pruned schema.
The pruned schema fields are pushed down to the parquet requested read schema.
`ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for
rewriting a catalyst expression in terms of a pruned schema.
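As a rough sketch of the rule's overall shape (the object name is hypothetical, the match uses Spark 2.1-era internal APIs, and the body is a placeholder rather than the actual implementation):
```scala
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

// Shape of the rule only; the real ParquetSchemaPruning computes the pruned
// schema and rewrites the plan rather than returning it unchanged.
object ParquetSchemaPruningSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case op @ PhysicalOperation(projects, filters,
        LogicalRelation(fsRelation: HadoopFsRelation, _, _)) =>
      // 1. Collect the (possibly nested) fields referenced by projects/filters.
      // 2. Build a pruned StructType containing only those fields.
      // 3. Rebuild the relation with the pruned data schema and rewrite
      //    projects/filters against it (via ProjectionOverSchema).
      op // placeholder: leave the plan unchanged in this sketch
  }
}
```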
Second, the `ParquetRowConverter` has been patched to ensure the ordinals
of the parquet columns read are correct for the pruned schema.
`ParquetReadSupport` has been patched to address a compatibility mismatch
between Spark's built-in vectorized reader and the parquet-mr library's reader.
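To see why the ordinals matter, consider the `contacts` schema from the example above. Pruning removes fields, so a converter's field positions must follow the pruned layout rather than the full schema; a small sketch:
```scala
import org.apache.spark.sql.types._

// The full contacts schema from the example above.
val fullSchema = StructType(Seq(
  StructField("name", StructType(Seq(
    StructField("first", StringType),
    StructField("last", StringType)))),
  StructField("address", StringType)))

// Pruned for `select name.first`: name.last and address are gone, so field
// ordinals must be computed against this pruned layout, not the full schema.
val prunedSchema = StructType(Seq(
  StructField("name", StructType(Seq(
    StructField("first", StringType))))))
```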
Third, we introduce two new catalyst query transformations,
`AggregateFieldExtractionPushdown` and `JoinFieldExtractionPushdown`, to
support schema pruning in aggregation and join query plans. These rules extract
field references in aggregations and joins, respectively, push down aliases to
those references, and replace the original references with references to the
pushed-down aliases.
They use a new `SelectedField` extractor that transforms a catalyst complex
type extractor (the "selected field") into a corresponding `StructField`.
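To illustrate the idea behind `SelectedField`, a simplified extractor covering only the most basic case (a direct struct-field access such as `name.first`) might look like the following; the name `SimpleSelectedField` is hypothetical, and the real extractor handles much more:
```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField}
import org.apache.spark.sql.types.{StructField, StructType}

// Simplified illustration: handle only a direct struct-field access such as
// `name.first`. The real SelectedField extractor also handles arrays, maps,
// and deeper nesting.
object SimpleSelectedField {
  def unapply(expr: Expression): Option[StructField] = expr match {
    case g: GetStructField =>
      g.child.dataType match {
        case s: StructType => Some(s.fields(g.ordinal))
        case _             => None
      }
    case _ => None
  }
}
```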
### Performance
The performance difference in executing queries with this patch compared to
master depends on the depth of the table schema and on the query itself. At
VideoAmp, one of our biggest tables stores OpenRTB bid requests we receive from
our exchange partners. Our bid request table's schema closely follows the
OpenRTB bid request object schema. Additionally, when we bid we save our
response along with the request in the same table. We store these two objects
as two top-level fields in our table. Therefore, all bid request and response
data are contained within nested fields.
For the purposes of measuring the performance impact of this patch, we ran
some queries on our bid request table with the un-patched and patched master.
We measured query execution time and the amount of data read from the
underlying parquet files. I'll focus on a couple of benchmarks. (All benchmarks
were run on an AWS EC2 cluster with four c3.8xl workers.) The first query I'll
highlight is
```SQL
select count(request.device.ip) from event.bid_request where ds=20161128
and h=0
```
(Hopefully it's obvious what this query means.) On the un-patched master,
this query ran in 2.7 minutes and read 34.3 GB of data. On the patched master,
this query ran in 4 seconds and read 987.3 MB of data.
We also ran a reporting-oriented query benchmark. I won't reproduce the
query here, but it reads a larger subset of the bid request fields and joins
against another table with a deeply nested schema. In addition to a join, we
perform several aggregations in this query. On the un-patched master, this
query ran in 3.4 minutes and read 34.6 GB of data. On the patched master, this
query ran in 59 seconds and read 2.6 GB of data.
### Limitation
Among the complex Spark SQL data types, this patch supports parquet column
pruning only for nested sequences of struct fields.
## How was this patch tested?
Care has been taken to ensure correctness and prevent regressions. This
patch introduces over two dozen new unit tests and has been running on a
production Spark 1.5 cluster at VideoAmp for about a year. In that time, one
bug was found and fixed early on, and we added a regression test for that bug.
We forward-ported this patch to Spark master in June 2016 and have been
running it against the Spark 2.0 and 2.1 branches on ad hoc clusters since
then.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/VideoAmp/spark-public spark-4502-parquet_column_pruning
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/16578.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #16578
----
commit 89253722bde65b02467d05f2b64c4fdbef3b885d
Author: Michael Allman <[email protected]>
Date: 2016-06-24T17:21:24Z
[SPARK-4502][SQL] Parquet nested column pruning
----