vanzin commented on a change in pull request #23688: [SPARK-25035][Core]
Avoiding memory mapping at disk-stored blocks replication
URL: https://github.com/apache/spark/pull/23688#discussion_r255663144
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -221,6 +221,187 @@ private[spark] class BlockManager(
new BlockManager.RemoteBlockDownloadFileManager(this)
private val maxRemoteBlockToMem =
conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
+ /**
+ * @param blockSize the decrypted size of the block
+ */
+ private abstract class BlockStoreUpdater[T](
+ blockSize: Long,
+ blockId: BlockId,
+ level: StorageLevel,
+ classTag: ClassTag[T],
+ tellMaster: Boolean,
+ keepReadLock: Boolean) {
+
+ /**
+ * Reads the block content into memory. If the block store update is based
+ * on a temporary file, this could load the whole file into a
+ * ChunkedByteBuffer.
+ *
+ * Declared with an empty parameter list because it does I/O; call it as
+ * `readToByteBuffer()`.
+ */
+ protected def readToByteBuffer(): ChunkedByteBuffer
+
+ /**
+ * The block's data. In this class it is passed to `replicate` when
+ * replication > 1, and for deserialized storage levels its input stream is
+ * deserialized into the memory store.
+ */
+ protected def blockData(): BlockData
+
+ /**
+ * Presumably writes the block to the disk store (inferred from the name;
+ * implemented by subclasses not shown here) -- NOTE(review): confirm.
+ */
+ protected def saveToDiskStore(): Unit
+
+ /**
+ * Deserializes `inputStream` into values of type `T` (using `classTag`) and
+ * tries to store them in the memory store as deserialized values.
+ *
+ * @return true if the memory store accepted the values; false otherwise, in
+ * which case the iterator returned by the memory store is closed.
+ */
+ private def saveDeserializedValuesToMemoryStore(inputStream: InputStream):
Boolean = {
+ val values = serializerManager.dataDeserializeStream(blockId,
inputStream)(classTag)
+ memoryStore.putIteratorAsValues(blockId, values, classTag) match {
+ case Right(_) => true
+ case Left(iter) =>
+ // If putting deserialized values in memory failed, we will put the
bytes directly
+ // to disk, so we don't need this iterator and can close it to free
resources
+ // earlier.
+ iter.close()
+ false
+ }
+ }
+
+ /**
+ * Stores `bytes` in the memory store as serialized data. The put uses the
+ * block's decrypted size (`blockSize`) and the storage level's memory mode.
+ *
+ * The supplier passed to `memoryStore.putBytes` returns `bytes` unchanged,
+ * except in OFF_HEAP mode. There, if any chunk is not a direct buffer, it
+ * returns a copy made with `Platform.allocateDirectBuffer`.
+ *
+ * @return the result of `memoryStore.putBytes` (whether the put succeeded).
+ */
+ private def saveSerializedValuesToMemoryStore(bytes: ChunkedByteBuffer):
Boolean = {
+ val memoryMode = level.memoryMode
+ memoryStore.putBytes(blockId, blockSize, memoryMode, () => {
+ // Off-heap storage is expected to hold direct buffers, so heap-backed
+ // chunks are copied into direct ones.
+ if (memoryMode == MemoryMode.OFF_HEAP &&
bytes.chunks.exists(!_.isDirect)) {
+ bytes.copy(Platform.allocateDirectBuffer)
+ } else {
+ bytes
+ }
+ })
+ }
+
+ /**
+ * Put the given data according to the given level in one of the block
stores, replicating
+ * the values if necessary.
+ *
+ * If the block already exists, this method will not overwrite it.
+ *
+ * If keepReadLock is true, this method will hold the read lock when it
returns (even if the
+ * block already exists). If false, this method will hold no locks when it
returns.
+ *
+ * @return true if the block was already present or if the put succeeded,
false otherwise.
+ */
+ def save(): Boolean = {
+ doPut(blockId, level, classTag, tellMaster, keepReadLock) { info =>
+ val startTimeMs = System.currentTimeMillis
+
+ // Since we're storing bytes, initiate the replication before storing
them locally.
+ // This is faster as data is already serialized and ready to send.
+ val replicationFuture = if (level.replication > 1) {
+ Future {
+ // This is a blocking action and should run in
futureExecutionContext which is a cached
+ // thread pool.
+ replicate(blockId, blockData(), level, classTag)
+ }(futureExecutionContext)
+ } else {
+ null
+ }
+ if (level.useMemory) {
+ // Put it in memory first, even if it also has useDisk set to true;
+ // We will drop it to disk later if the memory store can't hold it.
+ val putSucceeded = if (level.deserialized) {
+ saveDeserializedValuesToMemoryStore(blockData().toInputStream())
+ } else {
+ saveSerializedValuesToMemoryStore(readToByteBuffer)
Review comment:
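Nit: call this as `readToByteBuffer()`, with parentheses. The method is declared with an empty parameter list and does I/O (it may load a whole temporary file into memory). By Scala convention, such side-effecting methods are invoked with `()`.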
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]