Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/11613#discussion_r55906584
--- Diff:
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -100,48 +92,136 @@ private[spark] class MemoryStore(
*/
def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer):
Boolean = {
require(!contains(blockId), s"Block $blockId is already present in the
MemoryStore")
- // Work on a duplicate - since the original input might be used
elsewhere.
- lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
- val putSuccess = tryToPut(blockId, () => bytes, size, deserialized =
false)
- if (putSuccess) {
+ if (memoryManager.acquireStorageMemory(blockId, size)) {
+ // We acquired enough memory for the block, so go ahead and put it
+ // Work on a duplicate - since the original input might be used
elsewhere.
+ val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
assert(bytes.limit == size)
+ val entry = new MemoryEntry(bytes, size, deserialized = false)
+ entries.synchronized {
+ entries.put(blockId, entry)
+ }
+ logInfo("Block %s stored as bytes in memory (estimated size %s, free
%s)".format(
+ blockId, Utils.bytesToString(size),
Utils.bytesToString(blocksMemoryUsed)))
+ true
+ } else {
+ false
}
- putSuccess
}
/**
* Attempt to put the given block in memory store.
*
- * @return the estimated size of the stored data if the put() succeeded,
or an iterator
- * in case the put() failed (the returned iterator lets callers
fall back to the disk
- * store if desired).
+ * @return in case of success, the estimated the estimated size of the
stored data. In case of
+ * failure, return an iterator contianing the values of the
block. The returned iterator
--- End diff --
Fixed.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]