GitHub user marmbrus opened a pull request:

    https://github.com/apache/spark/pull/11509

    [SPARK-13665][SQL] Separate the concerns of HadoopFsRelation

    `HadoopFsRelation` is used for reading most files into Spark SQL.  However, 
today this class mixes the concerns of file management, schema reconciliation, 
scan building, bucketing, partitioning, and writing data.  As a result, many 
data sources are forced to reimplement the same functionality and the various 
layers have accumulated a fair bit of inefficiency.  This PR is a first cut at 
separating this into several components / interfaces that are each described 
below.  Additionally, all implementations inside of Spark have been ported to 
the new API `FileFormat`.  Note that external libraries, such as spark-avro, 
will also need to be ported to work with Spark 2.0.
    
    ### HadoopFsRelation
    A simple case class that acts as a container for all of the metadata 
required to read from a datasource.  All discovery, resolution and merging 
logic for schemas and partitions has been removed.  This is an internal 
representation that no longer needs to be exposed to developers.
    
    ```scala
    case class HadoopFsRelation(
        sqlContext: SQLContext,
        location: FileCatalog,
        partitionSchema: StructType,
        dataSchema: StructType,
        bucketSpec: Option[BucketSpec],
        fileFormat: FileFormat,
        options: Map[String, String])
    ```
    
    ### FileFormat
    The primary interface that will be implemented by each different format 
including external libraries.  Implementors are responsible for reading a given 
format and converting it into `InternalRow` as well as writing out rows.  
Optionally, a format can also return a schema that is inferred from a set of 
files when possible.
    
    ```scala
    trait FileFormat {
      def inferSchema(
          sqlContext: SQLContext,
          options: Map[String, String],
          files: Seq[FileStatus]): Option[StructType]
    
      def prepareWrite(
          sqlContext: SQLContext,
          job: Job,
          options: Map[String, String],
          dataSchema: StructType): OutputWriterFactory
    
      def buildInternalScan(
          sqlContext: SQLContext,
          dataSchema: StructType,
          requiredColumns: Array[String],
          filters: Array[Filter],
          bucketSet: Option[BitSet],
          inputFiles: Array[FileStatus],
          broadcastedConf: Broadcast[SerializableConfiguration],
          options: Map[String, String]): RDD[InternalRow]
    }
    ```
    
    The current interface is based on what was required to get all the tests 
passing again, but still mixes a couple of concerns (e.g. `bucketSet` is passed 
down to the scan instead of being resolved by the planner).  Additionally, 
scans are still returning `RDD`s instead of iterators for single files.  In a 
future PR, bucketing should be removed from this interface and the scan should 
be isolated to a single file.
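    
    To make the per-file direction concrete, here is a minimal sketch of what a 
    format scoped to a single file could look like.  Everything in it is 
    hypothetical: `SingleFileFormat`, `readFile`, the `Schema` / `Row` aliases 
    and the `delimiter` option are plain-Scala stand-ins invented for 
    illustration, not the interface in this PR and not Spark types.
    
    ```scala
    object FileFormatSketch {
      // Hypothetical stand-ins for StructType and InternalRow.
      type Schema = Seq[String] // column names only
      type Row = Seq[String]
    
      /** Hypothetical per-file variant: no RDD and no bucketing in the signature. */
      trait SingleFileFormat {
        def inferSchema(options: Map[String, String], lines: Seq[String]): Option[Schema]
    
        def readFile(
            dataSchema: Schema,
            requiredColumns: Seq[String],
            options: Map[String, String],
            lines: Iterator[String]): Iterator[Row]
      }
    
      /** Toy delimited-text format whose first line is a header. */
      object DelimitedText extends SingleFileFormat {
        // "delimiter" is an assumed option name; it is quoted so split treats it literally.
        private def separator(options: Map[String, String]): String =
          java.util.regex.Pattern.quote(options.getOrElse("delimiter", ","))
    
        def inferSchema(options: Map[String, String], lines: Seq[String]): Option[Schema] =
          lines.headOption.map(_.split(separator(options), -1).toSeq)
    
        def readFile(
            dataSchema: Schema,
            requiredColumns: Seq[String],
            options: Map[String, String],
            lines: Iterator[String]): Iterator[Row] = {
          val sep = separator(options)
          val indices = requiredColumns.map(c => dataSchema.indexOf(c))
          require(!indices.contains(-1), "every required column must be in the data schema")
          // Skip the header, then project each line onto the required columns.
          lines.drop(1).map { line =>
            val cells = line.split(sep, -1)
            indices.map(i => cells(i))
          }
        }
      }
    
      def main(args: Array[String]): Unit = {
        val file = Seq("id,name,city", "1,ann,Oslo", "2,bob,Lima")
        val schema = DelimitedText.inferSchema(Map.empty, file).get
        DelimitedText
          .readFile(schema, Seq("name", "id"), Map.empty, file.iterator)
          .foreach(row => println(row.mkString("|")))
      }
    }
    ```
    
    Returning an `Iterator` per file leaves it to the planner to decide how 
    files are grouped into tasks, which is the separation the paragraph above 
    argues for.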
    
    ### FileCatalog
    This interface is used to list the files that make up a given relation, as 
well as handle directory based partitioning.
    
    ```scala
    trait FileCatalog {
      def paths: Seq[Path]
      def partitionSpec(schema: Option[StructType]): PartitionSpec
      def allFiles(): Seq[FileStatus]
      def getStatus(path: Path): Array[FileStatus]
      def refresh(): Unit
    }
    ```
    
    Currently there are two implementations:
     - `HDFSFileCatalog` - based on code from the old `HadoopFsRelation`.  
Infers partitioning by recursive listing and caches this data for performance
     - `HiveFileCatalog` - based on the above, but it uses the partition spec 
from the Hive Metastore.
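    
    As a rough illustration of directory-based partitioning, the sketch below 
    pulls `key=value` segments out of file paths.  It is a simplified stand-in, 
    not the code in `HDFSFileCatalog`: `parsePartitionValues` is a hypothetical 
    helper, and it leaves out things a real implementation would need, such as 
    inferring value types, unescaping, and validating that all files agree on 
    the partition columns.
    
    ```scala
    object PartitionDiscoverySketch {
      /** Extracts `key=value` directory segments from a file path, in path order. */
      def parsePartitionValues(path: String): Seq[(String, String)] =
        path
          .split("/")
          .dropRight(1) // the last segment is the file name, never a partition directory
          .toSeq
          .map(_.split("=", 2))
          .collect { case Array(key, value) if key.nonEmpty => key -> value }
    
      def main(args: Array[String]): Unit = {
        val files = Seq(
          "/data/events/year=2016/month=3/part-00000.parquet",
          "/data/events/year=2016/month=4/part-00000.parquet")
        files.foreach(f => println(s"$f -> ${parsePartitionValues(f)}"))
      }
    }
    ```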
    
    ### ResolvedDataSource
    Produces a logical plan given the following description of a Data Source 
(which can come from DataFrameReader or a metastore): 
     - `paths: Seq[String] = Nil`
     - `userSpecifiedSchema: Option[StructType] = None`
     - `partitionColumns: Array[String] = Array.empty`
     - `bucketSpec: Option[BucketSpec] = None`
     - `provider: String`
     - `options: Map[String, String]`
    
    This class is responsible for deciding which of the Data Source APIs a 
given provider is using (including the non-file based ones).  All 
reconciliation of partitions, buckets, schema from metastores or inference is 
done here.
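    
    One piece of that reconciliation, choosing between a user-specified schema 
    and an inferred one, can be sketched as follows.  This assumes the 
    user-specified schema takes precedence and that inference only runs when it 
    is absent; `DataSourceDescription`, `resolveSchema` and the `Schema` alias 
    are hypothetical names for illustration (with `bucketSpec` omitted), not the 
    actual `ResolvedDataSource` code.
    
    ```scala
    object ResolveSketch {
      // Stand-in for StructType: (column name, type name) pairs.
      type Schema = Seq[(String, String)]
    
      /** Mirrors the description fields listed above, minus `bucketSpec`. */
      final case class DataSourceDescription(
          paths: Seq[String] = Nil,
          userSpecifiedSchema: Option[Schema] = None,
          partitionColumns: Array[String] = Array.empty,
          provider: String,
          options: Map[String, String] = Map.empty)
    
      /**
       * Prefers the user's schema; `inferred` is by-name, so potentially
       * expensive inference is only evaluated when no schema was given.
       */
      def resolveSchema(
          desc: DataSourceDescription,
          inferred: => Option[Schema]): Either[String, Schema] =
        desc.userSpecifiedSchema
          .orElse(inferred)
          .toRight(s"Unable to determine a schema for provider '${desc.provider}'")
    
      def main(args: Array[String]): Unit = {
        val desc = DataSourceDescription(paths = Seq("/data/events"), provider = "parquet")
        println(resolveSchema(desc, Some(Seq("id" -> "int"))))
        println(resolveSchema(desc, None))
      }
    }
    ```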
    
    ### DataSourceStrategy
    Responsible for analyzing and planning reading/writing of data using any of 
the Data Source APIs.  This includes concerns such as:
     - pruning the partitions that will be read based on filters.
     - appending partition columns.
     - applying additional filters when a data source cannot evaluate them 
internally.
     - constructing an RDD that is bucketed correctly when required.
     - sanity checking schema match-up and other analysis when writing.
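    
    The first two concerns in the list above can be sketched on plain Scala 
    collections.  This is an illustration only, assuming simple equality 
    filters and string-valued rows; `Partition`, `prunePartitions` and 
    `appendPartitionColumns` are hypothetical names, not part of the planner.
    
    ```scala
    object PlanningSketch {
      /** A partition directory: its column values plus the files it contains. */
      final case class Partition(values: Map[String, String], files: Seq[String])
    
      /**
       * Keeps partitions that satisfy every equality filter on a partition column.
       * Filters on other columns cannot prune anything here, so they are ignored
       * and would still have to be applied to the scanned rows.
       */
      def prunePartitions(
          partitions: Seq[Partition],
          equalityFilters: Map[String, String]): Seq[Partition] =
        partitions.filter { p =>
          equalityFilters.forall { case (column, expected) =>
            p.values.get(column).forall(_ == expected)
          }
        }
    
      /** Appends the partition's values, in partition-column order, to each data row. */
      def appendPartitionColumns(
          rows: Iterator[Seq[String]],
          partitionColumns: Seq[String],
          partition: Partition): Iterator[Seq[String]] = {
        val suffix = partitionColumns.map(partition.values)
        rows.map(_ ++ suffix)
      }
    
      def main(args: Array[String]): Unit = {
        val partitions = Seq(
          Partition(Map("year" -> "2015"), Seq("/t/year=2015/part-0")),
          Partition(Map("year" -> "2016"), Seq("/t/year=2016/part-0")))
        val selected = prunePartitions(partitions, Map("year" -> "2016"))
        selected.foreach { p =>
          appendPartitionColumns(Iterator(Seq("1", "ann")), Seq("year"), p)
            .foreach(row => println(row.mkString("|")))
        }
      }
    }
    ```
    
    Appending the values as a separate pass over each row is the extra copy 
    that the future-work list below proposes pushing down into `FileFormat`.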
    
    In the future we should do the following:
     - Break out file handling into its own Strategy as it's sufficiently 
complex / isolated.
     - Push the appending of partition columns down into `FileFormat` to avoid 
an extra copy / unvectorization.
     - Use a custom RDD for scans instead of `SQLNewNewHadoopRDD2`
    
    TODO:
     - [ ] Better initial scala doc on the interfaces
     - [ ] Create follow-up JIRAs for the TODOs described above and in the code.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/marmbrus/spark fileDataSource

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11509.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11509
    
----
commit c2c2fcdf0d8b6d39797ce3610dcbc977d400c0d6
Author: Michael Armbrust <[email protected]>
Date:   2016-02-25T02:04:56Z

    WIP

commit 4687a66eb427021ae11db598b7e4bf9126df436c
Author: Michael Armbrust <[email protected]>
Date:   2016-02-25T18:19:35Z

    WIP

commit 0bf0d021dff3384a7845fdd65b5a9491fbb4a254
Author: Michael Armbrust <[email protected]>
Date:   2016-02-25T23:05:57Z

    WIP: basic read/write workign

commit 1f35b90bc3d1b67c676ced335f410c540037c658
Author: Michael Armbrust <[email protected]>
Date:   2016-02-26T00:09:13Z

    WIP: trying to get appending

commit 4bc04e3384af2b84f12f4459f4b5275ac1657e00
Author: Michael Armbrust <[email protected]>
Date:   2016-02-26T21:38:36Z

    working on partitioning

commit a27b4a6bc9395c74b9507f36443102dc96c19218
Author: Michael Armbrust <[email protected]>
Date:   2016-02-26T22:39:14Z

    WIP: many tests passing

commit 159e4c4a42558d7d1b84648800fcf021a1defa10
Author: Michael Armbrust <[email protected]>
Date:   2016-02-28T22:59:13Z

    WIP: parquet/hive compiling

commit 72996601347cf417598c048d53618b840c29733e
Author: Michael Armbrust <[email protected]>
Date:   2016-02-29T18:09:07Z

    :(

commit 049ac1bea8ce9c8562590d75b8819aa0e5bf3300
Author: Michael Armbrust <[email protected]>
Date:   2016-03-01T19:48:43Z

    much of hive passing

commit 405f2841632c51f7f21e770587c39a34065c3e97
Author: Michael Armbrust <[email protected]>
Date:   2016-03-01T19:57:20Z

    Merge remote-tracking branch 'apache/master' into fileDataSource
    
    Conflicts:
        sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
        sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

commit d28300b2a6b9a1f40651f77f69c0eb8716571cce
Author: Michael Armbrust <[email protected]>
Date:   2016-03-01T22:32:22Z

    more progress

commit 6b136744354c3fc0c3756b74fd5a889c7cb95818
Author: Michael Armbrust <[email protected]>
Date:   2016-03-02T00:08:08Z

    WIP

commit a975f2dbf748e03806ebd0e91e99e09d679a8a65
Author: Michael Armbrust <[email protected]>
Date:   2016-03-02T21:27:32Z

    WIP: all but bucketing

commit 5275c41843a386ed109a9a8cfa058be78b946c51
Author: Michael Armbrust <[email protected]>
Date:   2016-03-03T02:51:19Z

    Still workign on bucketing...

commit 0d4b08ab7219406647c65fd2f10591c23e9b2487
Author: Michael Armbrust <[email protected]>
Date:   2016-03-03T03:00:17Z

    restore

commit 428a62fdc3e48d8f6ee063847c892008243fad54
Author: Michael Armbrust <[email protected]>
Date:   2016-03-03T03:00:48Z

    remove

commit 1a41e151fe9e2f21c84291d1d51ca527737c5050
Author: Wenchen Fan <[email protected]>
Date:   2016-03-03T15:07:02Z

    fix all tests

commit 2a49e8af147bdf3fb3f7970ce7a6060bf07b9782
Author: Michael Armbrust <[email protected]>
Date:   2016-03-03T19:09:51Z

    Merge pull request #32 from cloud-fan/fileDataSource
    
    fix all tests

commit 023f13342f7074656879ff940d0fce4a4250f2a5
Author: Michael Armbrust <[email protected]>
Date:   2016-03-03T22:28:33Z

    Merge remote-tracking branch 'apache/master' into fileDataSource
    
    Conflicts:
        sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
        sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
        sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
        sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala
        sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala

commit 83fbb44fc57940711609f9a3eacd29222319e657
Author: Michael Armbrust <[email protected]>
Date:   2016-03-04T01:24:47Z

    TESTS PASSING?\!?

commit 175e78f2157ebc1da926ba37841348e7e16182d1
Author: Michael Armbrust <[email protected]>
Date:   2016-03-04T02:11:49Z

    cleanup

commit 216078c681ffc054b9ca9f5295647b1d3c4dbaa5
Author: Michael Armbrust <[email protected]>
Date:   2016-03-04T02:27:56Z

    style

commit ac5427877f2a22408ef1e1aa2cfb2742f38f63bc
Author: Michael Armbrust <[email protected]>
Date:   2016-03-04T02:38:03Z

    Merge remote-tracking branch 'apache/master' into fileDataSource
    
    Conflicts:
        sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
        sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestRelationSuite.scala

----

