[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161157993
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

ohh, I see. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161157178
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

`can remove mappings...` That's why I said entries can be removed 
automatically.





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161156210
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

BTW - welcome back @jerryshao! Long time no see!





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161156002
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

That should be fine even if the hard references are not removed, since the 
memory consumption should be quite minor.





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20183





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161154730
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

This is what I found from the doc:

> Hash-based Map implementation that allows mappings to be removed by the
> garbage collector.
> When you construct a ReferenceMap, you can specify what kind of references
> are used to store the map's keys and values. If non-hard references are used,
> then the garbage collector can remove mappings if a key or value becomes
> unreachable, or if the JVM's memory is running low. For information on how the
> different reference types behave, see Reference.

It only mentions that non-hard references can be removed by GC, please 
correct me if I'm wrong.
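
As a rough reading of the passage quoted above, the rule can be written as a 
small predicate. This is a simplified model, not the commons-collections 
implementation: the names `Strength`, `entryRemovable`, `keyHeldElsewhere` and 
`valueHeldElsewhere` are made up for illustration, and soft references are 
treated like weak ones (memory pressure is ignored).

```scala
object ReferenceStrengthModel {
  // Reference strengths, named after the ReferenceMap options quoted above.
  sealed trait Strength
  case object Hard extends Strength
  case object Soft extends Strength
  case object Weak extends Strength

  // Simplified rule: an entry becomes removable once a non-hard side
  // (key or value) is no longer strongly reachable from outside the map.
  // Assumption: soft references behave like weak ones in this model.
  def entryRemovable(
      keyStrength: Strength,
      valueStrength: Strength,
      keyHeldElsewhere: Boolean,
      valueHeldElsewhere: Boolean): Boolean = {
    val keyCollectable = keyStrength != Hard && !keyHeldElsewhere
    val valueCollectable = valueStrength != Hard && !valueHeldElsewhere
    keyCollectable || valueCollectable
  }
}
```

Under this reading, a hard key with a weak value is still removable once 
nothing outside the map holds the value, while a hard key with a hard value 
never is.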





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161154173
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

According to the documentation of `ReferenceMap`, if the key or value is 
eligible for GC, the entry will be removed.





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161151338
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

If the key is a hard reference, does it mean that this key will never be 
cleaned from the map automatically by GC?





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161149468
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
--- End diff --

I see, thanks for pointing that out.





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161148920
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
--- End diff --

`ReferenceMap` is not thread safe, no - however, all operations on 
`broadcastCache` occur within the context of a synchronized block; 
TorrentBroadcast.scala lines 208-254.
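
A minimal sketch of that pattern, for illustration only: a map that is not 
thread safe, with every read and write done while holding one lock. The 
object `SynchronizedCacheSketch`, its `getOrLoad` method and the `loads` 
counter are hypothetical, and a plain `java.util.HashMap` stands in for the 
`ReferenceMap`.

```scala
import java.util.{HashMap => JHashMap}

object SynchronizedCacheSketch {
  // Stand-in for the ReferenceMap: java.util.HashMap is also not thread safe.
  private val cache = new JHashMap[Long, AnyRef]()
  // Counts how often the loader actually ran (illustration only).
  private var loads = 0

  // The lookup and the update happen under the same lock, so two threads
  // can never both miss and both load the same id.
  def getOrLoad[T <: AnyRef](id: Long)(load: => T): T = this.synchronized {
    Option(cache.get(id)).map(_.asInstanceOf[T]).getOrElse {
      val value = load
      loads += 1
      cache.put(id, value)
      value
    }
  }

  def loadCount: Int = this.synchronized { loads }
}
```

The `Option(...)` wrapper is what turns the `null` a Java map returns on a 
miss into `None`, as in the diff above.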





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161147892
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
--- End diff --

Is this `ReferenceMap` thread safe?





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161135870
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
+setConf(SparkEnv.get.conf)
--- End diff --

No, sorry - the cache update takes place within that block.  With the 
exception of those blocks (lines 220-222 and lines 244-246), yes.





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161134415
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

Ah, I see: because we never hold the key reference outside of the map. Makes 
sense.





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161133496
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

This is the state of an executor at some point in time:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance1(broadcastId = IdInstance1, value = 
ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = 
ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = 
ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = 
ValueInstance1)

After some time Thread1 finishes processing the partition it's working on and 
starts on the next - the state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = 
ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = 
ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = 
ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = 
ValueInstance1)

At some point the GC destroys TorrentBroadcastInstance1.  Now, if the key 
is a weak reference, the state becomes:

Cache: Empty
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = 
ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = 
ValueInstance1)
Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = 
ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = 
ValueInstance1)

The next thread to finish processing a partition then creates a new 
instance of the broadcast value:

Cache: IdInstance6 => ValueInstance2
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = 
ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = 
ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = 
ValueInstance2)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = 
ValueInstance1)

On the other hand, if the key is a strong reference and the value is weak, 
the cached value isn't eligible for GC in the scenario above, because other 
threads still hold it.  As such, when Thread3 finishes processing its 
partition and starts the next, the state becomes:

Cache: IdInstance1 => ValueInstance1
Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = 
ValueInstance1)
Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = 
ValueInstance1)
Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = 
ValueInstance1)
Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = 
ValueInstance1)
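
The hard-key, weak-value case can be sketched with the standard library 
alone. This is an illustration under stated assumptions, not the PR's code: 
`Id` is a hypothetical stand-in for the broadcast id (equal by value, so 
separately created instances match), values are wrapped in 
`java.lang.ref.WeakReference` by hand, and `simulateCollection` clears the 
reference explicitly to stand in for the garbage collector.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

object HardKeyWeakValueSketch {
  // Hypothetical id type: two instances with the same number are equal.
  final case class Id(id: Long)

  // Keys are held strongly by the map; values only through a WeakReference.
  private val cache = mutable.HashMap.empty[Id, WeakReference[AnyRef]]

  def put(key: Id, value: AnyRef): Unit = cache.synchronized {
    cache.put(key, new WeakReference[AnyRef](value))
    ()
  }

  // Returns the value while its referent is alive; drops stale entries.
  def get(key: Id): Option[AnyRef] = cache.synchronized {
    cache.get(key).flatMap { ref =>
      val value = Option(ref.get())
      if (value.isEmpty) cache.remove(key)
      value
    }
  }

  // Illustration hook: clears the weak reference by hand, standing in for
  // the GC reclaiming the value once no task holds it any more.
  def simulateCollection(key: Id): Unit = cache.synchronized {
    cache.get(key).foreach(_.clear())
  }
}
```

As long as some caller keeps a strong reference to the value, a lookup with 
a fresh but equal `Id` returns the same value instance, which corresponds to 
the last state listed above.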





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161132849
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

what's the difference between this and "weak key, hard value"?





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161132399
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
--- End diff --

Fixed, thanks!





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161132203
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
+setConf(SparkEnv.get.conf)
--- End diff --

Yes - everything within the getOrElse block is unchanged.





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread ho3rexqj
Github user ho3rexqj commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161132057
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

Suppose the first thread to request the broadcast variable's value 
destroyed its instance of the broadcast variable (which, I believe, is what 
will happen when that thread finishes processing its partition) - if the key 
were a weak reference in the above cache it would become eligible for GC at 
that point.  I'm reasonably certain the associated key/value pair would then 
be removed from the cache; in other words, if the key were a weak 
reference the key/value pair would be removed as soon as the key **or** value 
was garbage collected.

Note that I haven't used ReferenceMap extensively, so I could be wrong 
about the above - feel free to correct me if that's the case.





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161127798
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
+setConf(SparkEnv.get.conf)
--- End diff --

Just to confirm, the following code is exactly the same as before, right?





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161127725
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
--- End diff --

nit: code style
```
...getOrElse {
  xxx
}
```





[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161127616
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
@@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
 
   private val nextBroadcastId = new AtomicLong(0)
 
+  private[broadcast] val cachedValues = {
+new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
--- End diff --

why is the key not a weak reference?

