GitHub user marmbrus opened a pull request:
https://github.com/apache/spark/pull/11509
[SPARK-13665][SQL] Separate the concerns of HadoopFsRelation
`HadoopFsRelation` is used for reading most files into Spark SQL. However
today this class mixes the concerns of file management, schema reconciliation,
scan building, bucketing, partitioning, and writing data. As a result, many
data sources are forced to reimplement the same functionality and the various
layers have accumulated a fair bit of inefficiency. This PR is a first cut at
separating this into several components / interfaces that are each described
below. Additionally, all implementations inside Spark have been ported to
the new API `FileFormat`. Note that external libraries, such as spark-avro,
will also need to be ported to work with Spark 2.0.
### HadoopFsRelation
A simple case class that acts as a container for all of the metadata
required to read from a datasource. All discovery, resolution and merging
logic for schemas and partitions has been removed. This is an internal
representation that no longer needs to be exposed to developers.
```scala
case class HadoopFsRelation(
    sqlContext: SQLContext,
    location: FileCatalog,
    partitionSchema: StructType,
    dataSchema: StructType,
    bucketSpec: Option[BucketSpec],
    fileFormat: FileFormat,
    options: Map[String, String])
```
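For illustration only, here is a minimal sketch of how this container might be assembled once the resolution work has already happened elsewhere; the schemas, option values, and the `describeRelation` helper are hypothetical and not part of this PR:
```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

// Hypothetical assembly: by the time this case class is constructed, a
// FileCatalog has been built, schemas have been reconciled, and a FileFormat
// has been chosen. HadoopFsRelation just carries the results.
def describeRelation(
    sqlContext: SQLContext,
    location: FileCatalog,
    format: FileFormat): HadoopFsRelation = {
  HadoopFsRelation(
    sqlContext = sqlContext,
    location = location,
    partitionSchema = StructType(StructField("date", StringType) :: Nil),
    dataSchema = StructType(
      StructField("id", LongType) :: StructField("value", DoubleType) :: Nil),
    bucketSpec = None,
    fileFormat = format,
    options = Map("mergeSchema" -> "false"))
}
```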
### FileFormat
The primary interface that will be implemented by each different format
including external libraries. Implementors are responsible for reading a given
format and converting it into `InternalRow` as well as writing out rows.
Optionally, a format can also return a schema that is inferred from a set of
files when possible.
```scala
trait FileFormat {
  def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory

  def buildInternalScan(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredColumns: Array[String],
      filters: Array[Filter],
      bucketSet: Option[BitSet],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableConfiguration],
      options: Map[String, String]): RDD[InternalRow]
}
```
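As a rough sketch of what an external library would implement, the skeleton below shows a hypothetical format wired up against this trait. The class name is made up, the method bodies are placeholders, and the import paths are approximate as of this change:
```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection.BitSet

// Hypothetical third-party format plugging into the new API.
class ExampleFormat extends FileFormat {

  // Inspect a sample of the files and derive a schema, or return None if the
  // format cannot infer one (e.g. a schemaless text format).
  override def inferSchema(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] =
    files.headOption.map(_ => StructType(Nil)) // placeholder inference

  // Configure the Hadoop Job for writing and return a factory that creates
  // one OutputWriter per task.
  override def prepareWrite(
      sqlContext: SQLContext,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory =
    ??? // set committer / codec on `job`, then return the writer factory

  // Produce the scan: a single RDD over all input files, projected down to
  // `requiredColumns` and, where possible, filtered by `filters`.
  override def buildInternalScan(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredColumns: Array[String],
      filters: Array[Filter],
      bucketSet: Option[BitSet],
      inputFiles: Array[FileStatus],
      broadcastedConf: Broadcast[SerializableConfiguration],
      options: Map[String, String]): RDD[InternalRow] =
    ??? // e.g. build a Hadoop RDD over `inputFiles` and map records to InternalRow
}
```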
The current interface is based on what was required to get all the tests
passing again, but still mixes a couple of concerns (e.g. `bucketSet` is passed
down to the scan instead of being resolved by the planner). Additionally,
scans are still returning `RDD`s instead of iterators for single files. In a
future PR, bucketing should be removed from this interface and the scan should
be isolated to a single file.
### FileCatalog
This interface is used to list the files that make up a given relation, as
well as to handle directory-based partitioning.
```scala
trait FileCatalog {
  def paths: Seq[Path]
  def partitionSpec(schema: Option[StructType]): PartitionSpec
  def allFiles(): Seq[FileStatus]
  def getStatus(path: Path): Array[FileStatus]
  def refresh(): Unit
}
```
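To make the contract concrete, here is a hedged sketch of a trivial implementation over a fixed, unpartitioned set of files; the class is hypothetical and is not one of the implementations in this PR:
```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.types.StructType

// Hypothetical: a catalog over a static, unpartitioned list of files,
// e.g. for testing a FileFormat in isolation.
class StaticFileCatalog(files: Seq[FileStatus]) extends FileCatalog {

  override def paths: Seq[Path] = files.map(_.getPath).distinct

  // No directory-based partitioning, so the spec has no columns or partitions.
  override def partitionSpec(schema: Option[StructType]): PartitionSpec =
    PartitionSpec(StructType(Nil), Seq.empty)

  override def allFiles(): Seq[FileStatus] = files

  // Return the files that live directly under the given directory.
  override def getStatus(path: Path): Array[FileStatus] =
    files.filter(_.getPath.getParent == path).toArray

  // Nothing is cached, so there is nothing to refresh.
  override def refresh(): Unit = ()
}
```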
Currently there are two implementations:
- `HDFSFileCatalog` - based on code from the old `HadoopFsRelation`.
Infers partitioning by recursively listing directories and caches this data for performance.
- `HiveFileCatalog` - based on the above, but uses the partition spec
from the Hive Metastore.
### ResolvedDataSource
Produces a logical plan given the following description of a Data Source
(which can come from `DataFrameReader` or a metastore):
- `paths: Seq[String] = Nil`
- `userSpecifiedSchema: Option[StructType] = None`
- `partitionColumns: Array[String] = Array.empty`
- `bucketSpec: Option[BucketSpec] = None`
- `provider: String`
- `options: Map[String, String]`
This class is responsible for deciding which of the Data Source APIs a
given provider is using (including the non-file based ones). All
reconciliation of partitions, buckets, schema from metastores or inference is
done here.
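For example, a simple read through `DataFrameReader` maps onto this description roughly as follows (path and option values are illustrative):
```scala
// provider            = "parquet"
// paths               = Seq("/data/events")
// userSpecifiedSchema = None
// partitionColumns    = Array.empty
// bucketSpec          = None
// options             = Map("mergeSchema" -> "true")
val df = sqlContext.read
  .format("parquet")
  .option("mergeSchema", "true")
  .load("/data/events")
```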
### DataSourceAnalysis / DataSourceStrategy
Responsible for analyzing and planning reading/writing of data using any of
the Data Source APIs. This includes concerns such as:
- pruning the partitions that will be read based on filters.
- appending partition columns.
- applying additional filters when a data source cannot evaluate them
internally (see the filter-splitting sketch below).
- constructing an RDD that is bucketed correctly when required.
- sanity checking schema match-up and other analysis when writing.
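As one concrete illustration of the first and third points, the snippet below splits pushed-down filters into those that only touch partition columns (usable for pruning directories) and the residual filters that must still be applied to the rows a source returns. The helper is hypothetical, only handles a couple of filter shapes, and is not code from this PR:
```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

// Crude, illustrative check: a filter can prune partitions only if the
// attribute it references is a partition column.
def isPartitionFilter(f: Filter, partitionColumns: Set[String]): Boolean = f match {
  case EqualTo(attr, _)     => partitionColumns.contains(attr)
  case GreaterThan(attr, _) => partitionColumns.contains(attr)
  case _                    => false // conservatively treat everything else as residual
}

// With partition column "date":
//   EqualTo("date", "2016-03-03")  -> prunes which directories are listed
//   GreaterThan("clicks", 10)      -> applied as an extra filter on the scan output
val filters: Seq[Filter] = Seq(EqualTo("date", "2016-03-03"), GreaterThan("clicks", 10))
val (pruning, residual) = filters.partition(isPartitionFilter(_, Set("date")))
```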
In the future we should do the following:
- Break out file handling into its own Strategy as it is sufficiently
complex / isolated.
- Push the appending of partition columns down into `FileFormat` to avoid
an extra copy / unvectorization.
- Use a custom RDD for scans instead of `SQLNewNewHadoopRDD2`
TODO:
- [ ] Better initial Scaladoc on the interfaces
- [ ] Create follow-up JIRAs for the TODOs described above and in the code.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/marmbrus/spark fileDataSource
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/11509.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #11509
----
commit c2c2fcdf0d8b6d39797ce3610dcbc977d400c0d6
Author: Michael Armbrust <[email protected]>
Date: 2016-02-25T02:04:56Z
WIP
commit 4687a66eb427021ae11db598b7e4bf9126df436c
Author: Michael Armbrust <[email protected]>
Date: 2016-02-25T18:19:35Z
WIP
commit 0bf0d021dff3384a7845fdd65b5a9491fbb4a254
Author: Michael Armbrust <[email protected]>
Date: 2016-02-25T23:05:57Z
WIP: basic read/write workign
commit 1f35b90bc3d1b67c676ced335f410c540037c658
Author: Michael Armbrust <[email protected]>
Date: 2016-02-26T00:09:13Z
WIP: trying to get appending
commit 4bc04e3384af2b84f12f4459f4b5275ac1657e00
Author: Michael Armbrust <[email protected]>
Date: 2016-02-26T21:38:36Z
working on partitioning
commit a27b4a6bc9395c74b9507f36443102dc96c19218
Author: Michael Armbrust <[email protected]>
Date: 2016-02-26T22:39:14Z
WIP: many tests passing
commit 159e4c4a42558d7d1b84648800fcf021a1defa10
Author: Michael Armbrust <[email protected]>
Date: 2016-02-28T22:59:13Z
WIP: parquet/hive compiling
commit 72996601347cf417598c048d53618b840c29733e
Author: Michael Armbrust <[email protected]>
Date: 2016-02-29T18:09:07Z
:(
commit 049ac1bea8ce9c8562590d75b8819aa0e5bf3300
Author: Michael Armbrust <[email protected]>
Date: 2016-03-01T19:48:43Z
much of hive passing
commit 405f2841632c51f7f21e770587c39a34065c3e97
Author: Michael Armbrust <[email protected]>
Date: 2016-03-01T19:57:20Z
Merge remote-tracking branch 'apache/master' into fileDataSource
Conflicts:
sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
commit d28300b2a6b9a1f40651f77f69c0eb8716571cce
Author: Michael Armbrust <[email protected]>
Date: 2016-03-01T22:32:22Z
more progress
commit 6b136744354c3fc0c3756b74fd5a889c7cb95818
Author: Michael Armbrust <[email protected]>
Date: 2016-03-02T00:08:08Z
WIP
commit a975f2dbf748e03806ebd0e91e99e09d679a8a65
Author: Michael Armbrust <[email protected]>
Date: 2016-03-02T21:27:32Z
WIP: all but bucketing
commit 5275c41843a386ed109a9a8cfa058be78b946c51
Author: Michael Armbrust <[email protected]>
Date: 2016-03-03T02:51:19Z
Still workign on bucketing...
commit 0d4b08ab7219406647c65fd2f10591c23e9b2487
Author: Michael Armbrust <[email protected]>
Date: 2016-03-03T03:00:17Z
restore
commit 428a62fdc3e48d8f6ee063847c892008243fad54
Author: Michael Armbrust <[email protected]>
Date: 2016-03-03T03:00:48Z
remove
commit 1a41e151fe9e2f21c84291d1d51ca527737c5050
Author: Wenchen Fan <[email protected]>
Date: 2016-03-03T15:07:02Z
fix all tests
commit 2a49e8af147bdf3fb3f7970ce7a6060bf07b9782
Author: Michael Armbrust <[email protected]>
Date: 2016-03-03T19:09:51Z
Merge pull request #32 from cloud-fan/fileDataSource
fix all tests
commit 023f13342f7074656879ff940d0fce4a4250f2a5
Author: Michael Armbrust <[email protected]>
Date: 2016-03-03T22:28:33Z
Merge remote-tracking branch 'apache/master' into fileDataSource
Conflicts:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
commit 83fbb44fc57940711609f9a3eacd29222319e657
Author: Michael Armbrust <[email protected]>
Date: 2016-03-04T01:24:47Z
TESTS PASSING?!?
commit 175e78f2157ebc1da926ba37841348e7e16182d1
Author: Michael Armbrust <[email protected]>
Date: 2016-03-04T02:11:49Z
cleanup
commit 216078c681ffc054b9ca9f5295647b1d3c4dbaa5
Author: Michael Armbrust <[email protected]>
Date: 2016-03-04T02:27:56Z
style
commit ac5427877f2a22408ef1e1aa2cfb2742f38f63bc
Author: Michael Armbrust <[email protected]>
Date: 2016-03-04T02:38:03Z
Merge remote-tracking branch 'apache/master' into fileDataSource
Conflicts:
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
----