Github user andrewor14 commented on a diff in the pull request:
https://github.com/apache/spark/pull/11534#discussion_r55618498
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
---
@@ -789,117 +742,193 @@ private[spark] class BlockManager(
// lockNewBlockForWriting returned a read lock on the existing
block, so we must free it:
releaseLock(blockId)
}
- return DoPutSucceeded
+ return true
}
}
val startTimeMs = System.currentTimeMillis
- // Size of the block in bytes
- var size = 0L
-
- // The level we actually use to put the block
- val putLevel = effectiveStorageLevel.getOrElse(level)
-
- // If we're storing bytes, then initiate the replication before
storing them locally.
+ // Since we're storing bytes, initiate the replication before storing
them locally.
// This is faster as data is already serialized and ready to send.
- val replicationFuture = data match {
- case b: ByteBufferValues if putLevel.replication > 1 =>
- // Duplicate doesn't copy the bytes, but just creates a wrapper
- val bufferView = b.buffer.duplicate()
- Future {
- // This is a blocking action and should run in
futureExecutionContext which is a cached
- // thread pool
- replicate(blockId, bufferView, putLevel)
- }(futureExecutionContext)
- case _ => null
+ val replicationFuture = if (level.replication > 1) {
+ // Duplicate doesn't copy the bytes, but just creates a wrapper
+ val bufferView = bytes.duplicate()
+ Future {
+ // This is a blocking action and should run in
futureExecutionContext which is a cached
+ // thread pool
+ replicate(blockId, bufferView, level)
+ }(futureExecutionContext)
+ } else {
+ null
}
var blockWasSuccessfullyStored = false
- var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
- putBlockInfo.synchronized {
- logTrace("Put for block %s took %s to get into synchronized block"
- .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
- try {
- if (putLevel.useMemory) {
- // Put it in memory first, even if it also has useDisk set to
true;
- // We will drop it to disk later if the memory store can't hold
it.
- data match {
- case IteratorValues(iterator) =>
- memoryStore.putIterator(blockId, iterator(), putLevel) match
{
- case Right(s) =>
- size = s
- case Left(iter) =>
- iteratorFromFailedMemoryStorePut = Some(iter)
- }
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- size = bytes.limit()
- memoryStore.putBytes(blockId, bytes, putLevel)
- }
- } else if (putLevel.useDisk) {
- data match {
- case IteratorValues(iterator) =>
- diskStore.putIterator(blockId, iterator(), putLevel) match {
- case Right(s) =>
- size = s
- // putIterator() will never return Left (see its return
type).
- }
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- size = bytes.limit()
- diskStore.putBytes(blockId, bytes, putLevel)
- }
+ bytes.rewind()
+ val size = bytes.limit()
+
+ try {
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ val putSucceeded = if (level.deserialized) {
+ val values = dataDeserialize(blockId, bytes.duplicate())
+ memoryStore.putIterator(blockId, values, level).isRight
--- End diff --
This is in `doPutBytes`. Similar to another comment I made above: if
`memoryStore.putIterator` returns `Left` (the block didn't fit in memory), we
should attempt to drop the block to disk here as well. Right now, when the
block doesn't fit in memory, we drop it to disk in some code paths but not in
others. That inconsistency is confusing.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]