GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/5868

    [WIP][SPARK-7078][SPARK-7081][SPARK-7311] Faster sort-based shuffle path 
using binary processing cache-aware sort

    This patch introduces a new shuffle manager that enhances the existing 
sort-based shuffle with a new cache-friendly sort algorithm that operates 
directly on binary data. The goals of this patch are to lower memory usage and 
Java object overheads during shuffle and to speed up sorting. It also lays 
groundwork for follow-up patches that will enable end-to-end processing of 
serialized records.
    
    The new shuffle manager, `UnsafeShuffleManager`, can be enabled by setting `spark.shuffle.manager=unsafe` in SparkConf. The new shuffle write path is used only for shuffles that have no specified key ordering, no aggregation, and a supported serializer; when these conditions are not met, `UnsafeShuffleManager` automatically falls back to the old `SortShuffleManager`.
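
    For reference, here is a minimal Scala sketch of how a job would opt in to the new path (a standard Spark application; nothing here beyond the `spark.shuffle.manager=unsafe` setting is specific to this patch):

    ```
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    // Opt in to the new shuffle manager. Shuffles that need key ordering,
    // aggregation, or an unsupported serializer still fall back to the
    // sort-based path automatically.
    val conf = new SparkConf()
      .setAppName("unsafe-shuffle-example")
      .set("spark.shuffle.manager", "unsafe")
    val sc = new SparkContext(conf)

    // A plain repartitioning shuffle -- no ordering, no map-side aggregation --
    // which is the kind of shuffle the new write path targets.
    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, i))
    pairs.partitionBy(new HashPartitioner(100)).count()
    ```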
    
    At a high level, the new shuffle write path performs the following steps:
    
    - Serialize each input record into a temporary buffer, then append the 
serialized records to data pages allocated via the new `unsafe` MemoryManager 
APIs.
    - As records are copied into the data pages, construct an array of record 
pointers and record partition ids.
    - Once all input records have been read, perform an AlphaSort-style prefix sort of our pointer array. This sort stores pointers to records alongside a user-defined prefix of the record's sorting key. In the case of sort-based shuffle, the sort key prefix is simply the record's partition number, but in the future it could also store portions of the record data (for use in secondary sorting of records). When the underlying sort algorithm compares records, it first compares the stored key prefixes; if the prefixes are not equal, there is no need to follow the record pointers to compare the actual records (in the special case where the prefix _is_ the entire sort key, the records never need to be compared at all). Avoiding these random memory accesses improves cache hit rates (a small illustrative sketch of this comparison appears after this list).
    - Traverse the sorted pointer array and write the serialized records to a 
single combined output file.
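
    To make the pointer-array idea concrete, here is a small Scala sketch of the prefix comparison (my own illustration with made-up names, not the implementation in this patch): entries carry a record pointer plus a prefix, and the record bytes are only consulted when two prefixes tie.

    ```
    // Each in-memory sort entry: a pointer to the serialized record plus a
    // sort-key prefix. For sort-based shuffle the prefix is the partition id,
    // so the prefix alone decides every comparison. (Names are hypothetical.)
    final case class PointerAndPrefix(recordPointer: Long, prefix: Long)

    // Compare prefixes first; only dereference record pointers (a random
    // memory access) when the prefixes are equal.
    def sortPointers(
        entries: Array[PointerAndPrefix],
        compareRecords: (Long, Long) => Int): Array[PointerAndPrefix] = {
      entries.sortWith { (a, b) =>
        if (a.prefix != b.prefix) a.prefix < b.prefix
        else compareRecords(a.recordPointer, b.recordPointer) < 0
      }
    }

    // Usage: with partition ids as prefixes, compareRecords is never reached.
    val sorted = sortPointers(
      Array(PointerAndPrefix(0x10L, prefix = 3L), PointerAndPrefix(0x20L, prefix = 1L)),
      compareRecords = (_, _) => 0)
    ```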
    
    The shuffle read path is unchanged.
    
    This patch is similar to [SPARK-4550](http://issues.apache.org/jira/browse/SPARK-4550) / #4450 but uses a slightly different implementation. The `unsafe`-based implementation featured in this patch lays the groundwork for follow-up patches that will enable sorting to operate on serialized data pages prepared by Spark SQL's new `unsafe` operators (such as the new aggregation operator introduced in #5725).
    
    ### Supported serializers
    
    Because this sort operates on serialized records, it places certain requirements on the record serialization format. Specifically, it requires a _relocatability_ property: the records in a serialized stream can be re-ordered simply by reordering their serialized representations. For example:
    
    ```
       serOut.open()
       position = 0
       serOut.write(obj1)
       serOut.flush()
       position = # of bytes written to stream so far
       obj1Bytes = [bytes 0 through position of stream]
       serOut.write(obj2)
       serOut.flush()
       position2 = # of bytes written to stream so far
       obj2Bytes = [bytes position through position2 of stream]

       serIn.open([obj2Bytes] concatenate [obj1Bytes]) should return (obj2, obj1)
    ```
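
    The pseudocode above translates directly into an executable check against Spark's `Serializer` API. The following Scala sketch is my own illustration (not test code from this patch) of the fuzz-test idea listed in the TODOs below; it assumes the serializer under test claims to support relocation:

    ```
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import org.apache.spark.serializer.Serializer

    // Serialize two objects, swap their byte ranges, and verify that
    // deserializing the swapped bytes yields the objects in swapped order.
    def checkRelocatable(serializer: Serializer, obj1: AnyRef, obj2: AnyRef): Boolean = {
      val instance = serializer.newInstance()
      val bytes = new ByteArrayOutputStream()
      val serOut = instance.serializeStream(bytes)

      serOut.writeObject(obj1)
      serOut.flush()
      val position = bytes.size()   // bytes written so far = end of obj1
      serOut.writeObject(obj2)
      serOut.flush()
      val position2 = bytes.size()
      serOut.close()

      val all = bytes.toByteArray
      val obj1Bytes = all.slice(0, position)
      val obj2Bytes = all.slice(position, position2)

      // Re-order the serialized representations and read them back.
      val serIn = instance.deserializeStream(new ByteArrayInputStream(obj2Bytes ++ obj1Bytes))
      val result = serIn.asIterator.take(2).toList
      serIn.close()
      result == List(obj2, obj1)
    }
    ```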
    
    As #4450 showed, KryoSerializer has this property under certain configurations. Other serialization formats also have this property, such as SparkSqlSerializer, SparkSqlSerializer2, and UnsafeRow. Therefore, this patch also tries to address [SPARK-7311](https://issues.apache.org/jira/browse/SPARK-7311) by extending the serializer API to let serializers declare whether they have this property.
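
    One plausible shape for that extension is sketched below in Scala. The property name comes from the `supportsRelocationOfSerializedObjects` contract mentioned in the TODO list; the trait, the gating helper, and their placement are my own guesses rather than the patch's actual code:

    ```
    import org.apache.spark.serializer.Serializer

    // Hypothetical sketch: serializers advertise whether their on-stream format
    // allows serialized records to be reordered byte-for-byte.
    trait RelocationAware { self: Serializer =>
      // Conservative default: assume the format is NOT safe to reorder.
      def supportsRelocationOfSerializedObjects: Boolean = false
    }

    // The shuffle manager could then gate the new write path on this flag plus
    // the other conditions described above (no key ordering, no aggregation).
    def canUseUnsafeShufflePath(
        serializer: Serializer,
        hasKeyOrdering: Boolean,
        hasAggregation: Boolean): Boolean = {
      val relocatable = serializer match {
        case s: RelocationAware => s.supportsRelocationOfSerializedObjects
        case _ => false
      }
      relocatable && !hasKeyOrdering && !hasAggregation
    }
    ```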
    
    ### Future work
    
    There are several tasks that build upon this patch, which will be left to 
future work:
    
    - [SPARK-7079](https://issues.apache.org/jira/browse/SPARK-7079) Add an 
external sorter that builds upon `UnsafeSort`. This can integrate with the 
existing SortMemoryManager APIs. Because we use explicitly managed memory, we 
can avoid the size estimation steps required by our existing spillable 
collections (since we’ll already know the exact size of our serialized data 
pages).
    - [SPARK-7271](https://issues.apache.org/jira/browse/SPARK-7271) Redesign / 
extend the shuffle interfaces to accept binary data as input. The goal here is 
to let us bypass serialization steps in cases where the sort input is produced 
by an operator that operates directly on binary data.
    - Extension / redesign of the `Serializer` API. We can add new methods that let serializers report the size requirements for serializing objects and serialize objects directly to a specified memory address, similar to how `UnsafeRowConverter` works in Spark SQL (see the sketch after this list).
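
    For the last item, one purely hypothetical shape for such an API (all names and signatures below are my own guesses, loosely modeled on the description of `UnsafeRowConverter`):

    ```
    // Hypothetical future Serializer extension: ask for an object's serialized
    // size up front and write its bytes directly to caller-managed memory.
    trait DirectSerializationSupport[T] {
      // Exact number of bytes that serializing `value` will require, known
      // before any bytes are produced (so data pages can be sized without the
      // size-estimation step used by the existing spillable collections).
      def serializedSize(value: T): Int

      // Serialize `value` directly into memory at (baseObject, baseOffset),
      // in the style of sun.misc.Unsafe addressing; returns bytes written.
      def writeTo(value: T, baseObject: AnyRef, baseOffset: Long): Int
    }
    ```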
      
    **TODOs for this patch**:
    
    - More test cases:
      - [ ] Fuzz test for verifying that `Serializer` implementations properly 
obey the `supportsRelocationOfSerializedObjects` contract.
      - [ ] Test for sorter where all prefixes match.
      - [ ] Property-based / fuzz tests for sorter.
      - [ ] Test that counters such as `numRecordsWritten` are properly updated.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark unsafe-sort

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/5868.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5868
    
----
commit 81d52c558d88f64a32ba73719da352c2365a56eb
Author: Josh Rosen <[email protected]>
Date:   2015-04-29T20:29:29Z

    WIP on UnsafeSorter

commit abf7bfe4ddbb2603272ef3926776ceefcc07ff7f
Author: Josh Rosen <[email protected]>
Date:   2015-04-29T21:34:15Z

    Add basic test case.

commit 57a4ea08c2f415f1fd63167b090c8567ef91ec2e
Author: Josh Rosen <[email protected]>
Date:   2015-04-30T04:16:17Z

    Make initialSize configurable in UnsafeSorter

commit e90015245c328755d26d185d6c13f15c1c7f30f7
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T00:02:18Z

    Add test for empty iterator in UnsafeSorter

commit 767d3cad606b47dc508a189642c12eff51b29682
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T01:47:28Z

    Fix invalid range in UnsafeSorter.
    
    TODO: write fuzz tests to uncover stuff like this.
    Sorting has nice invariants; should be an easy test
    to write.

commit 3db12de8ad6b9c724d952715aa5ad38a74cb2eda
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T01:49:40Z

    Minor simplification and sanity checks in UnsafeSorter

commit 4d2f5e1eb5af05e1d2e13e226192818d54ed7221
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T03:11:43Z

    WIP

commit 8e3ec208be6cdf4eb167bdbe6940ef5552aeb58a
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T20:31:28Z

    Begin code cleanup.

commit 253f13ee0796aa724decd075d159b81eda459daf
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T20:55:35Z

    More cleanup

commit 9c6cf58e1569dcf2a193b4af2e822a649a9d7775
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T21:11:32Z

    Refactor to use DiskBlockObjectWriter.

commit e267cee305a275e7e50fe13a08f5f22b2f9d5939
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T21:13:21Z

    Fix compilation of UnsafeSorterSuite

commit e2d96ca59b74c2aa004c471b651c7de2acaca51f
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T21:43:56Z

    Expand serializer API and use new function to help control when new 
UnsafeShuffle path is used.

commit d3cc310de0e35057d316d201ed7ee5498b1eca9c
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T21:47:33Z

    Flag that SparkSqlSerializer2 supports relocation

commit 87e721b7501ba6f96db919384b52e90c7a8c8d91
Author: Josh Rosen <[email protected]>
Date:   2015-05-01T23:34:47Z

    Renaming and comments

commit 07484589af43de03343fe58601793f6aaff33d56
Author: Josh Rosen <[email protected]>
Date:   2015-05-02T00:50:48Z

    Port UnsafeShuffleWriter to Java.

commit 026b4977a465b4c47af43e6365de4158d3d10ab7
Author: Josh Rosen <[email protected]>
Date:   2015-05-02T05:55:39Z

    Re-use a buffer in UnsafeShuffleWriter

commit 1433b42961ff7bf777b5a966f634822144d13f7c
Author: Josh Rosen <[email protected]>
Date:   2015-05-02T07:12:44Z

    Store record length as int instead of long.

commit 240864c9f860d41c9d2ad51ff78c9160e6e90992
Author: Josh Rosen <[email protected]>
Date:   2015-05-02T07:20:40Z

    Remove PrefixComputer and require prefix to be specified as part of insert()

----

