GitHub user liancheng opened a pull request:
https://github.com/apache/spark/pull/8509
[SPARK-10301] [SQL] Fixes schema merging for nested structs
This PR can be quite challenging to review. I'm trying to give a detailed
description of the problem as well as its solution here.
When reading Parquet files, we need to specify a potentially nested Parquet
schema (of type `MessageType`) as the requested schema for column pruning. This
Parquet schema is translated from a Catalyst schema (of type `StructType`),
which is generated by the query planner and represents all requested columns.
However, this translation can be fairly complicated, for several reasons:
1. The requested schema must conform to the real schema of the physical file
to be read.
This means we have to tailor the actual file schema of every individual
physical Parquet file to be read according to the given Catalyst schema.
Fortunately, we are already doing this in Spark 1.5 by pushing requested schema
conversion to the executor side in PR #7231.
1. Support for schema merging.
A single Parquet dataset may consist of multiple physical Parquet files that
come with different but compatible schemas. This means we may request a
column path that doesn't exist in a physical Parquet file. All requested
column paths can be nested. For example, for a Parquet file schema
```
message root {
  required group f0 {
    required group f00 {
      required int32 f000;
      required binary f001 (UTF8);
    }
  }
}
```
we may request column paths defined in the following schema:
```
message root {
  required group f0 {
    required group f00 {
      required binary f001 (UTF8);
      required float f002;
    }
  }
  optional double f1;
}
```
Notice that we pruned column path `f0.f00.f000`, but added
`f0.f00.f002` and `f1`.
The good news is that Parquet handles non-existent column paths
properly and always returns nulls for them (see the sketch after this list).
1. The mapping from `StructType` to `MessageType` is one-to-many.
This is the most unfortunate part.
Due to historical reasons (dark histories!), schemas of Parquet files
generated by different libraries have different "flavors". For example, to
handle a schema with a single non-nullable column, whose type is an array of
non-nullable integers, parquet-protobuf generates the following Parquet schema:
```
message m0 {
  repeated int32 f;
}
```
```
while parquet-avro generates another version:
```
message m1 {
  required group f (LIST) {
    repeated int32 array;
  }
}
```
```
and parquet-thrift emits yet another:
```
message m2 {
  required group f (LIST) {
    repeated int32 f_tuple;
  }
}
```
```
All of them can be mapped to the following _unique_ Catalyst schema:
```
StructType(Seq(
  StructField(
    "f",
    ArrayType(IntegerType, containsNull = false),
    nullable = false)))
```
This greatly complicates Parquet requested schema construction, since
the path of a given column varies in different cases. To read the array
elements from files with the above schemas, we must use `f` for `m0`, `f.array`
for `m1`, and `f.f_tuple` for `m2`.
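To make the schema merging behavior in point 2 concrete, here is a minimal
spark-shell sketch (the paths and column names are made up for illustration):
two physical files carry different but compatible schemas, and requesting a
column path that one file lacks simply yields nulls for that file's rows.
```scala
import sqlContext.implicits._

// Write two physical Parquet files with different but compatible schemas.
Seq((1, "a"), (2, "b")).toDF("f000", "f001")
  .write.parquet("/tmp/merge-demo/part=1")
Seq(("c", 1.0), ("d", 2.0)).toDF("f001", "f002")
  .write.parquet("/tmp/merge-demo/part=2")

// With schema merging enabled, the merged schema contains f000, f001, and
// f002; column paths missing from a physical file come back as null.
val df = sqlContext.read.option("mergeSchema", "true").parquet("/tmp/merge-demo")
df.printSchema()
df.select("f000", "f002").show()
```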
In earlier Spark versions, we didn't try to fix this issue properly. Spark
1.4 and prior versions simply translate the Catalyst schema in a way that is
more or less compatible with parquet-hive and parquet-avro, but breaks in many
other cases. Earlier revisions of Spark 1.5 only try to tailor the Parquet file
schema at the first level, and ignore nested ones. This caused
[SPARK-10301][spark-10301] as well as [SPARK-10005][spark-10005]. In PR #8228,
I tried to avoid the hard part of the problem and made a minimal change in
`CatalystRowConverter` to fix SPARK-10005. However, when taking SPARK-10301
into consideration, continuing to hack `CatalystRowConverter` doesn't seem to
be a good idea. So this PR is an attempt to fix the problem in a proper way.
For a given physical Parquet file with schema `ps` and a compatible
Catalyst requested schema `cs`, we use the following algorithm to tailor `ps`
into the resulting Parquet requested schema `ps'`.
For each leaf column path `c` in `cs`:
- if a corresponding Parquet column path `c'` can be found in `ps`, `c'`
should be included in `ps'`;
- otherwise, we convert `c` to a Parquet column path `c"` using
`CatalystSchemaConverter`, and include `c"` in `ps'`;
- no other column paths should exist in `ps'`.
Then comes the most tedious part:
> Given `cs`, `ps`, and `c`, how do we locate `c'` in `ps`?
Unfortunately, there's no quick answer, and we have to enumerate all
possible structures defined in the parquet-format spec. They are:
1. the standard structure of nested types, and
1. cases defined in all backwards-compatibility rules for `LIST` and `MAP`.
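For reference, the divergence these rules cover looks like this. The
parquet-format spec standardizes a three-level structure for `LIST`, while
legacy writers used two-level forms such as `m1` and `m2` above (the message
names below are illustrative):
```
message standard_list {
  required group f (LIST) {
    repeated group list {
      required int32 element;
    }
  }
}

message legacy_list {
  required group f (LIST) {
    repeated int32 array;
  }
}
```
Locating `c'` therefore means recognizing each of these shapes: the path of
the array element is `f.list.element` in the standard form, but `f.array` in
the legacy one.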
The core part of this PR is `CatalystReadSupport.clipParquetType()`, which
tailors a given Parquet file schema according to a requested schema in its
Catalyst form. Backwards-compatibility rules of `LIST` and `MAP` are covered
in `clipParquetListType()` and `clipParquetMapType()` respectively. The column
path selection algorithm is implemented in `clipParquetRecord()`.
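To illustrate the shape of this algorithm, here is a toy Scala sketch, not
the actual implementation: it works on simplified schema trees instead of
Parquet's `MessageType` and Catalyst's `StructType`, and `fromCatalyst` merely
stands in for what `CatalystSchemaConverter` does for column paths missing
from the file schema.
```scala
// Toy models of a Parquet schema tree and a Catalyst schema tree.
sealed trait PType { def name: String }
case class PPrimitive(name: String) extends PType
case class PGroup(name: String, fields: Seq[PType]) extends PType

sealed trait CType { def name: String }
case class CPrimitive(name: String) extends CType
case class CStruct(name: String, fields: Seq[CType]) extends CType

// Stand-in for CatalystSchemaConverter: converts a requested Catalyst type
// into its standard Parquet form when the file schema has no match for it.
def fromCatalyst(c: CType): PType = c match {
  case CPrimitive(n)  => PPrimitive(n)
  case CStruct(n, fs) => PGroup(n, fs.map(fromCatalyst))
}

// Clips the physical schema `ps` against the requested schema `cs`: requested
// fields present in `ps` are clipped recursively, missing ones are converted
// from their Catalyst form, and fields that are not requested are pruned.
def clip(ps: PType, cs: CType): PType = (ps, cs) match {
  case (PGroup(n, pFields), CStruct(_, cFields)) =>
    PGroup(n, cFields.map { cf =>
      pFields.find(pf => pf.name == cf.name) match {
        case Some(pf) => clip(pf, cf)     // c' found in ps: keep it, clipped
        case None     => fromCatalyst(cf) // c missing from ps: convert to c"
      }
    })
  case _ => ps // leaf: keep the physical type, which carries the file's flavor
}

// The example from point 2: the file has f0.f00.{f000, f001}, while we
// request f0.f00.{f001, f002} and f1.
val fileSchema = PGroup("root", Seq(
  PGroup("f0", Seq(
    PGroup("f00", Seq(PPrimitive("f000"), PPrimitive("f001")))))))
val requested = CStruct("root", Seq(
  CStruct("f0", Seq(
    CStruct("f00", Seq(CPrimitive("f001"), CPrimitive("f002"))))),
  CPrimitive("f1")))

// f0.f00.f000 is pruned; f0.f00.f002 and f1 are added from the Catalyst side.
println(clip(fileSchema, requested))
```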
[spark-10301]: https://issues.apache.org/jira/browse/SPARK-10301
[spark-10005]: https://issues.apache.org/jira/browse/SPARK-10005
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/liancheng/spark
spark-10301/fix-parquet-requested-schema
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/8509.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #8509
----
commit 8e608995a35be616bd6956aef1e8528bc8017e32
Author: Cheng Lian <[email protected]>
Date: 2015-08-27T18:21:20Z
Clips Parquet requested schema for better compatibility
commit a53cf346e694fbf5bb9efba7acfa08ae8f0c91ee
Author: Cheng Lian <[email protected]>
Date: 2015-08-28T13:53:20Z
Fixes test failures
----