GitHub user kayousterhout opened a pull request:
https://github.com/apache/spark/pull/4145
[SPARK-5360] Eliminate duplicate objects in serialized CoGroupedRDD
CoGroupPartition, part of CoGroupedRDD, includes references to each RDD
that the CoGroupedRDD narrowly depends on, and a reference to the
ShuffleHandle. The partition is serialized separately from the RDD, so when the
RDD and partition arrive on the worker, the references in the partition and in
the RDD no longer point to the same object.
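To make this concrete, here is a minimal sketch (plain Scala with Java serialization, not Spark code; Handle is a hypothetical stand-in for a ShuffleHandle) of how two separate serialization passes turn one shared object into two distinct copies on the receiving side:

import java.io._

// Serialize obj to bytes and read it back, as if shipping it to a worker.
def roundTrip[T <: Serializable](obj: T): T = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(obj)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    .readObject().asInstanceOf[T]
}

case class Handle(id: Int)  // hypothetical stand-in for a ShuffleHandle

val shared = Handle(42)
// The RDD and the partition are written to separate streams, so the worker
// ends up with two copies rather than one shared object:
val copyInRdd = roundTrip(shared)
val copyInPartition = roundTrip(shared)
println(copyInRdd == copyInPartition)  // true: equal by value
println(copyInRdd eq copyInPartition)  // false: two separate objects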
This is a relatively minor performance issue (the closure can be 2x larger
than it needs to be because the RDDs and partitions are serialized twice; see
numbers below), but it is more annoying as a developer issue (which is how I
ran into it): if any state is stored in the RDD or ShuffleHandle on the worker
side, subtle bugs can appear because the references to the RDD / ShuffleHandle
in the RDD and in the partition point to separate objects. I'm not sure if
this is enough of a potential future problem to justify changing this old and
central part of the code, so I'm hoping to get input from others here.
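Reusing the roundTrip helper from the sketch above, this is the shape of the
bug: state written through one copy is invisible through the other (Counter is
again a hypothetical stand-in for worker-side state on an RDD or ShuffleHandle):

class Counter extends Serializable { var n = 0 }

val state = new Counter
val viaRdd = roundTrip(state)        // the copy reachable from the RDD
val viaPartition = roundTrip(state)  // the copy reachable from the partition
viaRdd.n += 1
println(viaRdd.n)        // 1
println(viaPartition.n)  // 0: the update never reaches the other copy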
I did some simple experiments to see how much this affects closure size.
For this example:
scala> val a = sc.parallelize(1 to 10).map((_, 1))
scala> val b = sc.parallelize(1 to 2).map(x => (x, 2*x))
scala> a.cogroup(b).collect()
the closure was 1902 bytes with current Spark, and 1129 bytes after my
change. The difference comes from eliminating duplicate serialization of the
shuffle handle.
For this example:
scala> val sortedA = a.sortByKey()
scala> val sortedB = b.sortByKey()
scala> sortedA.cogroup(sortedB).collect()
the closure was 3491 bytes with current Spark, and 1333 bytes after my
change. Here, the difference comes from eliminating duplicate serialization of
the two RDDs for the narrow dependencies.
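For anyone who wants to reproduce rough numbers: Spark's closure serializer is
Java serialization, so a plain ObjectOutputStream approximates the cost of
each duplicated object. (The exact byte counts above came from Spark's own
task serialization path, so treat this helper as an approximation only.)

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Approximate the serialized size of an object, in bytes, using the same
// Java serialization that Spark's closure serializer uses.
def serializedSize(obj: AnyRef): Int = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(obj)
  out.close()
  buffer.size()
}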
The ShuffleHandle includes the ShuffleDependency, so this difference will
get larger if a ShuffleDependency includes a serializer, a key ordering, or an
aggregator (all set to None by default). It would also get bigger for a large
RDD, although I can't think of any examples where the RDD object itself gets
large. The difference is not affected by the size of the function the user
specifies, which (as far as I understand) is typically the source of large
task closures.
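As a rough illustration of that point (reusing the serializedSize helper
above; byte counts vary by Scala and JVM version, so these are not Spark's
exact numbers), a populated Option serializes to more bytes than None, which
is why every non-default ShuffleDependency field inflates each duplicated copy
of the handle:

// A field left at its default costs little to duplicate:
println(serializedSize(None))
// A populated field (e.g. a key ordering) drags its payload into every copy:
println(serializedSize(Some(Ordering.Int)))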
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kayousterhout/spark-1 SPARK-5360
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/4145.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4145
----
commit 912d48ddf840f77c98674dc7ab034a289b68ce03
Author: Kay Ousterhout <[email protected]>
Date: 2015-01-21T21:54:56Z
[SPARK-5360] Eliminate duplicate objects in serialized CoGroupedRDD
----