GitHub user shivaram commented on a diff in the pull request:
https://github.com/apache/spark/pull/2030#discussion_r16399782
--- Diff:
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -109,99 +137,30 @@ private[spark] class TorrentBroadcast[T: ClassTag](
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
TorrentBroadcast.synchronized {
- SparkEnv.get.blockManager.getSingle(broadcastId) match {
+ SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next())
match {
case Some(x) =>
- value_ = x.asInstanceOf[T]
+ _value = x.asInstanceOf[T]
case None =>
- val start = System.nanoTime
logInfo("Started reading broadcast variable " + id)
-
- // Initialize @transient variables that will receive garbage
values from the master.
- resetWorkerVariables()
-
- if (receiveBroadcast()) {
- value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks,
totalBytes, totalBlocks)
-
- /* Store the merged copy in cache so that the next worker
doesn't need to rebuild it.
- * This creates a trade-off between memory usage and latency.
Storing copy doubles
- * the memory footprint; not storing doubles deserialization
cost. Also,
- * this does not need to be reported to BlockManagerMaster
since other executors
- * does not need to access this block (they only need to fetch
the chunks,
- * which are reported).
- */
- SparkEnv.get.blockManager.putSingle(
- broadcastId, value_, StorageLevel.MEMORY_AND_DISK,
tellMaster = false)
-
- // Remove arrayOfBlocks from memory once value_ is on local
cache
- resetWorkerVariables()
- } else {
- logError("Reading broadcast variable " + id + " failed")
- }
-
- val time = (System.nanoTime - start) / 1e9
+ val start = System.nanoTime()
+ val blocks = readBlocks()
+ val time = (System.nanoTime() - start) / 1e9
logInfo("Reading broadcast variable " + id + " took " + time + "
s")
- }
- }
- }
-
- private def resetWorkerVariables() {
- arrayOfBlocks = null
- totalBytes = -1
- totalBlocks = -1
- hasBlocks = 0
- }
-
- private def receiveBroadcast(): Boolean = {
- // Receive meta-info about the size of broadcast data,
- // the number of chunks it is divided into, etc.
- val metaId = BroadcastBlockId(id, "meta")
- var attemptId = 10
- while (attemptId > 0 && totalBlocks == -1) {
- SparkEnv.get.blockManager.getSingle(metaId) match {
- case Some(x) =>
- val tInfo = x.asInstanceOf[TorrentInfo]
- totalBlocks = tInfo.totalBlocks
- totalBytes = tInfo.totalBytes
- arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
- hasBlocks = 0
-
- case None =>
- Thread.sleep(500)
- }
- attemptId -= 1
- }
-
- if (totalBlocks == -1) {
- return false
- }
- /*
- * Fetch actual chunks of data. Note that all these chunks are stored
in
- * the BlockManager and reported to the master, so that other executors
- * can find out and pull the chunks from this executor.
- */
- val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ +
1).toList)
- for (pid <- recvOrder) {
- val pieceId = BroadcastBlockId(id, "piece" + pid)
- SparkEnv.get.blockManager.getSingle(pieceId) match {
- case Some(x) =>
- arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
- hasBlocks += 1
+ _value = TorrentBroadcast.unBlockifyObject[T](blocks)
+ // Store the merged copy in BlockManager so other tasks on this
executor doesn't
--- End diff --
nit: doesn't -> don't
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]