GitHub user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/126#discussion_r11374503
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala ---
    @@ -51,49 +51,80 @@ import org.apache.spark._
      * @tparam T Type of the data contained in the broadcast variable.
      */
     abstract class Broadcast[T](val id: Long) extends Serializable {
    -  def value: T
     
    -  // We cannot have an abstract readObject here due to some weird issues 
with
    -  // readObject having to be 'private' in sub-classes.
    +  /**
    +   * Flag signifying whether the broadcast variable is valid
    +   * (that is, not already destroyed) or not.
    +   */
    +  @volatile private var _isValid = true
     
    -  override def toString = "Broadcast(" + id + ")"
    -}
    -
    -private[spark]
    -class BroadcastManager(val _isDriver: Boolean, conf: SparkConf, 
securityManager: SecurityManager)
    -    extends Logging with Serializable {
    -
    -  private var initialized = false
    -  private var broadcastFactory: BroadcastFactory = null
    -
    -  initialize()
    -
    -  // Called by SparkContext or Executor before using Broadcast
    -  private def initialize() {
    -    synchronized {
    -      if (!initialized) {
    -        val broadcastFactoryClass = conf.get(
    -          "spark.broadcast.factory", 
"org.apache.spark.broadcast.HttpBroadcastFactory")
    -
    -        broadcastFactory =
    -          
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
    +  /** Get the broadcasted value. */
    +  def value: T = {
    +    assertValid()
    +    getValue()
    +  }
     
    -        // Initialize appropriate BroadcastFactory and BroadcastObject
    -        broadcastFactory.initialize(isDriver, conf, securityManager)
    +  /**
    +   * Asynchronously delete cached copies of this broadcast on the 
executors.
    +   * If the broadcast is used after this is called, it will need to be 
re-sent to each executor.
    +   */
    +  def unpersist() {
    +    unpersist(blocking = false)
    +  }
     
    -        initialized = true
    -      }
    -    }
    +  /**
    +   * Delete cached copies of this broadcast on the executors. If the 
broadcast is used after
    +   * this is called, it will need to be re-sent to each executor.
    +   * @param blocking Whether to block until unpersisting has completed
    +   */
    +  def unpersist(blocking: Boolean) {
    +    assertValid()
    +    doUnpersist(blocking)
       }
     
    -  def stop() {
    -    broadcastFactory.stop()
    +  /**
    +   * Destroy all data and metadata related to this broadcast variable. Use 
this with caution;
    +   * once a broadcast variable has been destroyed, it cannot be used again.
    +   */
    +  private[spark] def destroy(blocking: Boolean) {
    +    assertValid()
    +    _isValid = false
    +    doDestroy(blocking)
       }
     
    -  private val nextBroadcastId = new AtomicLong(0)
    +  /**
    +   * Whether this Broadcast is actually usable. This should be false once 
persisted state is
    +   * removed from the driver.
    +   */
    +  private[spark] def isValid: Boolean = {
    +    _isValid
    +  }
     
    -  def newBroadcast[T](value_ : T, isLocal: Boolean) =
    -    broadcastFactory.newBroadcast[T](value_, isLocal, 
nextBroadcastId.getAndIncrement())
    +  /**
    +   * Actually get the broadcasted value. Concrete implementations of 
Broadcast class must
    --- End diff --
    
    What if we removed the word "Actually" from this and the following two
docs? It's not super clear what it means to "Actually" do it, especially
since the two below have a "blocking" parameter, which already seems
to dictate whether or not it does the thing right away.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to