spark git commit: [SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values

2018-01-11 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 55695c712 -> 3ae3e1bb7


[SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of 
broadcast variable values

When resources happen to be constrained on an executor the first time a 
broadcast variable is instantiated it is persisted to disk by the BlockManager. 
Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock 
from other instances of that broadcast variable spawns another instance of the 
underlying value. That is, broadcast variables are spawned once per executor 
**unless** memory is constrained, in which case every instance of a broadcast 
variable is provided with a unique copy of the underlying value.

This patch fixes the above by explicitly caching the underlying values using 
weak references in a ReferenceMap.

Author: ho3rexqj 

Closes #20183 from ho3rexqj/fix/cache-broadcast-values.

(cherry picked from commit cbe7c6fbf9dc2fc422b93b3644c40d449a869eea)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ae3e1bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ae3e1bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ae3e1bb

Branch: refs/heads/branch-2.3
Commit: 3ae3e1bb71aa88be1c963b4416986ef679d7c8a2
Parents: 55695c7
Author: ho3rexqj 
Authored: Fri Jan 12 15:27:00 2018 +0800
Committer: Wenchen Fan 
Committed: Fri Jan 12 15:27:31 2018 +0800

--
 .../spark/broadcast/BroadcastManager.scala  |  6 ++
 .../spark/broadcast/TorrentBroadcast.scala  | 72 
 .../apache/spark/broadcast/BroadcastSuite.scala | 34 +
 3 files changed, 83 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3ae3e1bb/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala 
b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index e88988f..8d7a4a3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.reflect.ClassTag
 
+import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}
+
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
 
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
+  }
+
   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
 broadcastFactory.newBroadcast[T](value_, isLocal, 
nextBroadcastId.getAndIncrement())
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3ae3e1bb/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 7aecd3c..e125095 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: 
T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored broadcast 
data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on this 
executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.put

spark git commit: [SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of broadcast variable values

2018-01-11 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master b5042d75c -> cbe7c6fbf


[SPARK-22986][CORE] Use a cache to avoid instantiating multiple instances of 
broadcast variable values

When resources happen to be constrained on an executor the first time a 
broadcast variable is instantiated it is persisted to disk by the BlockManager. 
Consequently, every subsequent call to TorrentBroadcast::readBroadcastBlock 
from other instances of that broadcast variable spawns another instance of the 
underlying value. That is, broadcast variables are spawned once per executor 
**unless** memory is constrained, in which case every instance of a broadcast 
variable is provided with a unique copy of the underlying value.

This patch fixes the above by explicitly caching the underlying values using 
weak references in a ReferenceMap.

Author: ho3rexqj 

Closes #20183 from ho3rexqj/fix/cache-broadcast-values.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbe7c6fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbe7c6fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbe7c6fb

Branch: refs/heads/master
Commit: cbe7c6fbf9dc2fc422b93b3644c40d449a869eea
Parents: b5042d7
Author: ho3rexqj 
Authored: Fri Jan 12 15:27:00 2018 +0800
Committer: Wenchen Fan 
Committed: Fri Jan 12 15:27:00 2018 +0800

--
 .../spark/broadcast/BroadcastManager.scala  |  6 ++
 .../spark/broadcast/TorrentBroadcast.scala  | 72 
 .../apache/spark/broadcast/BroadcastSuite.scala | 34 +
 3 files changed, 83 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cbe7c6fb/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala 
b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index e88988f..8d7a4a3 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.reflect.ClassTag
 
+import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}
+
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
 
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
+  }
+
   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
 broadcastFactory.newBroadcast[T](value_, isLocal, 
nextBroadcastId.getAndIncrement())
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cbe7c6fb/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 7aecd3c..e125095 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: 
T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored broadcast 
data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on this 
executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"F