GitHub user rxin opened a pull request:
https://github.com/apache/spark/pull/1498
[SPARK-2521] Broadcast RDD object (instead of sending it along with every
task)
This is a resubmission of #1452, which was reverted because it broke the build.
Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains
closures) along with the task itself to the executors using Akka. This is
inefficient because all tasks in the same stage use the same RDD object, yet we
send that object to the executors once per task. This is especially bad
when a closure references a very large variable. The current design
has forced users to broadcast large variables explicitly.
The patch uses broadcast to send the RDD objects and closures to the executors,
and uses Akka only to send a reference to the broadcast RDD/closure along with
the partition-specific information for each task. For those of you who know more
about the internals, Spark already relies on broadcast to send the Hadoop
JobConf every time it uses the Hadoop input, because the JobConf is large.
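To make the size argument concrete, here is a minimal sketch in plain Scala (no Spark dependency) that compares the bytes serialized when every task carries the captured variable against the bytes serialized when tasks carry only a small reference. The names `TaskWithClosure`, `TaskWithBroadcastRef`, and `broadcastId` are hypothetical stand-ins, not Spark's actual task classes, and the sketch counts the broadcast payload once, whereas in practice each executor would fetch it once.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object TaskSizeSketch {
  // Hypothetical stand-ins for illustration only; not Spark's real task types.
  final case class TaskWithClosure(partition: Int, captured: Array[Byte])
  final case class TaskWithBroadcastRef(partition: Int, broadcastId: Long)

  // Size in bytes of an object under plain Java serialization.
  def serializedSize(obj: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.size()
  }

  // Every task carries its own copy of the captured variable.
  def totalInlined(numTasks: Int, captured: Array[Byte]): Long =
    (0 until numTasks)
      .map(p => serializedSize(TaskWithClosure(p, captured)).toLong)
      .sum

  // The captured variable is counted once; each task carries only a reference.
  def totalBroadcast(numTasks: Int, captured: Array[Byte]): Long = {
    val payloadOnce = serializedSize(captured).toLong
    val references = (0 until numTasks)
      .map(p => serializedSize(TaskWithBroadcastRef(p, 0L)).toLong)
      .sum
    payloadOnce + references
  }

  def main(args: Array[String]): Unit = {
    val captured = new Array[Byte](1000 * 1000) // 1 MB, as in the test below
    println(s"inlined:   ${totalInlined(1000, captured)} bytes")
    println(s"broadcast: ${totalBroadcast(1000, captured)} bytes")
  }
}
```

The inlined total grows with the number of tasks times the payload size, while the broadcast total grows only with the number of small references.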
The user-facing impacts of the change include:
1. Users won't need to decide what to broadcast anymore, unless they
want to use a large object multiple times in different operations.
2. Task size will get smaller, resulting in faster scheduling and higher
task dispatch throughput.
In addition, the change will simplify some internals of Spark, eliminating
the need to maintain task caches and the complex logic to broadcast JobConf
(which also led to a deadlock recently).
A simple way to test this:
```scala
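// 1 MB payload captured by both closures below
val a = new Array[Byte](1000 * 1000)
scala.util.Random.nextBytes(a)
// 1000 partitions, so 1000 tasks per stage; each closure references `a`
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count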
```
Numbers on 3 r3.8xlarge instances on EC2:
```
master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
```
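As a rough way to summarize the three runs above, the following sketch averages each set of timings and takes their ratio. The object and method names are illustrative only, and a mean over three runs (the first of which may include warm-up) is an approximation rather than a rigorous benchmark.

```scala
object BenchmarkSummary {
  // Timings in seconds, copied from the numbers reported above.
  val masterRuns: Seq[Double] = Seq(5.648436068, 4.715361895, 5.360161877)
  val patchedRuns: Seq[Double] = Seq(3.416348793, 1.477846558, 1.553432156)

  // Arithmetic mean of a non-empty sequence of timings.
  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Ratio of mean time before the change to mean time after it.
  def speedup(before: Seq[Double], after: Seq[Double]): Double =
    mean(before) / mean(after)

  def main(args: Array[String]): Unit = {
    println(f"master mean:  ${mean(masterRuns)}%.3f s")
    println(f"patched mean: ${mean(patchedRuns)}%.3f s")
    println(f"speedup:      ${speedup(masterRuns, patchedRuns)}%.2fx")
  }
}
```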
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rxin/spark broadcast-task
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1498.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1498
----
commit 482e4f07dc4b8a08d74bc67336afd7b801687d78
Author: Reynold Xin <[email protected]>
Date: 2014-07-19T06:52:47Z
[SPARK-2521] Broadcast RDD object (instead of sending it along with every
task).
Author: Reynold Xin <[email protected]>
Closes #1452 from rxin/broadcast-task and squashes the following commits:
762e0be [Reynold Xin] Warn large broadcasts.
ade6eac [Reynold Xin] Log broadcast size.
c3b6f11 [Reynold Xin] Added a unit test for clean up.
754085f [Reynold Xin] Explain why broadcasting serialized copy of the task.
04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet
(instead of sending it for every task).
(cherry picked from commit 7b8cd175254d42c8e82f0aa8eb4b7f3508d8fde2)
Signed-off-by: Reynold Xin <[email protected]>
commit 3092e695dfd802f86392ca048927c9d2006759f5
Author: Reynold Xin <[email protected]>
Date: 2014-07-20T07:00:12Z
Fixed unit test failures. One more to go.
----