GitHub user JoshRosen opened a pull request:

    https://github.com/apache/spark/pull/5725

    [SPARK-7076][SPARK-7077][SPARK-7080][SQL] Use managed memory for 
aggregations

    This patch adds managed-memory-based aggregation to Spark SQL / DataFrames. 
Instead of working with Java objects, this new aggregation path uses 
`sun.misc.Unsafe` to manipulate raw memory.  This reduces the memory footprint 
of aggregations, resulting in fewer spills, OutOfMemoryErrors, and garbage 
collection pauses, and therefore allows for higher memory utilization.  It can 
also improve cache locality, since tuples are stored closer together in memory.
    
    This feature can be enabled by setting `spark.sql.unsafe.enabled=true`.  
For now, it is only supported when codegen is enabled, and only for 
aggregations whose grouping columns are primitive numeric types or strings and 
whose aggregated values are numeric.
    
    ### Managing memory with sun.misc.Unsafe
    
    This patch supports both on- and off-heap managed memory.
    
    - In on-heap mode, memory addresses are identified by the combination of a 
base Object and an offset within that object.
    - In off-heap mode, memory is addressed directly with 64-bit long addresses.
    
    To support both modes, functions that manipulate memory accept both 
`baseObject` and `baseOffset` fields.  In off-heap mode, we simply pass `null` 
as `baseObject`.
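    To make the two addressing modes concrete, here is a minimal round-trip 
sketch using `sun.misc.Unsafe` directly.  The `AddressingModes` class and its 
method names are hypothetical illustrations, not Spark's actual allocator 
classes, and it assumes a JDK where `sun.misc.Unsafe` is still reachable via 
reflection:

```java
import java.lang.reflect.Field;
import sun.misc.Unsafe;

public class AddressingModes {
    static final Unsafe UNSAFE;
    static {
        try {
            // Unsafe has no public constructor; grab the singleton via reflection.
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            UNSAFE = (Unsafe) f.get(null);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // On-heap mode: baseObject is a heap array, baseOffset is relative to it.
    static long roundTripOnHeap(long value) {
        long[] base = new long[1];
        long offset = UNSAFE.arrayBaseOffset(long[].class);
        UNSAFE.putLong(base, offset, value);
        return UNSAFE.getLong(base, offset);
    }

    // Off-heap mode: baseObject is null, baseOffset is an absolute 64-bit address.
    static long roundTripOffHeap(long value) {
        long address = UNSAFE.allocateMemory(8);
        try {
            UNSAFE.putLong(null, address, value);
            return UNSAFE.getLong(null, address);
        } finally {
            UNSAFE.freeMemory(address);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripOnHeap(42L));   // 42
        System.out.println(roundTripOffHeap(42L));  // 42
    }
}
```

    Because both modes go through the same `(baseObject, baseOffset)` pair, 
the memory-manipulating code does not need to branch on which mode is active.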
    
    We allocate memory in large chunks, so memory fragmentation and allocation 
speed are not significant bottlenecks.
    
    By default, we use on-heap mode.  To enable off-heap mode, set 
`spark.sql.unsafe.offHeap=true`.
    
    ### Compact tuple format
    
    This patch introduces `UnsafeRow`, a compact row layout.  In this format, 
each tuple has three parts: a null bit set, fixed length values, and 
variable-length values:
    
    
![image](https://cloud.githubusercontent.com/assets/50748/7328538/2fdb65ce-ea8b-11e4-9743-6c0f02bb7d1f.png)
    
    - Rows are always 8-byte word aligned (so their sizes will always be a 
multiple of 8 bytes)
    - The bit set is used for null tracking:
        - Position _i_ is set if and only if field _i_ is null
        - The bit set is aligned to an 8-byte word boundary.
    - Every field appears as an 8-byte word in the fixed-length values part:
        - If a field is null, its word is zeroed out.
        - If a field is variable-length, the word stores a relative offset 
(w.r.t. the base of the tuple) that points to the beginning of the field's data 
in the variable-length part.
    - Each variable-length data type can have its own encoding:
        - For strings, the first word stores the length of the string and is 
followed by the UTF-8-encoded bytes.  If necessary, the end of the string is 
padded with zero bytes to maintain word alignment.
    
    For example, a tuple that consists of three fields of type (int, string, 
string), with values (null, “data”, “bricks”), would look like this:
    
    
![image](https://cloud.githubusercontent.com/assets/50748/7328526/1e21959c-ea8b-11e4-9a28-a4350fe4a7b5.png)
    
    This format allows us to compare tuples for equality by directly comparing 
their raw bytes.  This also enables fast hashing of tuples.
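    The row size in the example above can be worked out directly from the 
layout rules.  The following sketch (a hypothetical `UnsafeRowSizing` helper, 
written for illustration rather than taken from the patch) computes each part:

```java
import java.nio.charset.StandardCharsets;

public class UnsafeRowSizing {
    // Round a byte count up to the next multiple of 8 (word alignment).
    static long roundToWords(long numBytes) {
        return ((numBytes + 7) / 8) * 8;
    }

    // Null-tracking bit set: one bit per field, rounded up to whole 8-byte words.
    static long bitSetBytes(int numFields) {
        return ((numFields + 63) / 64) * 8L;
    }

    // String payload: an 8-byte length word plus the UTF-8 bytes, word-aligned.
    static long stringBytes(String s) {
        int utf8Len = s.getBytes(StandardCharsets.UTF_8).length;
        return 8 + roundToWords(utf8Len);
    }

    public static void main(String[] args) {
        int numFields = 3;                    // (int, string, string)
        long bitSet = bitSetBytes(numFields); // 8: three null bits fit in one word
        long fixed = 8L * numFields;          // 24: every field gets one 8-byte word
        long var = stringBytes("data")        // 16: length word + "data" padded to 8
                 + stringBytes("bricks");     // 16: length word + "bricks" padded to 8
        System.out.println(bitSet + fixed + var); // 64 bytes total
    }
}
```

    So the (null, “data”, “bricks”) tuple occupies 64 bytes: an 8-byte null 
bit set, three 8-byte fixed-length words, and two 16-byte string regions.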
    
    ### Hash map for performing aggregations
    
    This patch introduces `UnsafeFixedWidthAggregationMap`, a hash map for 
performing aggregations whose result columns are fixed-width.  This map's keys 
and values are `Row` objects. `UnsafeFixedWidthAggregationMap` is implemented 
on top of `BytesToBytesMap`, an append-only map which supports byte-array keys 
and values.
    
    `BytesToBytesMap` stores pointers to key and value tuples.  For each 
record with a new key, we copy the key, create the aggregation value buffer 
for that key, and store both in the map's memory; the hash table then simply 
holds pointers to them.  For each record with an existing key, we simply run 
the aggregation function to update the value in place.
    
    This map is implemented using open addressing with triangular-number 
probing.  Each entry occupies two words in a long array: the first word stores 
the address of the key, and the second word stores the relative offset from 
the key tuple to the value tuple, along with the key's 32-bit hashcode.  By 
storing the full hashcode, we reduce the number of equality checks needed to 
handle position collisions (since the chance of a hashcode collision is much 
lower than that of a position collision).
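    The probing scheme itself is simple; the sketch below (a hypothetical 
`TriangularProbing` class, written to illustrate the idea rather than copied 
from `BytesToBytesMap`) shows the probe sequence and the property that makes 
triangular numbers attractive: with a power-of-two capacity, the sequence 
visits every slot exactly once before repeating, so a lookup can never 
livelock.

```java
import java.util.BitSet;

public class TriangularProbing {
    // Probe positions: pos, pos+1, pos+1+2, pos+1+2+3, ... (mod capacity).
    // The increments are the triangular numbers; with a power-of-two capacity
    // this walk covers every slot exactly once per cycle.
    static int[] probeSequence(int hashcode, int capacity) {
        int[] positions = new int[capacity];
        int pos = hashcode & (capacity - 1);  // masking works for power-of-two sizes
        int step = 1;
        for (int i = 0; i < capacity; i++) {
            positions[i] = pos;
            pos = (pos + step) & (capacity - 1);
            step++;
        }
        return positions;
    }

    public static void main(String[] args) {
        BitSet visited = new BitSet(16);
        for (int slot : probeSequence(0x9e3779b9, 16)) {
            visited.set(slot);
        }
        System.out.println(visited.cardinality()); // 16: every slot was visited
    }
}
```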
    
    `UnsafeFixedWidthAggregationMap` allows regular Spark SQL `Row` objects to 
be used when probing the map.  Internally, it encodes these rows into 
`UnsafeRow` format using `UnsafeRowConverter`.  This conversion has a small 
overhead that can be eliminated in the future once we use UnsafeRows in other 
operators.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/JoshRosen/spark unsafe

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/5725.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5725
    
----
commit 480a74a12b9a3e3d71c1b65dcc41c9111ed33958
Author: Josh Rosen <[email protected]>
Date:   2015-04-17T22:28:50Z

    Initial import of code from Databricks unsafe utils repo.

commit ab68e081eef12333ff6c475cd70759ab7c6aeb74
Author: Josh Rosen <[email protected]>
Date:   2015-04-18T00:38:51Z

    Begin merging the UTF8String implementations.

commit f03e9c17e2458c1f081196c27ac44abe70d5be36
Author: Josh Rosen <[email protected]>
Date:   2015-04-18T04:58:58Z

    Play around with Unsafe implementations of more string methods.

commit 5d55cef9edcedae114d095ab656512fd4b3946ac
Author: Josh Rosen <[email protected]>
Date:   2015-04-18T05:22:48Z

    Add skeleton for Row implementation.

commit 8a8f9df5af0adac1ceb078443c6dade3a76d8fce
Author: Josh Rosen <[email protected]>
Date:   2015-04-18T05:59:02Z

    Add skeleton for GeneratedAggregate integration.
    
    This typechecks properly and sketches how I'm intending to use row pointers
    and the hashmap. This has been a useful exercise for figuring out whether my
    interfaces will be sufficient.

commit 1ff814de9f7c4f1716e09d75f06fa261822784ee
Author: Josh Rosen <[email protected]>
Date:   2015-04-18T06:11:35Z

    Add reminder to free memory on iterator completion

commit 53ba9b79e12a58f5c4ee217e434fbc20195ffc62
Author: Josh Rosen <[email protected]>
Date:   2015-04-19T05:11:37Z

    Start prototyping Java Row -> UnsafeRow converters

commit fc4c3a8aa5b345526298379124530b6c2793d9e5
Author: Josh Rosen <[email protected]>
Date:   2015-04-19T05:26:51Z

    Sketch how the converters will be used in UnsafeGeneratedAggregate

commit 1a483c5a7303d4267e5a2adb10fa23c672224361
Author: Josh Rosen <[email protected]>
Date:   2015-04-19T07:50:07Z

    First version that passes some aggregation tests:
    
    I commented out a number of tests where we do not support the required
    data types; this is only a short-term hack until I extend the planner
    to understand when UnsafeGeneratedAggregate can be used.

commit 079f1bf3b8d0b72eae5882d1c1ae69db6d21c7cd
Author: Josh Rosen <[email protected]>
Date:   2015-04-19T19:54:10Z

    Some clarification of the BytesToBytesMap.lookup() / set() contract.

commit f764d1324ee0aa327217c3bf98868f06c0ae7fbf
Author: Josh Rosen <[email protected]>
Date:   2015-04-19T20:22:15Z

    Simplify address + length calculation in Location.

commit c754ae142933a901738be00ae865b96f6ca47f1f
Author: Josh Rosen <[email protected]>
Date:   2015-04-19T22:56:01Z

    Now that the store*() contract has been strengthened, we can remove an 
extra lookup

commit ae39694e722753da288c66455267a3acfca09187
Author: Josh Rosen <[email protected]>
Date:   2015-04-19T23:47:34Z

    Add finalizer as "cleanup method of last resort"

commit c7f0b563168048c0ec046a45d3cad6b81491d4b1
Author: Josh Rosen <[email protected]>
Date:   2015-04-20T00:06:37Z

    Reuse UnsafeRow pointer in UnsafeRowConverter

commit 62ab054db492fd77289150edd3705539f5848a39
Author: Josh Rosen <[email protected]>
Date:   2015-04-20T00:39:33Z

    Optimize for fact that get() is only called on String columns.

commit c55bf668efe9494caca1f7952c37b34035341cea
Author: Josh Rosen <[email protected]>
Date:   2015-04-20T00:53:49Z

    Free buffer once iterator has been fully consumed.

commit 738fa3392375759078ab6a8c677af712257875b9
Author: Josh Rosen <[email protected]>
Date:   2015-04-20T22:09:51Z

    Add feature flag to guard UnsafeGeneratedAggregate

commit c1b3813bcaa6be5fde7ac18bceaaa307e2203596
Author: Josh Rosen <[email protected]>
Date:   2015-04-20T23:53:49Z

    Fix bug in UnsafeMemoryAllocator.free():
    
    The `if` check here was backwards, which prevented any memory from
    being freed.

commit 7df600872f1a8e61b4544464f412b940501fb487
Author: Josh Rosen <[email protected]>
Date:   2015-04-21T00:32:21Z

    Optimizations related to zeroing out memory:
    
    - Do not zero out all allocated memory; the zeroing isn't free and in many
      cases it isn't necessary.
    - There are some cases where we do want to clear the memory, such as in 
BitSet.
    
      It shouldn't be the BitSet object's responsibility to zero out the memory
      block passed to it (since maybe we're passing some memory created by 
someone
      else and want to interpret it as a bitset). To make the caller's life 
easier,
      though, I added a MemoryBlock.zero() method for clearing the block.
    
    - In UnsafeGeneratedAggregate, use Arrays.fill to clear the re-used 
temporary
      row buffer, since this is likely to be much faster than Unsafe.setMemory;
      see 
http://psy-lob-saw.blogspot.com/2015/04/on-arraysfill-intrinsics-superword-and.html
      for more details.

commit 58ac3938e892a092dafba80b870d21e23698f111
Author: Josh Rosen <[email protected]>
Date:   2015-04-21T00:36:58Z

    Use UNSAFE allocator in GeneratedAggregate (TODO: make this configurable)

commit d2bb986fce7fedaccd7875452126481a068ebace
Author: Josh Rosen <[email protected]>
Date:   2015-04-22T03:24:03Z

    Update to implement new Row methods added upstream

commit b3eaccde0f00453d01be551cb7819c3a92a4f65c
Author: Josh Rosen <[email protected]>
Date:   2015-04-22T04:21:20Z

    Extract aggregation map into its own class.
    
    This makes the code much easier to understand and
    will allow me to implement unsafe versions of both
    GeneratedAggregate and the regular Aggregate operator.

commit bade9665d165a75bd7915b9a9a6cb196a77e73e8
Author: Josh Rosen <[email protected]>
Date:   2015-04-22T04:23:34Z

    Comment update (bumping to refresh GitHub cache...)

commit d85eeff90040a6c4de3c7607cf75d9e7b23f04bf
Author: Josh Rosen <[email protected]>
Date:   2015-04-22T06:05:47Z

    Add basic sanity test for UnsafeFixedWidthAggregationMap

commit 1f4b7166afeaef2bf44fc6302499f5264c0df596
Author: Josh Rosen <[email protected]>
Date:   2015-04-22T21:40:59Z

    Merge Unsafe code into the regular GeneratedAggregate, guarded by a
    configuration flag; integrate planner support and re-enable all tests.

commit 92d5a06b181b56baed23fea22f58cb82d8f83d30
Author: Josh Rosen <[email protected]>
Date:   2015-04-23T19:10:11Z

    Address a number of minor code review comments.

commit 628f9366bac4687dd393f336d7d23eea1e17364b
Author: Josh Rosen <[email protected]>
Date:   2015-04-23T20:25:32Z

    Use ints instead of longs for indexing.

commit 23a440ac3636012595359d2d098c6da27e07fa5b
Author: Josh Rosen <[email protected]>
Date:   2015-04-23T20:29:06Z

    Bump up default hash map size

commit 765243d387667ca3470259769ad57f71162e09e2
Author: Josh Rosen <[email protected]>
Date:   2015-04-23T22:15:37Z

    Enable optional performance metrics for hash map.

commit b26f1d374672b0ff5657fe89463095e90d246269
Author: Josh Rosen <[email protected]>
Date:   2015-04-23T23:49:53Z

    Fix bug in murmur hash implementation.

----

