GitHub user massie opened a pull request:

    https://github.com/apache/spark/pull/7265

    [SPARK-7263] Add new shuffle manager which stores shuffle blocks in Parquet

    This commit adds a new Spark shuffle manager which reads and writes shuffle 
data to Apache Parquet files. Parquet has a File interface (not a streaming 
interface) because it is column-oriented and seeks in a File for metadata 
information, e.g. schemas, statistics. As such, this implementation fetches 
remote data to local, temporary blocks before the data is passed to Parquet for 
reading.
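
    To illustrate why a local file is needed, here is a minimal Java sketch
(not code from this PR). It spools a non-seekable stream into a temporary file
and then seeks to the tail, where the Parquet file format stores a 4-byte
little-endian footer length followed by the `PAR1` magic. The names
`SpoolToLocalFile`, `spool` and `footerLength` are hypothetical; the PR itself
hands the local block to the Parquet library for reading.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical helper, for illustration only.
final class SpoolToLocalFile {

    /** Copies a non-seekable stream of fetched bytes into a local temporary file. */
    static Path spool(InputStream remote) throws IOException {
        Path tmp = Files.createTempFile("shuffle-block-", ".parquet");
        // createTempFile already created the file, so allow overwriting it.
        Files.copy(remote, tmp, StandardCopyOption.REPLACE_EXISTING);
        return tmp;
    }

    /** Seeks to the tail of the file and returns the footer length stored before the trailing magic. */
    static int footerLength(Path file) throws IOException {
        try (SeekableByteChannel ch = Files.newByteChannel(file, StandardOpenOption.READ)) {
            long size = ch.size();
            // Leading magic (4) + footer length (4) + trailing magic (4).
            if (size < 12) {
                throw new IOException("too small to be a Parquet file");
            }
            ByteBuffer tail = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
            ch.position(size - 8);
            while (tail.hasRemaining()) {
                if (ch.read(tail) < 0) {
                    throw new IOException("unexpected end of file");
                }
            }
            tail.flip();
            int length = tail.getInt();
            byte[] magic = new byte[4];
            tail.get(magic);
            if (!"PAR1".equals(new String(magic, StandardCharsets.US_ASCII))) {
                throw new IOException("missing trailing PAR1 magic");
            }
            return length;
        }
    }
}
```

    A reader that only had the stream would have to consume the whole block
before it could locate the footer, which is why the data is fetched to a local
block first.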
        
    This manager uses the following Spark configuration parameters to 
configure Parquet: `spark.shuffle.parquet.{compression, blocksize, pagesize, 
enabledictionary}`.
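
    The brace shorthand stands for four separate keys. The Java sketch below
only expands that shorthand; the class name `ParquetShuffleKeys` is
hypothetical, and the `enabledictionary` spelling follows the commit message.
Accepted values are not stated in this description, so none are shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class ParquetShuffleKeys {

    static final String PREFIX = "spark.shuffle.parquet.";

    /** Returns the four fully qualified configuration keys named in the description. */
    static List<String> keys() {
        String[] suffixes = {"compression", "blocksize", "pagesize", "enabledictionary"};
        List<String> out = new ArrayList<>();
        for (String suffix : suffixes) {
            out.add(PREFIX + suffix);
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints one key per line.
        keys().forEach(System.out::println);
    }
}
```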
    
    There is a `spark.shuffle.parquet.fallback` configuration option which 
allows users to specify a fallback shuffle manager. If the Parquet manager 
finds that the classes being shuffled have no schema information, and therefore 
can't be used, it will fall back to the specified fallback manager. With this 
PR, only Avro `IndexedRecord` objects are supported in the Parquet shuffle; 
however, it is straightforward to extend this to other serialization systems 
that Parquet supports, e.g. Apache Thrift. If there is no 
`spark.shuffle.parquet.fallback` defined, any shuffle objects which are not 
compatible with Parquet will cause an error to be thrown which lists the 
incompatible objects.
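
    The decision described above can be sketched in plain Java as follows.
This is an illustration under stated assumptions, not the PR's code: the
`HasSchema` interface is a hypothetical stand-in for a schema-bearing type such
as Avro's `IndexedRecord`, `SampleRecord` and `ShuffleManagerChoice` are
made-up names, the configuration is modelled as a plain `Map`, and the
returned strings are placeholders for manager names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the fallback decision, for illustration only.
final class ShuffleManagerChoice {

    /** Hypothetical stand-in for a schema-bearing type such as Avro's IndexedRecord. */
    interface HasSchema {}

    /** Hypothetical record type that carries schema information. */
    static final class SampleRecord implements HasSchema {}

    static final String FALLBACK_KEY = "spark.shuffle.parquet.fallback";

    /**
     * Returns "parquet" when every shuffled class carries a schema, the configured
     * fallback when one is set, and otherwise throws an error listing the
     * incompatible classes.
     */
    static String choose(Map<String, String> conf, Class<?>... shuffled) {
        List<String> incompatible = new ArrayList<>();
        for (Class<?> c : shuffled) {
            // Assumption: a missing class (e.g. no combiner) is passed as null and skipped.
            if (c != null && !HasSchema.class.isAssignableFrom(c)) {
                incompatible.add(c.getName());
            }
        }
        if (incompatible.isEmpty()) {
            return "parquet";
        }
        String fallback = conf.get(FALLBACK_KEY);
        if (fallback != null) {
            return fallback;
        }
        throw new IllegalArgumentException(
            "Not compatible with the Parquet shuffle: " + incompatible);
    }
}
```

    Collecting every incompatible class before failing mirrors the description,
which says the error lists the incompatible objects rather than stopping at the
first one.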
    
    Because the `ShuffleDependency` forwards the key, value and combiner class 
information, a full schema can be generated before the first read/write. This 
allows for fewer errors (since reflection isn't used) and makes support for 
null values possible without complex code.
    
    The `ExternalSorter`, if needed, is set up not to spill to disk if Parquet 
is used. In the future, an `ExternalSorter` would need to be created that can 
read/write Parquet.
    
    Only record-level metrics are supported at this time. Byte-level metrics 
are not currently supported and are complicated somewhat by column compression.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/massie/spark parquet-shuffle

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/7265.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #7265
    
----
commit fc03c0bd29fa71ff390b86a8f6fd31c1cbef960f
Author: Matt Massie <[email protected]>
Date:   2015-07-07T19:45:11Z

    Serialize key, value and combiner class names in ShuffleDependency
    
    This change enables shuffle manager implementations to obtain schema
    information from the key, value and combiner classes before
    creating shuffle readers and writers.

commit a6d276b7cf05bd0a14d6486b830e839a324b7b6a
Author: Matt Massie <[email protected]>
Date:   2015-07-07T19:52:59Z

    [SPARK-7263] Add new shuffle manager which stores shuffle blocks in Parquet
    
    This commit adds a new Spark shuffle manager which reads and writes shuffle
    data to Apache Parquet files. Parquet has a File interface (not a streaming
    interface) because it is column-oriented and seeks in a File for metadata
    information, e.g. schemas, statistics. As such, this implementation fetches
    remote data to local, temporary blocks before the data is passed to Parquet
    for reading.
    
    This manager uses the following Spark configuration parameters to configure
    Parquet:
    spark.shuffle.parquet.{compression, blocksize, pagesize, enabledictionary}.
    
    There is a spark.shuffle.parquet.fallback configuration option which allows
    users to specify a fallback shuffle manager. If the Parquet manager finds
    that the classes being shuffled have no schema information, and therefore
    can't be used, it will fall back to the specified fallback manager. With
    this PR, only Avro IndexedRecord objects are supported in the Parquet
    shuffle; however, it is straightforward to extend this to other
    serialization systems that Parquet supports, e.g. Apache Thrift. If there
    is no spark.shuffle.parquet.fallback defined, any shuffle objects which are
    not compatible with Parquet will cause an error to be thrown which lists
    the incompatible objects.
    
    Because the ShuffleDependency forwards the key, value and combiner class
    information, a full schema can be generated before the first read/write.
    This allows for fewer errors (since reflection isn't used) and makes
    support for null values possible without complex code.
    
    The ExternalSorter, if needed, is set up not to spill to disk if Parquet is
    used. In the future, an ExternalSorter would need to be created that can
    read/write Parquet.
    
    Only record-level metrics are supported at this time. Byte-level metrics
    are not currently supported and are complicated somewhat by column
    compression.

----


