GitHub user mallman opened a pull request:
https://github.com/apache/spark/pull/14690
[SPARK-16980][SQL] Load only catalog table partition metadata required to
answer a query
(This PR addresses https://issues.apache.org/jira/browse/SPARK-16980.)
(N.B. I'm submitting this PR as an enhanced version of an internal POC I
wrote. I'm looking for preliminary feedback on what I have so far and hoping
to discuss some design and implementation issues. This PR is not currently a
candidate for merging into master.)
(N.B. This PR is known to fail several unit tests related to Hive/Parquet
conversion. Obviously, these failures will be addressed before this PR is
submitted for merging into master.)
## What changes were proposed in this pull request?
In a new Spark application, when a partitioned Hive table is converted to
use Spark's `HadoopFsRelation` in `HiveMetastoreCatalog`, metadata for every
partition of that table is retrieved from the metastore and loaded into driver
memory. In addition, every partition's metadata files are read from the
filesystem to perform schema inference.
If a user queries such a table with predicates that prune its partitions, we
would like to answer that query without consulting metadata for partitions not
involved in the query. When querying a table with a large number of partitions
for data from only a few partitions (maybe even a single partition), the
current conversion strategy is highly inefficient. I suspect this scenario is
not uncommon in the wild.
In addition to being inefficient in running time, the current strategy is
inefficient in its use of driver memory. When the total number of partitions
across all tables loaded in a driver reaches a certain level (somewhere in the
tens of thousands), their cached metadata exhausts all driver heap memory in
the default configuration. I suspect this scenario is less common (in that not
many deployments work with tables having tens of thousands of partitions);
however, it does illustrate how large the memory footprint of this metadata
can be. Even for tables with hundreds or thousands of partitions, I would
expect the `HiveMetastoreCatalog` table cache to represent a significant
portion of the driver's heap space.
This PR proposes an alternative approach. Basically, it makes three changes:
1. It adds a new method, `listPartitionsByFilter`, to the Catalyst
`ExternalCatalog` trait which returns the partition metadata for a given
sequence of partition pruning predicates (a rough sketch follows this list).
1. It refactors the `FileCatalog` type hierarchy to include a new
`TableFileCatalog` to efficiently return files only for partitions matching a
sequence of partition pruning predicates.
1. It removes partition loading and caching from `HiveMetastoreCatalog`.
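
For concreteness, here is a minimal sketch of the new catalog method,
assuming the pruning predicates are Catalyst `Expression`s over partition
columns; the exact signature in the patch may differ:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
import org.apache.spark.sql.catalyst.expressions.Expression

trait ExternalCatalog {
  // ... existing methods elided ...

  /**
   * Returns metadata for only those partitions of `db`.`table` that satisfy
   * the given partition pruning predicates. The predicates must reference
   * partition columns only.
   */
  def listPartitionsByFilter(
      db: String,
      table: String,
      predicates: Seq[Expression]): Seq[CatalogTablePartition]
}
```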
The net effect is that when a query over a partitioned Hive table is
planned, the analyzer retrieves the table metadata from `HiveMetastoreCatalog`.
As part of this operation, the `HiveMetastoreCatalog` builds a
`HadoopFsRelation` with a `TableFileCatalog`. It does not load any partition
metadata or scan any files. The physical planner identifies the data files the
query needs by asking the relation's `TableFileCatalog` for the files matching
the query's partition pruning predicates. The `TableFileCatalog` in turn calls the
`listPartitionsByFilter` method on its external catalog. This queries the Hive
metastore, passing along those filters.
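
The delegation described above might look roughly like this; the class shape
follows the PR description but is illustrative only, not the patch's actual
interface:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
import org.apache.spark.sql.catalyst.expressions.Expression

// Illustrative only: a file catalog that defers partition enumeration to the
// external catalog, so no partition metadata is loaded until a query asks.
class TableFileCatalog(catalog: ExternalCatalog, db: String, table: String) {

  /** Lists the root directories of only the partitions matching the filters. */
  def listDataDirectories(partitionFilters: Seq[Expression]): Seq[Path] = {
    catalog
      .listPartitionsByFilter(db, table, partitionFilters)
      .flatMap(_.storage.locationUri) // one location per matching partition
      .map(new Path(_))
  }
}
```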
## Open Issues
1. This PR omits partition metadata caching. I'm not sure if this is even
needed if we're only loading partition metadata for a given query. However, it
may not be that tricky to implement this effectively.
1. This PR removes and omits partitioned Hive table schema reconciliation.
As a result, it fails to find Parquet schema columns with upper-case letters
because of the Hive metastore's case-insensitivity. I think this is the most
significant hurdle for this PR. It just occurred to me that we might be able
to do just-in-time schema reconciliation using only the partitions involved in
a query. I haven't tried this, but I would attempt it by adding a method to
`HadoopFsRelation` or `BasicFileCatalog` which returns a SQL schema for a
given sequence of partition pruning predicates (or partitions); a rough sketch
of this idea follows this list. I'll give it a try and report back. Another
idea would be to keep the current strategy of merging schemas from all table
files unless the user sets a boolean SQL configuration parameter like
`spark.sql.assumeLowerCaseColumnNames`. If the user's tables have only
lower-case column names, then it's safe to use this PR's optimizations. I
don't think this is an entirely unrealistic scenario, as we have enforced all
lower-case column names from the beginning because of case-sensitivity issues.
Maybe we're not the only ones?
1. This PR omits an implementation of `listPartitionsByFilter` for the
`InMemoryCatalog`.
1. Should we use `TableFileCatalog` for non-partitioned tables, too (to
unify the partitioned and non-partitioned table access code paths)? This is
probably something to explore in a future PR.
1. This PR breaks Parquet log output redirection. I can work around this by
running
`Class.forName("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$")`
first thing in a Spark shell session, but we need to figure out how to fix
this properly.
1. This PR includes no new unit tests. We should probably have some. :)
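
To make the just-in-time schema reconciliation idea from the second item above
concrete, here is a rough sketch. `filesFor` is a hypothetical helper mapping
pruning predicates to the matching partitions' files, and I'm assuming
`ParquetFileFormat.mergeSchemasInParallel` keeps its current signature:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType

// Hypothetical: recover the case-sensitive schema by merging Parquet schemas
// from only the pruned partitions' files, instead of every file in the table.
def schemaFor(
    predicates: Seq[Expression],
    filesFor: Seq[Expression] => Seq[FileStatus], // assumed helper, not in the PR
    sparkSession: SparkSession): Option[StructType] = {
  ParquetFileFormat.mergeSchemasInParallel(filesFor(predicates), sparkSession)
}
```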
## How was this patch tested?
The current Spark unit tests were run, and some ad-hoc tests were performed
to validate that only the necessary partition metadata is loaded.
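
For illustration, an ad-hoc check of this kind might look like the following
in a Spark shell; the table and partition here are made up:

```scala
// With this PR, planning this query should fetch metadata for only the
// ds = '2016-08-16' partition, which can be confirmed by watching the Hive
// metastore client calls while it runs.
spark.sql("SELECT count(*) FROM events WHERE ds = '2016-08-16'").show()
```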
N.B. Several tests in the hive subproject's `ParquetMetastoreSuite` fail, all
because of upper-case characters in the test Parquet tables' column names.
You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/VideoAmp/spark-public spark-16980-lazy_partition_fetching
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/14690.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #14690