GitHub user mccheah opened a pull request:

    https://github.com/apache/spark/pull/8438

    [SPARK-10250][CORE] External group by to handle huge keys

    This takes the Python implementation of group-by-key and brings Scala up to 
parity, making Scala's group-by-key tolerant of a single very large group. It 
does so by combining each group into an ExternalList, a data structure that 
can spill if it gets too large.
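
    To make the idea of a spillable list concrete, here is a minimal sketch.
    It is not the ExternalList from this PR: the class name SpillingList, the
    element-count threshold, and the use of Java serialization with one temp
    file per spill are all assumptions made for brevity.

```scala
import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a spillable list; spills on element count, not on estimated size.
final class SpillingList[T <: AnyRef](maxInMemory: Int) {
  require(maxInMemory > 0, "maxInMemory must be positive")

  private val buffer = new ArrayBuffer[T]
  // One (file, element count) pair per spill, kept in spill order.
  private val spills = new ArrayBuffer[(File, Int)]

  def spillCount: Int = spills.size

  def append(elem: T): Unit = {
    buffer += elem
    if (buffer.size >= maxInMemory) spill()
  }

  // Writes the in-memory buffer to a fresh temp file, then clears the buffer.
  private def spill(): Unit = {
    val file = File.createTempFile("spilling-list", ".bin")
    file.deleteOnExit()
    val out = new ObjectOutputStream(new FileOutputStream(file))
    try buffer.foreach(e => out.writeObject(e)) finally out.close()
    spills += ((file, buffer.size))
    buffer.clear()
  }

  // Streams one spill file back lazily; closes the stream after the last element.
  private def readSpill(file: File, count: Int): Iterator[T] = {
    val in = new ObjectInputStream(new FileInputStream(file))
    var remaining = count
    new Iterator[T] {
      def hasNext: Boolean = remaining > 0
      def next(): T = {
        val elem = in.readObject().asInstanceOf[T]
        remaining -= 1
        if (remaining == 0) in.close()
        elem
      }
    }
  }

  // Spilled elements first (in insertion order), then whatever is still in memory.
  def iterator: Iterator[T] =
    spills.iterator.flatMap { case (f, n) => readSpill(f, n) } ++ buffer.iterator
}
```

    Because the spill files are left in place until JVM exit, the list can be
    iterated more than once; the sketch does not support appending while an
    iteration is in progress.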
    
    First, performance testing for cases where a single group would be too 
big to fit in memory:
    
    I tried a few versions of this. In my first implementation, I wrote the 
ExternalList class, and simply replaced the CompactBuffer usage in 
PairRDDFunctions.groupByKey with external lists. I then ran the 
ExternalListSuite that did the group by key operation with some parameters 
modified. With 7200000 unique integers, bucketed into 5 buckets, the trials 
yielded:
    
    89.333 ms
    89.992 ms
    104.026 ms
    
    I then switched to the current implementation. It matches exactly what 
Python does with an ExternalSorter. So in essence, this is a sort-based group 
by. It wasn't clear to me why sort-based group by is better... until I saw the 
numbers, for the same RDD and buckets:
    
    49632 ms
    53615 ms
    54340 ms
    
    Therefore I went with the current implementation. However, it's not 
immediately clear to me how the Python implementation, and this implementation 
for that matter, gets this speedup from using ExternalSorter. It would be 
great to get some feedback on why that is the case.
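
    For readers unfamiliar with the term, a sort-based group by can be
    sketched as follows. This is only an illustration: an in-memory sortBy
    stands in for ExternalSorter, the object and method names are
    hypothetical, and each group is materialized as a Vector where the real
    implementation would use a spillable list.

```scala
// Illustrative only: an in-memory sort stands in for an external (spilling) sort.
object SortBasedGroupBy {

  /** Groups consecutive records that share a key; assumes the input is ordered by key. */
  def groupSorted[K, V](sortedRecords: Iterator[(K, V)]): Iterator[(K, Vector[V])] =
    new Iterator[(K, Vector[V])] {
      private val in = sortedRecords.buffered

      def hasNext: Boolean = in.hasNext

      def next(): (K, Vector[V]) = {
        val key = in.head._1
        val values = Vector.newBuilder[V]
        // After sorting, a group is one contiguous run, so it can be consumed in a single pass.
        while (in.hasNext && in.head._1 == key) values += in.next()._2
        (key, values.result())
      }
    }

  /** Sorts by key, then streams out one group at a time. */
  def groupByKey[K: Ordering, V](records: Seq[(K, V)]): Iterator[(K, Vector[V])] =
    groupSorted(records.sortBy(_._1).iterator)
}
```

    The point of the sketch is that, once records are sorted, only the group
    currently being read has to be held at any time, rather than a map of all
    groups.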
    
    Some caveats / things to note that I am concerned about:
    
    1. Serialization is a bit hacky in ExternalList - it took a bit of work to 
make ExternalList both serializable and spillable. For example, I use an extra 
companion object to hold references to "constants" that I don't want to be 
wiped out upon serialization. Because Java deserialization does not re-run the 
constructor body of a Scala class, any val that is not itself serialized comes 
back as null. Where a val only makes sense to initialize at construction 
time, I either have to make it a var and set it during deserialization, or 
mark the val as lazy so it is re-initialized on first use after 
deserialization.
    
    2. ExternalList's spilling behavior is tied to shuffle parameters, which is 
a bit unintuitive. This is what Python does as well. I guess the alternative 
would be other Spark configurations, or parameters to group-by-key that have 
reasonable defaults. Pretty ugly either way. In theory this should not cause a 
significant performance regression in existing groupByKey workflows, however: 
existing workflows would have just had the big groups spilled to disk after 
writing the whole group to the ExternalSorter anyway, right? I haven't yet 
been able to benchmark the cases where regular groupByKey wouldn't spill 
before but would spill now.
    
    3. I'm open to opinions on the cleanup logic. I register external lists 
with weak reference cleaners such that when the lists are GCed, the underlying 
files should be cleaned up shortly after. If there's a better way to do this, 
I'm all ears.
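
    The serialization caveat in item 1 can be reproduced in isolation. The
    sketch below is not code from this PR; Holder and SerializationDemo are
    hypothetical names. It round-trips an object through Java serialization
    and compares a @transient val with a @transient lazy val.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical class used only to demonstrate the behavior; not the PR's ExternalList.
class Holder(val name: String) extends Serializable {
  // Set by the constructor only. Deserialization skips the constructor, so this comes back null.
  @transient val eager: String = name.toUpperCase
  // Not serialized either, but recomputed on first access after deserialization.
  @transient lazy val onDemand: String = name.toUpperCase
}

object SerializationDemo {
  /** Serializes a value to bytes with Java serialization and reads it back. */
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

    After SerializationDemo.roundTrip(new Holder("list")), the copy's name
    field is restored, eager is null, and onDemand is recomputed from name
    when first read.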
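
    The weak-reference cleanup in item 3 could look roughly like this. It is
    a sketch under assumptions, not the PR's cleaner: SpillFileCleaner and
    FileRef are hypothetical names, and drain() is called explicitly here
    where a real cleaner would run it on a background thread.

```scala
import java.io.File
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.ConcurrentHashMap

// Weak reference to an owner object, remembering which file to delete once the owner is gone.
// `owner` and `queue` are only passed to the superclass, so no strong reference to the owner is kept.
final class FileRef(owner: AnyRef, val file: File, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](owner, queue)

// Hypothetical cleaner; names are illustrative, not the PR's API.
final class SpillFileCleaner {
  private val queue = new ReferenceQueue[AnyRef]
  // Hold the reference objects strongly so they survive until they are enqueued.
  private val pending = ConcurrentHashMap.newKeySet[FileRef]()

  /** Registers `file` for deletion once `owner` becomes unreachable. */
  def register(owner: AnyRef, file: File): FileRef = {
    val ref = new FileRef(owner, file, queue)
    pending.add(ref)
    ref
  }

  /** Processes references enqueued so far, deleting their files; returns how many were processed. */
  def drain(): Int = {
    var processed = 0
    var ref = queue.poll()
    while (ref != null) {
      ref match {
        case f: FileRef if pending.remove(f) =>
          f.file.delete()
          processed += 1
        case _ => // not registered here, or already handled
      }
      ref = queue.poll()
    }
    processed
  }
}
```

    When the garbage collector runs is not under the program's control, so
    files may outlive their lists for a while; calling enqueue() on a
    reference by hand is a deterministic way to exercise the cleanup path in
    a test.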

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/palantir/spark feature/external-group-by-wip

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8438.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #8438
    
----
commit e6f9191a4eb2fc444ea6a908ffc49959583db9e4
Author: mcheah <[email protected]>
Date:   2015-07-16T23:21:05Z

    [WIP] External Group By initial impl

commit 3a1624e1eeba45df0c1a807b0dbeea946b021f9c
Author: mcheah <[email protected]>
Date:   2015-07-17T01:14:50Z

    Fix java serialization for ExternalList.

commit 8b3a51d432c70eafb290a67cb5c52e1126322229
Author: mcheah <[email protected]>
Date:   2015-07-17T01:17:45Z

    Remove confusing comment

commit a93445e5f9bd81ba1c04590e5ecf5307a4b22dfc
Author: mcheah <[email protected]>
Date:   2015-07-30T16:39:13Z

    Refactor logic common to both ExternalAppendOnlyMap and ExternalList

commit 86d42c4cdff36396131e09905a6d784385ea064c
Author: mcheah <[email protected]>
Date:   2015-07-30T20:52:59Z

    Added a formal group by unit test for spilling.
    
    Also organized imports, and fixed a bug where the external list cannot
    be iterated through twice because the file is cleaned up.

commit 09f5d2230d180bcfa2770a53833be1c9c01ce9a1
Author: mcheah <[email protected]>
Date:   2015-07-30T21:50:41Z

    Merge remote-tracking branch 'origin/master' into external-group-by
    
    Conflicts:
        core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
        core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

commit d891b75d25e152b2304548944c5e642329a148cb
Author: mcheah <[email protected]>
Date:   2015-07-30T21:57:16Z

    Fix merge conflict compiler errors

commit f06b5def7294de39c8a76d910cc900f5ee0c1864
Author: mcheah <[email protected]>
Date:   2015-07-30T23:57:13Z

    Removing unnecessary check

commit 0ec234c2a6ffb5d87fb073fa7d245fce49d466d2
Author: mcheah <[email protected]>
Date:   2015-07-31T00:31:43Z

    Fixing some typos and unnecessary comments
    
    Removing original implementation that was commented out

commit e91df52bc811e1ca63f9a9b4beff773ec2c83566
Author: mcheah <[email protected]>
Date:   2015-07-31T19:53:36Z

    Clean up ExternalLists more eagerly.
    
    Uses a Cleaner thread similar to ContextCleaner that can also be
    running on the executors. Uses WeakReference to determine if a list
    can be cleaned up or not. ExternalList objects register themselves
    for cleanup upon construction or deserialization.

commit ecd4163062314b2e48e17aeba03feeea4e7ee6d4
Author: mcheah <[email protected]>
Date:   2015-08-04T22:18:13Z

    Adding a comment

commit 17c3f3db1d6761c49a5f2b6eb6163aa35abdc119
Author: mcheah <[email protected]>
Date:   2015-08-17T22:16:23Z

    Using ExternalList[_] in KryoSerializer. Clean up SpillableCollection.next

commit 3d066fc9dd36893b09cd9733ef29aa3076ab6626
Author: mcheah <[email protected]>
Date:   2015-08-18T00:44:39Z

    Fixing unit test

commit 083f9e2385a93909890ece739a4e59b099c3898d
Author: mcheah <[email protected]>
Date:   2015-08-25T21:47:17Z

    Merge branch 'master' into external-group-by
    
    Conflicts:
        core/src/main/scala/org/apache/spark/ContextCleaner.scala
        core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

commit 4c051103b42fa6bb6904f5940aa4679c763cfad8
Author: mcheah <[email protected]>
Date:   2015-08-25T22:04:02Z

    Fix a whole ton of Scalastyle errors

commit 8f5d5e38325bf3cec03e8d109af492a65e327a90
Author: mcheah <[email protected]>
Date:   2015-08-25T23:03:50Z

    Continuing to sanitize unit tests

----



