Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2030#discussion_r16399340
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -27,41 +29,87 @@ import org.apache.spark.io.CompressionCodec
     import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
     
     /**
    - *  A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
    - *  protocol to do a distributed transfer of the broadcasted data to the executors.
    - *  The mechanism is as follows. The driver divides the serializes the broadcasted data,
    - *  divides it into smaller chunks, and stores them in the BlockManager of the driver.
    - *  These chunks are reported to the BlockManagerMaster so that all the executors can
    - *  learn the location of those chunks. The first time the broadcast variable (sent as
    - *  part of task) is deserialized at a executor, all the chunks are fetched using
    - *  the BlockManager. When all the chunks are fetched (initially from the driver's
    - *  BlockManager), they are combined and deserialized to recreate the broadcasted data.
    - *  However, the chunks are also stored in the BlockManager and reported to the
    - *  BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
    - *  multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
    - *  made to other executors who already have those chunks, resulting in a distributed
    - *  fetching. This prevents the driver from being the bottleneck in sending out multiple
    - *  copies of the broadcast data (one per executor) as done by the
    - *  [[org.apache.spark.broadcast.HttpBroadcast]].
    + * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
    + *
    + * The mechanism is as follows:
    + *
    + * The driver divides the serialized object into small chunks and
    + * stores those chunks in the BlockManager of the driver.
    + *
    + * On each executor, the executor first attempts to fetch the object from its BlockManager. If
    + * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
    + * other executors if available. Once it gets the chunks, it puts the chunks in its own
    + * BlockManager, ready for other executors to fetch from.
    + *
    + * This prevents the driver from being the bottleneck in sending out multiple copies of the
    + * broadcast data (one per executor) as done by the [[org.apache.spark.broadcast.HttpBroadcast]].
    + *
    + * @param obj object to broadcast
    + * @param isLocal whether Spark is running in local mode (single JVM process).
    + * @param id A unique identifier for the broadcast variable.
      */
     private[spark] class TorrentBroadcast[T: ClassTag](
    -    @transient var value_ : T, isLocal: Boolean, id: Long)
    +    obj : T,
    +    @transient private val isLocal: Boolean,
    +    id: Long)
       extends Broadcast[T](id) with Logging with Serializable {
     
    -  override protected def getValue() = value_
    +  override protected def getValue() = _value
    +
    +  /**
    +   * Value of the broadcast object. On driver, this is set directly by the constructor.
    +   * On executors, this is reconstructed by [[readObject]], which builds this value by reading
    +   * blocks from the driver and/or other executors.
    +   */
    +  @transient private var _value: T = obj
    +
    +  /** Total number of blocks this broadcast variable contains. */
    +  private val numBlocks: Int = writeBlocks()
     
       private val broadcastId = BroadcastBlockId(id)
     
    -  SparkEnv.get.blockManager.putSingle(
    -    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    +  /**
    +   * Divide the object into multiple blocks and put those blocks in the block manager.
    +   *
    +   * @return number of blocks this broadcast variable is divided into
    +   */
    +  private def writeBlocks(): Int = {
    +    val blocks = TorrentBroadcast.blockifyObject(_value)
    +    blocks.zipWithIndex.foreach { case (block, i) =>
    +      // TODO: Use putBytes directly.
    +      SparkEnv.get.blockManager.putSingle(
    +        BroadcastBlockId(id, "piece" + i),
    +        blocks(i),
    +        StorageLevel.MEMORY_AND_DISK_SER,
    +        tellMaster = true)
    +    }
    +    blocks.length
    +  }
     
    -  @transient private var arrayOfBlocks: Array[TorrentBlock] = null
    -  @transient private var totalBlocks = -1
    -  @transient private var totalBytes = -1
    -  @transient private var hasBlocks = 0
    +  /** Fetch torrent blocks from the driver and/or other executors. */
    +  private def readBlocks(): Array[Array[Byte]] = {
    +    // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
    +    // to the driver, so other executors can pull these thunks from this executor as well.
    --- End diff --
    
    Typo: thunks -> chunks
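
For readers following the diff, the chunk-and-reassemble idea described in the new Scaladoc can be sketched roughly as below. This is only an illustration under stated assumptions, not the code in this pull request: `ChunkingSketch`, `blockify`, `unblockify`, and the 4 KB `ChunkSize` are hypothetical names and values, and the sketch uses plain Java serialization with no compression and no BlockManager, which may differ from what `TorrentBroadcast` actually does.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the chunking step; not part of Spark.
object ChunkingSketch {
  // Assumed chunk size, chosen only for illustration.
  val ChunkSize: Int = 4 * 1024

  /** Serialize `obj` with Java serialization and split the bytes into chunks of at most `chunkSize`. */
  def blockify(obj: AnyRef, chunkSize: Int = ChunkSize): Array[Array[Byte]] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray.grouped(chunkSize).toArray
  }

  /** Concatenate the chunks in order and deserialize the original object. */
  def unblockify[T](chunks: Array[Array[Byte]]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(chunks.flatten))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}
```

In this sketch the driver side would call `blockify` once and publish each chunk under its own block id, while an executor would collect all chunks (from the driver or from peers that already hold them) and call `unblockify` to rebuild the value.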

