[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161157993

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    ohh, I see. Thanks!

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161157178

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    `can remove mappings...` That's why I say entries can be removed automatically.

---
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161156210

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    BTW - welcome back @jerryshao ! long time no see!

---
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161156002

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    That should be fine even if the hard references are not removed, since the memory consumption should be quite minor.

---
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20183

---
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161154730

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    This is what I found from the doc:

    > Hash-based Map implementation that allows mappings to be removed by the garbage collector. When you construct a ReferenceMap, you can specify what kind of references are used to store the map's keys and values. If non-hard references are used, then the garbage collector can remove mappings if a key or value becomes unreachable, or if the JVM's memory is running low. For information on how the different reference types behave, see Reference.

    It only mentions that non-hard references can be removed by GC, please correct me if I'm wrong.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161154173

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    According to the documentation of `ReferenceMap`, if the key or value is eligible for GC, the entry will be removed.

---
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161151338

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    If the key is a hard reference, does it mean that this key will never be cleaned from the map automatically by GC?

---
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161149468

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
    --- End diff --

    I see, thanks for pointing out.

---
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161148920

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
    --- End diff --

    `ReferenceMap` is not thread safe, no - however, all operations on `broadcastCache` occur within the context of a synchronized block; TorrentBroadcast.scala lines 208-254.

---
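The argument in the comment above is that a map which is not thread safe can still be shared, provided every access happens while holding one common monitor. A minimal sketch of that idea follows. It is not the PR's code: `GuardedCache`, `getOrLoad`, and `loadCount` are hypothetical names, and a plain `mutable.HashMap` stands in for `ReferenceMap`.

```scala
import scala.collection.mutable

// Hypothetical cache holder. mutable.HashMap, like ReferenceMap, is not thread safe.
object GuardedCache {
  private val values = mutable.HashMap.empty[Long, String]
  private var loads = 0

  // Every read and write happens while holding the GuardedCache monitor,
  // so the check-then-insert sequence cannot interleave between threads.
  def getOrLoad(id: Long)(load: => String): String = GuardedCache.synchronized {
    values.getOrElseUpdate(id, { loads += 1; load })
  }

  // Number of times a value actually had to be loaded.
  def loadCount: Int = GuardedCache.synchronized(loads)
}

object GuardedCacheDemo {
  def main(args: Array[String]): Unit = {
    // Eight threads race to read the same id; only one of them should load it.
    val threads = (1 to 8).map { _ =>
      new Thread(() => { GuardedCache.getOrLoad(42L)("payload"); () })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(s"loads: ${GuardedCache.loadCount}")
  }
}
```

Locking on the companion-style object mirrors the `TorrentBroadcast.synchronized` block in the diff. The trade-off is that all readers on the executor serialize on one lock, including while a value is being loaded.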
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161147892

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
    --- End diff --

    Is this `ReferenceMap` thread safe?

---
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161135870

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
    +        setConf(SparkEnv.get.conf)
    --- End diff --

    No, sorry - the cache update takes place within that block. With the exception of those blocks (lines 220-222 and lines 244-246), yes.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161134415

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    ah i see, because we never get the key reference outside of the map, makes sense.

---
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161133496

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    This is the state of an executor at some point in time:

        Cache: IdInstance1 => ValueInstance1
        Thread1: TorrentBroadcastInstance1(broadcastId = IdInstance1, value = ValueInstance1)
        Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
        Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
        Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

    After some time Thread1 finishes processing the partition it's working on and starts on the next - the state becomes:

        Cache: IdInstance1 => ValueInstance1
        Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
        Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
        Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
        Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

    At some point the GC destroys TorrentBroadcastInstance1.

    Now, if the key is a weak reference, the state becomes:

        Cache: Empty
        Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
        Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
        Thread3: TorrentBroadcastInstance3(broadcastId = IdInstance3, value = ValueInstance1)
        Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

    The next thread to finish processing a partition then creates a new instance of the broadcast value:

        Cache: IdInstance6 => ValueInstance2
        Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
        Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
        Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance2)
        Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

    On the other hand, if the key is a strong reference and the value is weak, the cached value isn't eligible for GC above. As such, when Thread3 finishes processing its partition and starts the next, the state becomes:

        Cache: IdInstance1 => ValueInstance1
        Thread1: TorrentBroadcastInstance5(broadcastId = IdInstance5, value = ValueInstance1)
        Thread2: TorrentBroadcastInstance2(broadcastId = IdInstance2, value = ValueInstance1)
        Thread3: TorrentBroadcastInstance6(broadcastId = IdInstance6, value = ValueInstance1)
        Thread4: TorrentBroadcastInstance4(broadcastId = IdInstance4, value = ValueInstance1)

---
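The hard-key, weak-value behaviour walked through above can be approximated with the JDK alone. The sketch below is an illustration under assumptions, not the PR's implementation: `BlockKey` and `WeakValueCache` are hypothetical names, and a `mutable.HashMap` of `java.lang.ref.WeakReference` values stands in for `ReferenceMap(HARD, WEAK)`. Unlike `ReferenceMap`, it never purges entries whose value has been collected.

```scala
import java.lang.ref.WeakReference
import scala.collection.mutable

// Hypothetical stand-in for a broadcast block id: a case class, so two
// separately created instances with the same id compare equal.
final case class BlockKey(id: Long)

// Hard keys, weak values: the map holds each key strongly but reaches the
// value only through a WeakReference.
final class WeakValueCache[K, V <: AnyRef] {
  private val entries = mutable.HashMap.empty[K, WeakReference[V]]

  def getOrLoad(key: K)(load: => V): V = synchronized {
    // WeakReference.get returns null once the value has been collected.
    entries.get(key).flatMap(ref => Option(ref.get)) match {
      case Some(value) => value
      case None =>
        val value = load
        entries.update(key, new WeakReference(value))
        value
    }
  }
}

object WeakValueCacheDemo {
  def main(args: Array[String]): Unit = {
    val cache = new WeakValueCache[BlockKey, Array[Byte]]
    var loads = 0
    def load(): Array[Byte] = { loads += 1; new Array[Byte](16) }

    // Two tasks carry distinct but equal key instances.
    val first = cache.getOrLoad(BlockKey(1L))(load())
    val second = cache.getOrLoad(BlockKey(1L))(load())

    // While `first` is strongly held, the second lookup reuses it.
    println(s"same instance: ${first eq second}, loads: $loads")
  }
}
```

Because the entry survives as long as any task still holds the value, discarding the first key instance does not force a reload, which is the point of the last state listing above.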
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161132849

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    what's the difference between this and "weak key, hard value"?

---
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161132399

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
    --- End diff --

    Fixed, thanks!

---
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161132203

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
    +        setConf(SparkEnv.get.conf)
    --- End diff --

    Yes - everything within the getOrElse block is unchanged.

---
Github user ho3rexqj commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161132057

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    Suppose the first thread to request the broadcast variable's value destroyed its instance of the broadcast variable (which, I believe, is what will happen when that thread finishes processing its partition) - if the key were a weak reference in the above cache it would become eligible for GC at that point. I'm reasonably certain at that point the associated key/value pair would be removed from the cache; in other words, if the key were a weak reference the key/value pair would be removed as soon as the key **or** value was garbage collected.

    Note that I haven't used ReferenceMap extensively, so I could be wrong about the above - feel free to correct me if that's the case.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161127798

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
    +        setConf(SparkEnv.get.conf)
    --- End diff --

    just to confirm, the following code is exactly the same as before, right?

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161127725

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -206,37 +206,51 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)

       private def readBroadcastBlock(): T = Utils.tryOrIOException {
         TorrentBroadcast.synchronized {
    -      setConf(SparkEnv.get.conf)
    -      val blockManager = SparkEnv.get.blockManager
    -      blockManager.getLocalValues(broadcastId) match {
    -        case Some(blockResult) =>
    -          if (blockResult.data.hasNext) {
    -            val x = blockResult.data.next().asInstanceOf[T]
    -            releaseLock(broadcastId)
    -            x
    -          } else {
    -            throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
    -          }
    -        case None =>
    -          logInfo("Started reading broadcast variable " + id)
    -          val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks()
    -          logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
    -
    -          try {
    -            val obj = TorrentBroadcast.unBlockifyObject[T](
    -              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    -            // Store the merged copy in BlockManager so other tasks on this executor don't
    -            // need to re-fetch it.
    -            val storageLevel = StorageLevel.MEMORY_AND_DISK
    -            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
    +
    +      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse({
    --- End diff --

    nit: code style
    ```
    ...getOrElse {
      xxx
    }
    ```

---
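For readers unfamiliar with the idiom under review, here is a small sketch of the lookup pattern from the diff, written in the brace style the nit asks for. It assumes a plain `java.util.HashMap` in place of the PR's cache, and `NullableLookupDemo` and `lookup` are hypothetical names.

```scala
import java.util.{HashMap => JHashMap}

object NullableLookupDemo {
  // java.util.Map#get returns null for a missing key; Option(...) turns that into None.
  def lookup(cache: JHashMap[String, AnyRef], key: String): String =
    Option(cache.get(key)).map(_.asInstanceOf[String]).getOrElse {
      // The fallback runs only on a miss; a block argument needs no parentheses.
      val loaded = s"loaded-$key"
      cache.put(key, loaded)
      loaded
    }

  def main(args: Array[String]): Unit = {
    val cache = new JHashMap[String, AnyRef]()
    cache.put("a", "cached-a")
    println(lookup(cache, "a")) // hit: returns the stored value
    println(lookup(cache, "b")) // miss: runs the fallback and stores its result
  }
}
```

`getOrElse` takes its argument by name, so `getOrElse { ... }` and `getOrElse({ ... })` behave identically; the difference is purely stylistic.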
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20183#discussion_r161127616

    --- Diff: core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala ---
    @@ -52,6 +54,10 @@ private[spark] class BroadcastManager(
       private val nextBroadcastId = new AtomicLong(0)

    +  private[broadcast] val cachedValues = {
    +    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    --- End diff --

    why is the key not a weak reference?

---