GitHub user mccheah opened a pull request:
https://github.com/apache/spark/pull/8438
[SPARK-10250][CORE] External group by to handle huge keys
This takes the Python implementation of group-by-key and brings Scala up to
parity, making Scala's group-by-key robust to a single group that is too large
to fit in memory. It does so by using an ExternalList data structure to combine
the groups; ExternalList can spill to disk if it gets too large.
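To make the idea concrete, here is a minimal sketch of a spillable, append-only list. It is not the ExternalList in this PR: the class name `SimpleExternalList`, the element-count `spillThreshold`, and the use of plain Java serialization to temp files are all assumptions made for illustration (the real class ties spilling to Spark's shuffle parameters rather than to a fixed element count).

```scala
import java.io._
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for ExternalList: it spills by element
// count instead of tracked memory, and writes chunks with Java serialization.
class SimpleExternalList[T](spillThreshold: Int) {
  require(spillThreshold > 0, "spillThreshold must be positive")

  private val buffer = ArrayBuffer.empty[T]        // in-memory tail
  private val spillFiles = ArrayBuffer.empty[File] // spilled chunks, in order

  def +=(elem: T): this.type = {
    buffer += elem
    if (buffer.size >= spillThreshold) spill()
    this
  }

  def numSpills: Int = spillFiles.size

  // Writes the in-memory buffer to a new temp file, then clears the buffer.
  private def spill(): Unit = {
    val file = File.createTempFile("external-list-", ".bin")
    file.deleteOnExit()
    val out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
    try {
      out.writeInt(buffer.size)
      buffer.foreach(e => out.writeObject(e.asInstanceOf[AnyRef]))
    } finally out.close()
    spillFiles += file
    buffer.clear()
  }

  // Reads one spilled chunk back into memory.
  private def readChunk(file: File): Seq[T] = {
    val in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(file)))
    try {
      val n = in.readInt()
      Vector.fill(n)(in.readObject().asInstanceOf[T])
    } finally in.close()
  }

  // Spilled chunks (read lazily, one chunk at a time) followed by the in-memory tail.
  def iterator: Iterator[T] =
    spillFiles.iterator.flatMap(readChunk) ++ buffer.iterator

  // Deletes spill files; only call once the list will no longer be iterated.
  def cleanup(): Unit = {
    spillFiles.foreach(_.delete())
    spillFiles.clear()
  }
}
```

Deleting spill files only in `cleanup()`, rather than after the first read, keeps the list re-iterable: appending 10 elements with `spillThreshold = 4` produces two spill files plus two in-memory elements, and `iterator` can be consumed repeatedly.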
First, performance testing for cases where a single group would be too
big to fit in memory:
I tried a few versions of this. In my first implementation, I wrote the
ExternalList class and simply replaced the CompactBuffer usage in
PairRDDFunctions.groupByKey with external lists. I then ran the group-by-key
test in ExternalListSuite with some parameters modified. With 7200000 unique
integers bucketed into 5 buckets, the trials yielded:
89333 ms
89992 ms
104026 ms
I then switched to the current implementation, which matches exactly what
Python does with an ExternalSorter. In essence, this is a sort-based group-by.
It wasn't clear to me why a sort-based group-by would be better... until I saw
the numbers for the same RDD and buckets:
49632 ms
53615 ms
54340 ms
Therefore I went with the current implementation. However, it's not
immediately clear to me how the Python implementation (and this implementation,
for that matter) gets its speedup from using ExternalSorter. It would be great
to get some feedback on why that is the case.
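For readers unfamiliar with the sort-based approach, the sketch below shows its core idea in memory: once records are sorted by key, every group is a contiguous run, so groups can be emitted one after another while only the current group is held. The names `SortBasedGroupBy`, `groupSorted`, and `groupByKey` are hypothetical, and the in-memory `sortBy` stands in for ExternalSorter, which would additionally spill sorted data to disk and merge it. This illustrates the technique only; it makes no claim about where the measured speedup comes from.

```scala
// Hypothetical sketch of a sort-based group-by; an in-memory sortBy stands in
// for ExternalSorter, which would also spill sorted runs and merge them.
object SortBasedGroupBy {
  /** Groups runs of equal keys from an iterator that is already sorted by key.
   *  Groups are produced lazily, one at a time. */
  def groupSorted[K, V](sorted: Iterator[(K, V)]): Iterator[(K, Seq[V])] =
    new Iterator[(K, Seq[V])] {
      private val it = sorted.buffered
      def hasNext: Boolean = it.hasNext
      def next(): (K, Seq[V]) = {
        val key = it.head._1 // throws NoSuchElementException when exhausted
        val values = Vector.newBuilder[V]
        // Consume the contiguous run of records that share this key.
        while (it.hasNext && it.head._1 == key) values += it.next()._2
        (key, values.result())
      }
    }

  /** Sorts by key (sortBy is stable, so per-key value order is kept), then groups. */
  def groupByKey[K: Ordering, V](records: Seq[(K, V)]): Iterator[(K, Seq[V])] =
    groupSorted(records.sortBy(_._1).iterator)
}
```

For example, grouping `Seq(3 -> "c", 1 -> "a", 3 -> "d", 1 -> "b", 2 -> "x")` yields the keys 1, 2, 3 in order, with key 1 mapped to `Seq("a", "b")`.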
Some caveats and things I am concerned about:
1. Serialization is a bit hacky in ExternalList. It took some work to make
ExternalList both serializable and spillable. For example, I use an extra
companion object to hold references to "constants" that I don't want wiped out
by serialization. Because Java deserialization does not run the constructor of
a Serializable class, the initialization code in a Scala class body is not
invoked again, leaving any vals that were not serialized as null. Where a val
only makes sense to initialize when the object is constructed, I either have to
make it a var and initialize it during deserialization, or mark it as a lazy
val so it is re-initialized on first use after deserialization.
2. ExternalList's spilling behavior is tied to shuffle parameters, which is
a bit unintuitive, although Python does the same. The alternatives would be
separate Spark configurations, or parameters to groupByKey with reasonable
defaults; neither is pretty. In theory, though, this should not cause a
significant performance regression in existing groupByKey workflows: those
workflows would already have had their big groups spilled to disk after the
whole group was written to the ExternalSorter anyway, right? However, I haven't
been able to benchmark the cases where the regular groupByKey wouldn't normally
spill but would spill now.
3. I'm open to opinions on the cleanup logic. I register external lists
with weak-reference cleaners so that when the lists are garbage-collected, the
underlying files are cleaned up shortly afterward. If there's a better way to
do this, I'm all ears.
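The serialization pitfall in caveat 1 can be reproduced with plain Java serialization. In this sketch, `SpillableState`, `SerDe.roundTrip`, and all field names are hypothetical, not taken from ExternalList. It shows that a `@transient val` comes back `null` because the class body is not re-run on deserialization, while a `@transient lazy val`, a `var` reset in a `readObject` hook, and a value held in the companion object all remain usable.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical class illustrating the serialization workarounds; not ExternalList.
class SpillableState(val name: String) extends Serializable {
  // Not serialized, and the class body is not re-run on deserialization,
  // so this field is null in a deserialized copy.
  @transient val eagerScratch: StringBuilder = new StringBuilder(SpillableState.Label)

  // Not serialized either, but recomputed on first access after deserialization.
  @transient lazy val lazyScratch: StringBuilder = new StringBuilder(SpillableState.Label)

  // Alternative: a var that the readObject hook re-initializes.
  @transient private var hookedScratch: StringBuilder = new StringBuilder(SpillableState.Label)
  def hooked: StringBuilder = hookedScratch

  // Called by Java serialization in place of the (skipped) constructor logic.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject() // restores the serialized field `name`
    hookedScratch = new StringBuilder(SpillableState.Label)
  }
}

object SpillableState {
  // Lives in the JVM-wide singleton rather than in each instance,
  // so serializing an instance never touches it.
  val Label: String = "scratch"
}

object SerDe {
  // Java-serializes obj to a byte array and reads it back.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After `SerDe.roundTrip(new SpillableState("a"))`, the copy has `name == "a"` and `eagerScratch == null`, while `lazyScratch` and `hooked` both hold `"scratch"` again.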
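The weak-reference cleanup in caveat 3 follows a standard JVM pattern: wrap each owner in a `WeakReference` registered with a `ReferenceQueue`, and delete the owner's files once the garbage collector clears the reference and enqueues it. The `SpillFileCleaner` below is a hypothetical sketch of that pattern, not the PR's cleaner or Spark's ContextCleaner. Files are deleted only after the GC clears a reference, so the timing is not deterministic.

```scala
import java.io.File
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical weak-reference-based cleaner for spill files.
object SpillFileCleaner {
  private val queue = new ReferenceQueue[AnyRef]()

  // Weakly references the owner (it is only passed to the super constructor,
  // never stored) but strongly holds the list of files to delete.
  private final class CleanupRef(owner: AnyRef, val files: Seq[File])
      extends WeakReference[AnyRef](owner, queue)

  // Keeps the CleanupRef objects themselves reachable until processed;
  // an unreachable Reference object may never be enqueued.
  private val pending = ConcurrentHashMap.newKeySet[CleanupRef]()

  def register(owner: AnyRef, files: Seq[File]): Unit = {
    pending.add(new CleanupRef(owner, files))
    ()
  }

  private def clean(ref: CleanupRef): Unit = {
    ref.files.foreach(_.delete())
    pending.remove(ref)
    ()
  }

  /** Non-blocking: cleans every owner that has already been collected; returns how many. */
  def cleanPending(): Int = {
    var count = 0
    var ref = queue.poll()
    while (ref != null) {
      clean(ref.asInstanceOf[CleanupRef])
      count += 1
      ref = queue.poll()
    }
    count
  }

  /** Starts a daemon thread that blocks on the queue and cleans owners as they are collected. */
  def startDaemon(timeoutMs: Long = 1000L): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit =
        while (!Thread.currentThread().isInterrupted) {
          try {
            val ref = queue.remove(timeoutMs) // null if the timeout elapses
            if (ref != null) clean(ref.asInstanceOf[CleanupRef])
          } catch {
            case _: InterruptedException => Thread.currentThread().interrupt()
          }
        }
    }, "spill-file-cleaner")
    t.setDaemon(true)
    t.start()
    t
  }
}
```

In a real list, `register` would be called from the constructor and again from `readObject`, in line with the commit note that ExternalList objects register themselves upon construction or deserialization.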
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/palantir/spark feature/external-group-by-wip
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/8438.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #8438
----
commit e6f9191a4eb2fc444ea6a908ffc49959583db9e4
Author: mcheah <[email protected]>
Date: 2015-07-16T23:21:05Z
[WIP] External Group By initial impl
commit 3a1624e1eeba45df0c1a807b0dbeea946b021f9c
Author: mcheah <[email protected]>
Date: 2015-07-17T01:14:50Z
Fix java serialization for ExternalList.
commit 8b3a51d432c70eafb290a67cb5c52e1126322229
Author: mcheah <[email protected]>
Date: 2015-07-17T01:17:45Z
Remove confusing comment
commit a93445e5f9bd81ba1c04590e5ecf5307a4b22dfc
Author: mcheah <[email protected]>
Date: 2015-07-30T16:39:13Z
Refactor logic common to both ExternalAppendOnlyMap and ExternalList
commit 86d42c4cdff36396131e09905a6d784385ea064c
Author: mcheah <[email protected]>
Date: 2015-07-30T20:52:59Z
Added a formal group by unit test for spilling.
Also organized imports, and fixed a bug where the external list cannot
be iterated through twice because the file is cleaned up.
commit 09f5d2230d180bcfa2770a53833be1c9c01ce9a1
Author: mcheah <[email protected]>
Date: 2015-07-30T21:50:41Z
Merge remote-tracking branch 'origin/master' into external-group-by
Conflicts:
core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
commit d891b75d25e152b2304548944c5e642329a148cb
Author: mcheah <[email protected]>
Date: 2015-07-30T21:57:16Z
Fix merge conflict compiler errors
commit f06b5def7294de39c8a76d910cc900f5ee0c1864
Author: mcheah <[email protected]>
Date: 2015-07-30T23:57:13Z
Removing unnecessary check
commit 0ec234c2a6ffb5d87fb073fa7d245fce49d466d2
Author: mcheah <[email protected]>
Date: 2015-07-31T00:31:43Z
Fixing some typos and unnecessary comments
Removing original implementation that was commented out
commit e91df52bc811e1ca63f9a9b4beff773ec2c83566
Author: mcheah <[email protected]>
Date: 2015-07-31T19:53:36Z
Clean up ExternalLists more eagerly.
Uses a Cleaner thread similar to ContextCleaner that can also be
running on the executors. Uses WeakReference to determine if a list
can be cleaned up or not. ExternalList objects register themselves
for cleanup upon construction or deserialization.
commit ecd4163062314b2e48e17aeba03feeea4e7ee6d4
Author: mcheah <[email protected]>
Date: 2015-08-04T22:18:13Z
Adding a comment
commit 17c3f3db1d6761c49a5f2b6eb6163aa35abdc119
Author: mcheah <[email protected]>
Date: 2015-08-17T22:16:23Z
Using ExternalList[_] in KryoSerializer. Clean up SpillableCollection.next
commit 3d066fc9dd36893b09cd9733ef29aa3076ab6626
Author: mcheah <[email protected]>
Date: 2015-08-18T00:44:39Z
Fixing unit test
commit 083f9e2385a93909890ece739a4e59b099c3898d
Author: mcheah <[email protected]>
Date: 2015-08-25T21:47:17Z
Merge branch 'master' into external-group-by
Conflicts:
core/src/main/scala/org/apache/spark/ContextCleaner.scala
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
commit 4c051103b42fa6bb6904f5940aa4679c763cfad8
Author: mcheah <[email protected]>
Date: 2015-08-25T22:04:02Z
Fix a whole ton of Scalastyle errors
commit 8f5d5e38325bf3cec03e8d109af492a65e327a90
Author: mcheah <[email protected]>
Date: 2015-08-25T23:03:50Z
Continuing to sanitize unit tests
----