vanzin commented on a change in pull request #23688: [SPARK-25035][Core]
Avoiding memory mapping at disk-stored blocks replication
URL: https://github.com/apache/spark/pull/23688#discussion_r255660081
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -221,6 +221,187 @@ private[spark] class BlockManager(
new BlockManager.RemoteBlockDownloadFileManager(this)
private val maxRemoteBlockToMem =
conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+ /**
+ * @param blockSize the decrypted size of the block
+ */
+ private abstract class BlockStoreUpdater[T](
+ blockSize: Long,
+ blockId: BlockId,
+ level: StorageLevel,
+ classTag: ClassTag[T],
+ tellMaster: Boolean,
+ keepReadLock: Boolean) {
+
+ /**
+ * Reads the block content into the memory. If the update of the block
store is based on a
+ * temporary file this could lead to loading the whole file into a
ChunkedByteBuffer.
+ */
+ protected def readToByteBuffer(): ChunkedByteBuffer
+
+ protected def blockData(): BlockData
+
+ protected def saveToDiskStore(): Unit
+
+ private def saveDeserializedValuesToMemoryStore(inputStream: InputStream):
Boolean = {
+ val values = serializerManager.dataDeserializeStream(blockId,
inputStream)(classTag)
+ memoryStore.putIteratorAsValues(blockId, values, classTag) match {
+ case Right(_) => true
+ case Left(iter) =>
+ // If putting deserialized values in memory failed, we will put the
bytes directly
+ // to disk, so we don't need this iterator and can close it to free
resources
+ // earlier.
+ iter.close()
+ false
+ }
+ }
+
+ private def saveSerializedValuesToMemoryStore(bytes: ChunkedByteBuffer):
Boolean = {
+ val memoryMode = level.memoryMode
+ memoryStore.putBytes(blockId, blockSize, memoryMode, () => {
+ if (memoryMode == MemoryMode.OFF_HEAP &&
bytes.chunks.exists(!_.isDirect)) {
+ bytes.copy(Platform.allocateDirectBuffer)
+ } else {
+ bytes
+ }
+ })
+ }
+
+ /**
+ * Put the given data according to the given level in one of the block
stores, replicating
+ * the values if necessary.
+ *
+ * If the block already exists, this method will not overwrite it.
+ *
+ * If keepReadLock is true, this method will hold the read lock when it
returns (even if the
+ * block already exists). If false, this method will hold no locks when it
returns.
+ *
+ * @return true if the block was already present or if the put succeeded,
false otherwise.
+ */
+ def save(): Boolean = {
+ doPut(blockId, level, classTag, tellMaster, keepReadLock) { info =>
+ val startTimeMs = System.currentTimeMillis
+
+ // Since we're storing bytes, initiate the replication before storing
them locally.
+ // This is faster as data is already serialized and ready to send.
+ val replicationFuture = if (level.replication > 1) {
+ Future {
+ // This is a blocking action and should run in
futureExecutionContext which is a cached
+ // thread pool.
+ replicate(blockId, blockData(), level, classTag)
+ }(futureExecutionContext)
+ } else {
+ null
+ }
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ val putSucceeded = if (level.deserialized) {
+ saveDeserializedValuesToMemoryStore(blockData().toInputStream())
+ } else {
+ saveSerializedValuesToMemoryStore(readToByteBuffer)
+ }
+ if (!putSucceeded && level.useDisk) {
+ logWarning(s"Persisting block $blockId to disk instead.")
+ saveToDiskStore()
+ }
+ } else if (level.useDisk) {
+ saveToDiskStore()
+ }
+ val putBlockStatus = getCurrentBlockStatus(blockId, info)
+ val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+ if (blockWasSuccessfullyStored) {
+ // Now that the block is in either the memory or disk store,
+ // tell the master about it.
+ info.size = blockSize
+ if (tellMaster && info.tellMaster) {
+ reportBlockStatus(blockId, putBlockStatus)
+ }
+ addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
+ }
+ logDebug("Put block %s locally took %s".format(blockId,
Utils.getUsedTimeMs(startTimeMs)))
+ if (level.replication > 1) {
+ // Wait for asynchronous replication to finish
+ try {
+ ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
+ } catch {
+ case NonFatal(t) =>
+ throw new Exception("Error occurred while waiting for
replication to finish", t)
+ }
+ }
+ if (blockWasSuccessfullyStored) {
+ None
+ } else {
+ Some(blockSize)
+ }
+ }.isEmpty
+ }
+ }
+
+ /**
+ * '''Important!''' Callers must not mutate or release the data buffer
underlying `bytes`. Doing
+ * so may corrupt or change the data stored by the `BlockManager`.
+ */
+ private case class ByteBufferBlockStoreUpdater[T](
+ blockId: BlockId,
+ level: StorageLevel,
+ classTag: ClassTag[T],
+ bytes: ChunkedByteBuffer,
+ tellMaster: Boolean = true,
+ keepReadLock: Boolean = false)
+ extends BlockStoreUpdater[T](bytes.size, blockId, level, classTag,
tellMaster, keepReadLock) {
+
+ override def readToByteBuffer(): ChunkedByteBuffer = bytes
+
+ /**
+ * The ByteBufferBlockData wrapper is not disposed of to avoid releasing
buffers that are
+ * owned by the caller.
+ */
+ override def blockData(): BlockData = new ByteBufferBlockData(bytes, false)
+
+ override def saveToDiskStore(): Unit = diskStore.putBytes(blockId, bytes)
+
+ }
+
+ private case class TempFileBasedBlockStoreUpdater[T](
+ blockId: BlockId,
+ level: StorageLevel,
+ classTag: ClassTag[T],
+ tmpFile: File,
+ blockSize: Long,
+ tellMaster: Boolean = true,
+ keepReadLock: Boolean = false)
+ extends BlockStoreUpdater[T](blockSize, blockId, level, classTag,
tellMaster, keepReadLock) {
+
+ private var isTempFileMoved = false
+
+ /**
+ * Calling this method once leads to loading the content of the temporary
file into the memory.
+ */
+ override def readToByteBuffer(): ChunkedByteBuffer = {
+ val allocator = level.memoryMode match {
+ case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+ case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+ }
+ blockData().toChunkedByteBuffer(allocator)
+ }
+
+ override def blockData(): BlockData = diskStore.getBytes(tmpFile,
blockSize)
+
+ override def saveToDiskStore(): Unit = {
+ diskStore.moveFileToBlock(tmpFile, blockSize, blockId)
+ isTempFileMoved = true
+ }
+
+ override def save(): Boolean = {
+ val res = super.save()
+ if (!isTempFileMoved) {
Review comment:
I don't think you need this variable. `tmpFile.delete()` will not throw if
the file does not exist. Or you could just catch exceptions from the
`super.save()` call to detect whether you need to manually delete the file.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]