Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/11534#discussion_r55476210
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
---
@@ -789,117 +752,193 @@ private[spark] class BlockManager(
// lockNewBlockForWriting returned a read lock on the existing
block, so we must free it:
releaseLock(blockId)
}
- return DoPutSucceeded
+ return true
}
}
val startTimeMs = System.currentTimeMillis
- // Size of the block in bytes
- var size = 0L
-
- // The level we actually use to put the block
- val putLevel = effectiveStorageLevel.getOrElse(level)
-
- // If we're storing bytes, then initiate the replication before
storing them locally.
+ // Since we're storing bytes, initiate the replication before storing
them locally.
// This is faster as data is already serialized and ready to send.
- val replicationFuture = data match {
- case b: ByteBufferValues if putLevel.replication > 1 =>
- // Duplicate doesn't copy the bytes, but just creates a wrapper
- val bufferView = b.buffer.duplicate()
- Future {
- // This is a blocking action and should run in
futureExecutionContext which is a cached
- // thread pool
- replicate(blockId, bufferView, putLevel)
- }(futureExecutionContext)
- case _ => null
+ val replicationFuture = if (level.replication > 1) {
+ // Duplicate doesn't copy the bytes, but just creates a wrapper
+ val bufferView = bytes.duplicate()
+ Future {
+ // This is a blocking action and should run in
futureExecutionContext which is a cached
+ // thread pool
+ replicate(blockId, bufferView, level)
+ }(futureExecutionContext)
+ } else {
+ null
}
var blockWasSuccessfullyStored = false
- var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
-
- putBlockInfo.synchronized {
- logTrace("Put for block %s took %s to get into synchronized block"
- .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
- try {
- if (putLevel.useMemory) {
- // Put it in memory first, even if it also has useDisk set to
true;
- // We will drop it to disk later if the memory store can't hold
it.
- data match {
- case IteratorValues(iterator) =>
- memoryStore.putIterator(blockId, iterator(), putLevel) match
{
- case Right(s) =>
- size = s
- case Left(iter) =>
- iteratorFromFailedMemoryStorePut = Some(iter)
- }
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- size = bytes.limit()
- memoryStore.putBytes(blockId, bytes, putLevel)
- }
- } else if (putLevel.useDisk) {
- data match {
- case IteratorValues(iterator) =>
- diskStore.putIterator(blockId, iterator(), putLevel) match {
- case Right(s) =>
- size = s
- // putIterator() will never return Left (see its return
type).
- }
- case ByteBufferValues(bytes) =>
- bytes.rewind()
- size = bytes.limit()
- diskStore.putBytes(blockId, bytes, putLevel)
- }
+ bytes.rewind()
+ val size = bytes.limit()
+
+ try {
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ val putSucceeded = if (level.deserialized) {
+ val values = dataDeserialize(blockId, bytes.duplicate())
+ memoryStore.putIterator(blockId, values, level).isRight
} else {
- assert(putLevel == StorageLevel.NONE)
- throw new BlockException(
- blockId, s"Attempted to put block $blockId without specifying
storage level!")
+ memoryStore.putBytes(blockId, size, () => bytes)
+ }
+ if (!putSucceeded && level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ diskStore.putBytes(blockId, bytes)
}
+ } else if (level.useDisk) {
+ diskStore.putBytes(blockId, bytes)
+ }
- val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
- blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
- if (blockWasSuccessfullyStored) {
- // Now that the block is in either the memory,
externalBlockStore, or disk store,
- // let other threads read it, and tell the master about it.
- putBlockInfo.size = size
- if (tellMaster) {
- reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
- }
- Option(TaskContext.get()).foreach { c =>
- c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId,
putBlockStatus)))
- }
+ val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+ blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+ if (blockWasSuccessfullyStored) {
+ // Now that the block is in either the memory, externalBlockStore,
or disk store,
+ // tell the master about it.
+ putBlockInfo.size = size
+ if (tellMaster) {
+ reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
}
- } finally {
- if (blockWasSuccessfullyStored) {
- if (keepReadLock) {
- blockInfoManager.downgradeLock(blockId)
- } else {
- blockInfoManager.unlock(blockId)
- }
+ Option(TaskContext.get()).foreach { c =>
+ c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId,
putBlockStatus)))
+ }
+ }
+ } finally {
+ if (blockWasSuccessfullyStored) {
+ if (keepReadLock) {
+ blockInfoManager.downgradeLock(blockId)
} else {
- blockInfoManager.removeBlock(blockId)
- logWarning(s"Putting block $blockId failed")
+ blockInfoManager.unlock(blockId)
}
+ } else {
+ blockInfoManager.removeBlock(blockId)
+ logWarning(s"Putting block $blockId failed")
}
}
logDebug("Put block %s locally took %s".format(blockId,
Utils.getUsedTimeMs(startTimeMs)))
- if (replicationFuture != null) {
+ if (level.replication > 1) {
// Wait for asynchronous replication to finish
Await.ready(replicationFuture, Duration.Inf)
- } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
- val remoteStartTime = System.currentTimeMillis
- val bytesToReplicate: ByteBuffer = {
- doGetLocal(blockId, putBlockInfo, asBlockResult = false)
- .map(_.asInstanceOf[ByteBuffer])
- .getOrElse {
- throw new SparkException(s"Block $blockId was not found even
though it was just stored")
- }
+ logDebug("Putting block %s with replication took %s"
+ .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+ } else {
+ logDebug("Putting block %s without replication took %s"
+ .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+ }
+
+ blockWasSuccessfullyStored
+ }
+
+ /**
+ * Put the given block according to the given level in one of the block stores, replicating
+ * the values if necessary.
+ *
+ * If the block already exists, this method will not overwrite it.
+ *
+ * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
+ * block already exists). If false, this method will hold no locks when it
+ * returns.
+ * @return None if the block was already present or if the put succeeded, or Some(iterator)
+ * if the put failed.
+ */
+ private def doPutIterator(
+ blockId: BlockId,
+ iterator: () => Iterator[Any],
+ level: StorageLevel,
+ tellMaster: Boolean = true,
+ keepReadLock: Boolean = false): Option[Iterator[Any]] = {
+
+ require(blockId != null, "BlockId is null")
+ require(level != null && level.isValid, "StorageLevel is null or
invalid")
+
+ /* Remember the block's storage level so that we can correctly drop it
to disk if it needs
+ * to be dropped right after it got put into memory. Note, however,
that other threads will
+ * not be able to get() this block until we call markReady on its
BlockInfo. */
+ val putBlockInfo = {
+ val newInfo = new BlockInfo(level, tellMaster)
+ if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+ newInfo
+ } else {
+ logWarning(s"Block $blockId already exists on this machine; not
re-adding it")
+ if (!keepReadLock) {
+ // lockNewBlockForWriting returned a read lock on the existing
block, so we must free it:
+ releaseLock(blockId)
+ }
+ return None
+ }
+ }
+
+ val startTimeMs = System.currentTimeMillis
+
+ // Size of the block in bytes
+ var size = 0L
+
+ var blockWasSuccessfullyStored = false
+ var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
+
+ try {
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ memoryStore.putIterator(blockId, iterator(), level) match {
+ case Right(s) =>
+ size = s
+ case Left(iter) =>
+ // Not enough space to unroll this block; drop to disk if
applicable
+ if (level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ diskStore.put(blockId) { fileOutputStream =>
+ dataSerializeStream(blockId, fileOutputStream, iter)
--- End diff --
Nope, the entire block is now on disk because `iter` contains all of the
elements of the original input `iterator()`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]