GitHub user massie opened a pull request:
https://github.com/apache/spark/pull/7265
[SPARK-7263] Add new shuffle manager which stores shuffle blocks in Parquet
This commit adds a new Spark shuffle manager which reads and writes shuffle
data to Apache Parquet files. Parquet exposes a file interface (not a streaming
interface) because it is column-oriented and seeks within a file for metadata
such as schemas and statistics. As such, this implementation fetches remote
data to local, temporary blocks before the data is passed to Parquet for
reading.
This manager uses the following Spark configuration parameters to
configure Parquet: `spark.shuffle.parquet.{compression, blocksize, pagesize,
enabledictionary}`.
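For example, the options might be set like this (the manager class name and
the option values below are assumptions for illustration; use whatever
fully-qualified name this PR registers):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of configuring the Parquet shuffle described above. The manager
// class name and the option values are illustrative assumptions.
val conf = new SparkConf()
  .setAppName("parquet-shuffle-example")
  .set("spark.shuffle.manager",
    "org.apache.spark.shuffle.parquet.ParquetShuffleManager") // assumed name
  .set("spark.shuffle.parquet.compression", "snappy")         // column compression codec
  .set("spark.shuffle.parquet.blocksize", (64 * 1024 * 1024).toString) // row-group size, bytes
  .set("spark.shuffle.parquet.pagesize", (1024 * 1024).toString)       // page size, bytes
  .set("spark.shuffle.parquet.enabledictionary", "true")      // dictionary encoding
val sc = new SparkContext(conf)
```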
There is a `spark.shuffle.parquet.fallback` configuration option which
allows users to specify a fallback shuffle manager. If the Parquet manager
finds that the classes being shuffled have no schema information, and therefore
can't be used, it will fall back to the specified manager. With this
PR, only Avro `IndexedRecords` are supported in the Parquet shuffle; however,
it is straightforward to extend this to other serialization systems that
Parquet supports, e.g. Apache Thrift. If no
`spark.shuffle.parquet.fallback` is defined, any shuffle objects that are not
compatible with Parquet will cause an error to be thrown that lists the
incompatible objects.
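As a rough sketch, the fallback can be named in the configuration, and
compatibility reduces to a schema check on the shuffled classes (the "sort"
alias and the helper below are assumptions, not the PR's actual API):

```scala
import org.apache.avro.generic.IndexedRecord
import org.apache.spark.SparkConf

// Name a fallback manager for shuffles Parquet can't handle. "sort" mirrors
// Spark's existing shuffle-manager aliases; the exact value expected here
// is an assumption.
val conf = new SparkConf().set("spark.shuffle.parquet.fallback", "sort")

// Conceptually, the manager checks whether the shuffled classes carry schema
// information; with this PR that means Avro IndexedRecords.
def parquetCompatible(cls: Class[_]): Boolean =
  classOf[IndexedRecord].isAssignableFrom(cls)
```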
Because the `ShuffleDependency` forwards the key, value and combiner class
information, a full schema can be generated before the first read/write. This
allows for fewer errors (since reflection isn't used) and makes support for
null values possible without complex code.
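A minimal sketch of that idea, assuming generated Avro classes (the function
name and the nullable-union layout are illustrative, not the PR's actual
code):

```scala
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.specific.SpecificData

// Build a key/value pair schema up front from the classes carried by the
// ShuffleDependency. Wrapping each side in a union with "null" is one way
// to make null values representable without extra code paths.
def pairSchema(keyClass: Class[_], valueClass: Class[_]): Schema = {
  val key = SpecificData.get().getSchema(keyClass)     // schema of a generated Avro class
  val value = SpecificData.get().getSchema(valueClass)
  SchemaBuilder.record("KeyValuePair").fields()
    .name("key")
      .`type`(Schema.createUnion(Schema.create(Schema.Type.NULL), key))
      .withDefault(null)
    .name("value")
      .`type`(Schema.createUnion(Schema.create(Schema.Type.NULL), value))
      .withDefault(null)
    .endRecord()
}
```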
The `ExternalSorter`, if needed, is set up not to spill to disk when Parquet
is used. In the future, an `ExternalSorter` that can read and write Parquet
would need to be created.
Only record-level metrics are supported at this time. Byte-level metrics
are not currently supported and are somewhat complicated by column
compression.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/massie/spark parquet-shuffle
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/7265.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #7265
----
commit fc03c0bd29fa71ff390b86a8f6fd31c1cbef960f
Author: Matt Massie <[email protected]>
Date: 2015-07-07T19:45:11Z
Serialize key, value and combiner class names in ShuffleDependency
This change enables shuffle manager implementations to derive schema
information from the key, value and combiner classes before creating
shuffle readers and writers.
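Illustratively, the dependency might carry the class names and resolve them
lazily on the executor (the class and field names here are assumptions, not
the actual change):

```scala
// Sketch: a dependency that serializes class names so executors can resolve
// key/value/combiner classes, and from them schemas, before any shuffle
// reader or writer is created.
class ClassAwareDependency(
    keyClassName: String,
    valueClassName: String,
    combinerClassName: Option[String]) extends Serializable {
  @transient lazy val keyClass: Class[_] = Class.forName(keyClassName)
  @transient lazy val valueClass: Class[_] = Class.forName(valueClassName)
  @transient lazy val combinerClass: Option[Class[_]] =
    combinerClassName.map(n => Class.forName(n))
}
```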
commit a6d276b7cf05bd0a14d6486b830e839a324b7b6a
Author: Matt Massie <[email protected]>
Date: 2015-07-07T19:52:59Z
[SPARK-7263] Add new shuffle manager which stores shuffle blocks in Parquet
This commit adds a new Spark shuffle manager which reads and writes shuffle
data to Apache Parquet files. Parquet exposes a file interface (not a
streaming interface) because it is column-oriented and seeks within a file
for metadata such as schemas and statistics. As such, this implementation
fetches remote data to local, temporary blocks before the data is passed to
Parquet for reading.

This manager uses the following Spark configuration parameters to configure
Parquet: spark.shuffle.parquet.{compression, blocksize, pagesize,
enabledictionary}.

There is a spark.shuffle.parquet.fallback configuration option which allows
users to specify a fallback shuffle manager. If the Parquet manager finds
that the classes being shuffled have no schema information, and therefore
can't be used, it will fall back to the specified manager. With this PR, only
Avro IndexedRecords are supported in the Parquet shuffle; however, it is
straightforward to extend this to other serialization systems that Parquet
supports, e.g. Apache Thrift. If no spark.shuffle.parquet.fallback is
defined, any shuffle objects that are not compatible with Parquet will cause
an error to be thrown that lists the incompatible objects.

Because the ShuffleDependency forwards the key, value and combiner class
information, a full schema can be generated before the first read/write. This
allows for fewer errors (since reflection isn't used) and makes support for
null values possible without complex code.

The ExternalSorter, if needed, is set up not to spill to disk when Parquet is
used. In the future, an ExternalSorter that can read and write Parquet would
need to be created.

Only record-level metrics are supported at this time. Byte-level metrics are
not currently supported and are somewhat complicated by column compression.
----