Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/126#discussion_r11374503
--- Diff: core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala ---
@@ -51,49 +51,80 @@ import org.apache.spark._
* @tparam T Type of the data contained in the broadcast variable.
*/
abstract class Broadcast[T](val id: Long) extends Serializable {
- def value: T
- // We cannot have an abstract readObject here due to some weird issues
with
- // readObject having to be 'private' in sub-classes.
+ /**
+ * Flag signifying whether the broadcast variable is valid
+ * (that is, not already destroyed) or not.
+ */
+ @volatile private var _isValid = true
- override def toString = "Broadcast(" + id + ")"
-}
-
-private[spark]
-class BroadcastManager(val _isDriver: Boolean, conf: SparkConf,
securityManager: SecurityManager)
- extends Logging with Serializable {
-
- private var initialized = false
- private var broadcastFactory: BroadcastFactory = null
-
- initialize()
-
- // Called by SparkContext or Executor before using Broadcast
- private def initialize() {
- synchronized {
- if (!initialized) {
- val broadcastFactoryClass = conf.get(
- "spark.broadcast.factory",
"org.apache.spark.broadcast.HttpBroadcastFactory")
-
- broadcastFactory =
-
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
+ /** Get the broadcasted value. */
+ def value: T = {
+ assertValid()
+ getValue()
+ }
- // Initialize appropriate BroadcastFactory and BroadcastObject
- broadcastFactory.initialize(isDriver, conf, securityManager)
+ /**
+ * Asynchronously delete cached copies of this broadcast on the
executors.
+ * If the broadcast is used after this is called, it will need to be
re-sent to each executor.
+ */
+ def unpersist() {
+ unpersist(blocking = false)
+ }
- initialized = true
- }
- }
+ /**
+ * Delete cached copies of this broadcast on the executors. If the
broadcast is used after
+ * this is called, it will need to be re-sent to each executor.
+ * @param blocking Whether to block until unpersisting has completed
+ */
+ def unpersist(blocking: Boolean) {
+ assertValid()
+ doUnpersist(blocking)
}
- def stop() {
- broadcastFactory.stop()
+ /**
+ * Destroy all data and metadata related to this broadcast variable. Use
this with caution;
+ * once a broadcast variable has been destroyed, it cannot be used again.
+ */
+ private[spark] def destroy(blocking: Boolean) {
+ assertValid()
+ _isValid = false
+ doDestroy(blocking)
}
- private val nextBroadcastId = new AtomicLong(0)
+ /**
+ * Whether this Broadcast is actually usable. This should be false once
persisted state is
+ * removed from the driver.
+ */
+ private[spark] def isValid: Boolean = {
+ _isValid
+ }
- def newBroadcast[T](value_ : T, isLocal: Boolean) =
- broadcastFactory.newBroadcast[T](value_, isLocal,
nextBroadcastId.getAndIncrement())
+ /**
+ * Actually get the broadcasted value. Concrete implementations of
Broadcast class must
--- End diff --
What if we removed the word "Actually" from this and the following two
docs? It's not super clear to mean what it means to "Actually" do it...
especially since the below two have a "blocking" parameter which already seems
to dictate whether or not it does the thing right away.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---