GitHub user rxin opened a pull request:
https://github.com/apache/spark/pull/1648
[SPARK-2585] Remove special handling of Hadoop JobConf.
This is based on #1498. Diff here:
https://github.com/rxin/spark/commit/5904cb6649b03d48e3465ccab664b506cc69327b
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rxin/spark jobconf
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1648.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1648
----
commit cae0af33b535a7772fd2861851dca056e0c2186c
Author: Reynold Xin <[email protected]>
Date: 2014-07-19T06:52:47Z
[SPARK-2521] Broadcast RDD object (instead of sending it along with every
task).
Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains
closures) using Akka along with the task itself to the executors. This is
inefficient because all tasks in the same stage use the same RDD object, yet we
have to send that RDD object to the executors once per task. This is especially
bad when a closure references a very large variable. The current design
forces users to explicitly broadcast large variables.
The patch uses broadcast to send RDD objects and the closures to executors,
and uses Akka only to send a reference to the broadcast RDD/closure along with
the partition-specific information for the task. For those of you who know more
about the internals, Spark already relies on broadcast to send the Hadoop
JobConf every time it uses the Hadoop input, because the JobConf is large.
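As a rough illustration of why the per-task payload shrinks, the sketch below
compares the serialized size of a task that carries its data with one that
carries only a reference. `FatTask`, `SlimTask`, and `serializedSize` are
hypothetical names made up for this example, not Spark classes, and plain Java
serialization stands in for Spark's serializer and the Akka transport. It also
assumes, as a simplification, that the broadcast payload is shipped only once.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object TaskSizeSketch {
  // Hypothetical stand-in for a task that embeds the serialized RDD/closure.
  final case class FatTask(partition: Int, payload: Array[Byte])

  // Hypothetical stand-in for a task that only references a broadcast value.
  final case class SlimTask(partition: Int, broadcastId: Long)

  // Size in bytes of `obj` under plain Java serialization.
  def serializedSize(obj: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.size()
  }

  def main(args: Array[String]): Unit = {
    val big = new Array[Byte](1000 * 1000) // stands in for a large closure variable
    val numTasks = 1000L

    val fatSize = serializedSize(FatTask(0, big)).toLong
    val slimSize = serializedSize(SlimTask(0, 1L)).toLong
    val broadcastSize = serializedSize(big).toLong // assumed to be sent once

    println(s"per-task bytes, payload embedded:   $fatSize")
    println(s"per-task bytes, reference only:     $slimSize")
    println(s"stage total, payload embedded:      ${numTasks * fatSize}")
    println(s"stage total, broadcast + references: ${broadcastSize + numTasks * slimSize}")
  }
}
```

In a real cluster the broadcast value is fetched by each executor rather than
sent exactly once, so the totals here only show the direction of the saving.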
The user-facing impacts of the change include:
1. Users won't need to decide what to broadcast anymore, unless they
want to reuse a large object across different operations.
2. Task size will get smaller, resulting in faster scheduling and higher
task dispatch throughput.
In addition, the change will simplify some internals of Spark, eliminating
the need to maintain task caches and the complex logic to broadcast JobConf
(which also led to a deadlock recently).
A simple way to test this:
```scala
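// Run in spark-shell, where `sc` is the predefined SparkContext.
val a = new Array[Byte](1000 * 1000)
scala.util.Random.nextBytes(a)
// Both closures reference `a`, so the 1 MB array is captured by every task.
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count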
```
Numbers on 3 r3.8xlarge instances on EC2 (three runs each):
```
master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
```
Author: Reynold Xin <[email protected]>
Closes #1452 from rxin/broadcast-task and squashes the following commits:
762e0be [Reynold Xin] Warn large broadcasts.
ade6eac [Reynold Xin] Log broadcast size.
c3b6f11 [Reynold Xin] Added a unit test for clean up.
754085f [Reynold Xin] Explain why broadcasting serialized copy of the task.
04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet
(instead of sending it for every task).
(cherry picked from commit 7b8cd175254d42c8e82f0aa8eb4b7f3508d8fde2)
Signed-off-by: Reynold Xin <[email protected]>
commit d256b456b8450706ecacd98033c3f4d40b37814c
Author: Reynold Xin <[email protected]>
Date: 2014-07-20T07:00:12Z
Fixed unit test failures. One more to go.
commit cc152fcd14bb13104f391da0fb703a1d2203e3a6
Author: Reynold Xin <[email protected]>
Date: 2014-07-21T01:48:18Z
Don't cache the RDD broadcast variable.
commit de779f8704a7f586190dc0e25642836e06136cbb
Author: Reynold Xin <[email protected]>
Date: 2014-07-21T07:21:13Z
Fix TaskContextSuite.
commit 991c002fc4238308108c07fb40b3400a3d448e2f
Author: Reynold Xin <[email protected]>
Date: 2014-07-23T05:41:53Z
Use HttpBroadcast.
commit cf384501c1874284e8412439466eb6e22d5fe6d6
Author: Reynold Xin <[email protected]>
Date: 2014-07-25T07:10:18Z
Use TorrentBroadcastFactory.
commit bab1d8b601d88b946e3770c611c33d2040472492
Author: Reynold Xin <[email protected]>
Date: 2014-07-28T22:38:57Z
Check for NotSerializableException in submitMissingTasks.
commit 797c247ba12dd3eeaa5dda621b9db5a8419732f0
Author: Reynold Xin <[email protected]>
Date: 2014-07-29T01:20:13Z
Properly send SparkListenerStageSubmitted and SparkListenerStageCompleted.
commit 111007d719e9c23e5baf4fc3dc374d01115c0e1f
Author: Reynold Xin <[email protected]>
Date: 2014-07-29T01:29:33Z
Fix broadcast tests.
commit 252238da16fe3a3dfd4a067ca4a9ac47d4fac025
Author: Reynold Xin <[email protected]>
Date: 2014-07-29T04:31:20Z
Serialize the final task closure as well as ShuffleDependency in taskBinary.
commit f8535dc959b6d3b733fd46adbfa07708557a1d05
Author: Reynold Xin <[email protected]>
Date: 2014-07-29T04:35:23Z
Fixed the style violation.
commit 5904cb6649b03d48e3465ccab664b506cc69327b
Author: Reynold Xin <[email protected]>
Date: 2014-07-30T04:39:14Z
[SPARK-2585] Remove special handling of Hadoop JobConf.
----