[SPARK-13696] Remove BlockStore class & simplify interfaces of mem. & disk 
stores

Today, both the MemoryStore and DiskStore implement a common `BlockStore` API, 
but I feel that this API is inappropriate because it abstracts away important 
distinctions between the behavior of these two stores.

For instance, the disk store doesn't have a notion of storing deserialized 
objects, so it's confusing for it to expose object-based APIs like 
putIterator() and getValues() instead of only exposing binary APIs and pushing 
the responsibilities of serialization and deserialization to the client. 
Similarly, the DiskStore put() methods accepted a `StorageLevel` parameter even 
though the disk store can only store blocks in one form.

As part of a larger BlockManager interface cleanup, this patch removes the 
BlockStore interface and refines the MemoryStore and DiskStore interfaces to 
reflect more narrow sets of responsibilities for those components. Some of the 
benefits of this interface cleanup are reflected in simplifications to several 
unit tests to eliminate now-unnecessary mocking, significant simplification of 
the BlockManager's `getLocal()` and `doPut()` methods, and a narrower API 
between the MemoryStore and DiskStore.

Author: Josh Rosen <joshro...@databricks.com>

Closes #11534 from JoshRosen/remove-blockstore-interface.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81d48532
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81d48532
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81d48532

Branch: refs/heads/master
Commit: 81d48532d954a8aea28d7e1fb3aa32a78c708b63
Parents: 3d2b6f5
Author: Josh Rosen <joshro...@databricks.com>
Authored: Thu Mar 10 15:08:41 2016 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Mar 10 15:08:41 2016 -0800

----------------------------------------------------------------------
 .../spark/broadcast/TorrentBroadcast.scala      |   2 +-
 .../org/apache/spark/memory/MemoryManager.scala |   4 +-
 .../apache/spark/memory/StorageMemoryPool.scala |   4 +-
 .../org/apache/spark/storage/BlockManager.scala | 612 ++++++++++---------
 .../org/apache/spark/storage/BlockStore.scala   |  63 --
 .../apache/spark/storage/DiskBlockManager.scala |  15 +-
 .../org/apache/spark/storage/DiskStore.scala    | 114 ++--
 .../org/apache/spark/storage/MemoryStore.scala  | 566 -----------------
 .../spark/storage/memory/MemoryStore.scala      | 522 ++++++++++++++++
 .../apache/spark/broadcast/BroadcastSuite.scala |   2 +-
 .../spark/memory/MemoryManagerSuite.scala       |   3 +-
 .../spark/memory/StaticMemoryManagerSuite.scala |   3 +-
 .../memory/UnifiedMemoryManagerSuite.scala      |   3 +-
 .../storage/BlockManagerReplicationSuite.scala  |   3 +-
 .../spark/storage/BlockManagerSuite.scala       |  74 +--
 .../spark/storage/DiskBlockManagerSuite.scala   |   5 +-
 .../apache/spark/storage/DiskStoreSuite.scala   |  62 ++
 .../streaming/ReceivedBlockHandlerSuite.scala   |  10 +-
 18 files changed, 995 insertions(+), 1072 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index dabc810..550e1ba 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -173,7 +173,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, 
id: Long)
     TorrentBroadcast.synchronized {
       setConf(SparkEnv.get.conf)
       val blockManager = SparkEnv.get.blockManager
-      blockManager.getLocal(broadcastId).map(_.data.next()) match {
+      blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
         case Some(x) =>
           releaseLock(broadcastId)
           x.asInstanceOf[T]

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index b5adbd8..e89b03e 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import javax.annotation.concurrent.GuardedBy
 
 import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.memory.MemoryAllocator
 
@@ -113,6 +114,7 @@ private[spark] abstract class MemoryManager(
 
   /**
    * Release all memory for the given task and mark it as inactive (e.g. when 
a task ends).
+   *
    * @return the number of bytes freed.
    */
   private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): 
Long = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala 
b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 6a88966..1d376ad 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
 import javax.annotation.concurrent.GuardedBy
 
 import org.apache.spark.Logging
-import org.apache.spark.storage.{BlockId, MemoryStore}
+import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.memory.MemoryStore
 
 /**
  * Performs bookkeeping for managing an adjustable-size pool of memory that is 
used for storage
@@ -55,6 +56,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends 
MemoryPool(lock) w
 
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing 
ones if necessary.
+ *
    * @return whether all N bytes were successfully granted.
    */
   def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = 
lock.synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b38e2ec..873330e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -40,24 +40,15 @@ import 
org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.{Serializer, SerializerInstance}
 import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.storage.memory._
 import org.apache.spark.util._
 
-private[spark] sealed trait BlockValues
-private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends 
BlockValues
-private[spark] case class IteratorValues(iterator: () => Iterator[Any]) 
extends BlockValues
-
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
     val readMethod: DataReadMethod.Value,
     val bytes: Long)
 
-// Class for representing return value of doPut()
-private sealed trait DoPutResult
-private case object DoPutSucceeded extends DoPutResult
-private case object DoPutBytesFailed extends DoPutResult
-private case class DoPutIteratorFailed(iter: Iterator[Any]) extends DoPutResult
-
 /**
  * Manager running on every node (driver and executors) which provides 
interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, 
disk, and off-heap).
@@ -78,7 +69,15 @@ private[spark] class BlockManager(
     numUsableCores: Int)
   extends BlockDataManager with Logging {
 
-  val diskBlockManager = new DiskBlockManager(this, conf)
+  private[spark] val externalShuffleServiceEnabled =
+    conf.getBoolean("spark.shuffle.service.enabled", false)
+
+  val diskBlockManager = {
+    // Only perform cleanup if an external service is not serving our shuffle 
files.
+    val deleteFilesOnStop =
+      !externalShuffleServiceEnabled || executorId == 
SparkContext.DRIVER_IDENTIFIER
+    new DiskBlockManager(conf, deleteFilesOnStop)
+  }
 
   private[storage] val blockInfoManager = new BlockInfoManager
 
@@ -86,8 +85,8 @@ private[spark] class BlockManager(
     ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
 
   // Actual storage of where blocks are kept
-  private[spark] val memoryStore = new MemoryStore(this, memoryManager)
-  private[spark] val diskStore = new DiskStore(this, diskBlockManager)
+  private[spark] val memoryStore = new MemoryStore(conf, this, memoryManager)
+  private[spark] val diskStore = new DiskStore(conf, diskBlockManager)
   memoryManager.setMemoryStore(memoryStore)
 
   // Note: depending on the memory manager, `maxStorageMemory` may actually 
vary over time.
@@ -96,9 +95,6 @@ private[spark] class BlockManager(
   // to revisit whether reporting this value as the "max" is intuitive to the 
user.
   private val maxMemory = memoryManager.maxStorageMemory
 
-  private[spark]
-  val externalShuffleServiceEnabled = 
conf.getBoolean("spark.shuffle.service.enabled", false)
-
   // Port used by the external shuffle service. In Yarn mode, this may be 
already be
   // set through the Hadoop configuration as the server is launched in the 
Yarn NM.
   private val externalShuffleServicePort = {
@@ -285,13 +281,9 @@ private[spark] class BlockManager(
     if (blockId.isShuffle) {
       
shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
     } else {
-      val blockBytesOpt = doGetLocal(blockId, asBlockResult = false)
-        .asInstanceOf[Option[ByteBuffer]]
-      if (blockBytesOpt.isDefined) {
-        val buffer = blockBytesOpt.get
-        new BlockManagerManagedBuffer(this, blockId, buffer)
-      } else {
-        throw new BlockNotFoundException(blockId.toString)
+      getLocalBytes(blockId) match {
+        case Some(buffer) => new BlockManagerManagedBuffer(this, blockId, 
buffer)
+        case None => throw new BlockNotFoundException(blockId.toString)
       }
     }
   }
@@ -407,11 +399,71 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Get block from local block manager.
+   * Get block from local block manager as an iterator of Java objects.
    */
-  def getLocal(blockId: BlockId): Option[BlockResult] = {
+  def getLocalValues(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting local block $blockId")
-    doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
+    blockInfoManager.lockForReading(blockId) match {
+      case None =>
+        logDebug(s"Block $blockId was not found")
+        None
+      case Some(info) =>
+        val level = info.level
+        logDebug(s"Level for block $blockId is $level")
+        if (level.useMemory && memoryStore.contains(blockId)) {
+          val iter: Iterator[Any] = if (level.deserialized) {
+            memoryStore.getValues(blockId).get
+          } else {
+            dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+          }
+          val ci = CompletionIterator[Any, Iterator[Any]](iter, 
releaseLock(blockId))
+          Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
+        } else if (level.useDisk && diskStore.contains(blockId)) {
+          val iterToReturn: Iterator[Any] = {
+            val diskBytes = diskStore.getBytes(blockId)
+            if (level.deserialized) {
+              val diskIterator = dataDeserialize(blockId, diskBytes)
+              if (level.useMemory) {
+                // Cache the values before returning them
+                memoryStore.putIterator(blockId, diskIterator, level) match {
+                  case Left(iter) =>
+                    // The memory store put() failed, so it returned the 
iterator back to us:
+                    iter
+                  case Right(_) =>
+                    // The put() succeeded, so we can read the values back:
+                    memoryStore.getValues(blockId).get
+                }
+              } else {
+                diskIterator
+              }
+            } else { // storage level is serialized
+              if (level.useMemory) {
+                // Cache the bytes back into memory to speed up subsequent 
reads.
+                val putSucceeded = memoryStore.putBytes(blockId, 
diskBytes.limit(), () => {
+                  // https://issues.apache.org/jira/browse/SPARK-6076
+                  // If the file size is bigger than the free memory, OOM will 
happen. So if we
+                  // cannot put it into MemoryStore, copyForMemory should not 
be created. That's why
+                  // this action is put into a `() => ByteBuffer` and created 
lazily.
+                  val copyForMemory = ByteBuffer.allocate(diskBytes.limit)
+                  copyForMemory.put(diskBytes)
+                })
+                if (putSucceeded) {
+                  dataDeserialize(blockId, memoryStore.getBytes(blockId).get)
+                } else {
+                  dataDeserialize(blockId, diskBytes)
+                }
+              } else {
+                dataDeserialize(blockId, diskBytes)
+              }
+            }
+          }
+          val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, 
releaseLock(blockId))
+          Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+        } else {
+          releaseLock(blockId)
+          throw new SparkException(s"Block $blockId was not found even though 
it's read-locked")
+        }
+    }
   }
 
   /**
@@ -428,77 +480,44 @@ private[spark] class BlockManager(
       Option(
         
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
     } else {
-      doGetLocal(blockId, asBlockResult = 
false).asInstanceOf[Option[ByteBuffer]]
-    }
-  }
-
-  private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): 
Option[Any] = {
-    blockInfoManager.lockForReading(blockId) match {
-      case None =>
-        logDebug(s"Block $blockId was not found")
-        None
-      case Some(info) =>
-        doGetLocal(blockId, info, asBlockResult)
+      blockInfoManager.lockForReading(blockId).map { info => 
doGetLocalBytes(blockId, info) }
     }
   }
 
   /**
-   * Get a local block from the block manager.
-   * Assumes that the caller holds a read lock on the block.
+   * Get block from the local block manager as serialized bytes.
+   *
+   * Must be called while holding a read lock on the block.
+   * Releases the read lock upon exception; keeps the read lock upon 
successful return.
    */
-  private def doGetLocal(
-      blockId: BlockId,
-      info: BlockInfo,
-      asBlockResult: Boolean): Option[Any] = {
+  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = 
{
     val level = info.level
     logDebug(s"Level for block $blockId is $level")
-
-    // Look for the block in memory
-    if (level.useMemory) {
-      logDebug(s"Getting block $blockId from memory")
-      val result = if (asBlockResult) {
-        memoryStore.getValues(blockId).map { iter =>
-          val ci = CompletionIterator[Any, Iterator[Any]](iter, 
releaseLock(blockId))
-          new BlockResult(ci, DataReadMethod.Memory, info.size)
-        }
+    // In order, try to read the serialized bytes from memory, then from disk, 
then fall back to
+    // serializing in-memory objects, and, finally, throw an exception if the 
block does not exist.
+    if (level.deserialized) {
+      // Try to avoid expensive serialization by reading a pre-serialized copy 
from disk:
+      if (level.useDisk && diskStore.contains(blockId)) {
+        // Note: we purposely do not try to put the block back into memory 
here. Since this branch
+        // handles deserialized blocks, this block may only be cached in 
memory as objects, not
+        // serialized bytes. Because the caller only requested bytes, it 
doesn't make sense to
+        // cache the block's deserialized objects since that caching may not 
have a payoff.
+        diskStore.getBytes(blockId)
+      } else if (level.useMemory && memoryStore.contains(blockId)) {
+        // The block was not found on disk, so serialize an in-memory copy:
+        dataSerialize(blockId, memoryStore.getValues(blockId).get)
       } else {
-        memoryStore.getBytes(blockId)
-      }
-      result match {
-        case Some(values) =>
-          return result
-        case None =>
-          logDebug(s"Block $blockId not found in memory")
-      }
-    }
-
-    // Look for block on disk, potentially storing it back in memory if 
required
-    if (level.useDisk) {
-      logDebug(s"Getting block $blockId from disk")
-      val bytes: ByteBuffer = diskStore.getBytes(blockId) match {
-        case Some(b) => b
-        case None =>
-          releaseLock(blockId)
-          throw new BlockException(
-            blockId, s"Block $blockId not found on disk, though it should be")
+        releaseLock(blockId)
+        throw new SparkException(s"Block $blockId was not found even though 
it's read-locked")
       }
-      assert(0 == bytes.position())
-
-      if (!level.useMemory) {
-        // If the block shouldn't be stored in memory, we can just return it
-        if (asBlockResult) {
-          val iter = CompletionIterator[Any, Iterator[Any]](
-            dataDeserialize(blockId, bytes), releaseLock(blockId))
-          return Some(new BlockResult(iter, DataReadMethod.Disk, info.size))
-        } else {
-          return Some(bytes)
-        }
-      } else {
-        // Otherwise, we also have to store something in the memory store
-        if (!level.deserialized && !asBlockResult) {
-          /* We'll store the bytes in memory if the block's storage level 
includes
-           * "memory serialized" and we requested its serialized bytes. */
-          memoryStore.putBytes(blockId, bytes.limit, () => {
+    } else {  // storage level is serialized
+      if (level.useMemory && memoryStore.contains(blockId)) {
+        memoryStore.getBytes(blockId).get
+      } else if (level.useDisk && diskStore.contains(blockId)) {
+        val bytes = diskStore.getBytes(blockId)
+        if (level.useMemory) {
+          // Cache the bytes back into memory to speed up subsequent reads.
+          val memoryStorePutSucceeded = memoryStore.putBytes(blockId, 
bytes.limit(), () => {
             // https://issues.apache.org/jira/browse/SPARK-6076
             // If the file size is bigger than the free memory, OOM will 
happen. So if we cannot
             // put it into MemoryStore, copyForMemory should not be created. 
That's why this
@@ -506,39 +525,19 @@ private[spark] class BlockManager(
             val copyForMemory = ByteBuffer.allocate(bytes.limit)
             copyForMemory.put(bytes)
           })
-          bytes.rewind()
-        }
-        if (!asBlockResult) {
-          return Some(bytes)
-        } else {
-          val values = dataDeserialize(blockId, bytes)
-          val valuesToReturn: Iterator[Any] = {
-            if (level.deserialized) {
-              // Cache the values before returning them
-              memoryStore.putIterator(blockId, values, level, 
allowPersistToDisk = false) match {
-                case Left(iter) =>
-                  // The memory store put() failed, so it returned the 
iterator back to us:
-                  iter
-                case Right(_) =>
-                  // The put() succeeded, so we can read the values back:
-                  memoryStore.getValues(blockId).get
-              }
-            } else {
-              values
-            }
+          if (memoryStorePutSucceeded) {
+            memoryStore.getBytes(blockId).get
+          } else {
+            bytes.rewind()
+            bytes
           }
-          val ci = CompletionIterator[Any, Iterator[Any]](valuesToReturn, 
releaseLock(blockId))
-          return Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
+        } else {
+          bytes
         }
+      } else {
+        releaseLock(blockId)
+        throw new SparkException(s"Block $blockId was not found even though 
it's read-locked")
       }
-    } else {
-      // This branch represents a case where the BlockInfoManager contained an 
entry for
-      // the block but the block could not be found in any of the block 
stores. This case
-      // should never occur, but for completeness's sake we address it here.
-      logError(
-        s"Block $blockId is supposedly stored locally but was not found in any 
block store")
-      releaseLock(blockId)
-      None
     }
   }
 
@@ -547,17 +546,10 @@ private[spark] class BlockManager(
    *
    * This does not acquire a lock on this block in this JVM.
    */
-  def getRemote(blockId: BlockId): Option[BlockResult] = {
-    logDebug(s"Getting remote block $blockId")
-    doGetRemote(blockId, asBlockResult = 
true).asInstanceOf[Option[BlockResult]]
-  }
-
-  /**
-   * Get block from remote block managers as serialized bytes.
-   */
-  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
-    logDebug(s"Getting remote block $blockId as bytes")
-    doGetRemote(blockId, asBlockResult = 
false).asInstanceOf[Option[ByteBuffer]]
+  def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
+    getRemoteBytes(blockId).map { data =>
+      new BlockResult(dataDeserialize(blockId, data), DataReadMethod.Network, 
data.limit())
+    }
   }
 
   /**
@@ -570,7 +562,11 @@ private[spark] class BlockManager(
     preferredLocs ++ otherLocs
   }
 
-  private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): 
Option[Any] = {
+  /**
+   * Get block from remote block managers as serialized bytes.
+   */
+  def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
+    logDebug(s"Getting remote block $blockId")
     require(blockId != null, "BlockId is null")
     val locations = getLocations(blockId)
     var numFetchFailures = 0
@@ -595,14 +591,7 @@ private[spark] class BlockManager(
       }
 
       if (data != null) {
-        if (asBlockResult) {
-          return Some(new BlockResult(
-            dataDeserialize(blockId, data),
-            DataReadMethod.Network,
-            data.limit()))
-        } else {
-          return Some(data)
-        }
+        return Some(data)
       }
       logDebug(s"The value of block $blockId is null")
     }
@@ -618,12 +607,12 @@ private[spark] class BlockManager(
    * automatically be freed once the result's `data` iterator is fully 
consumed.
    */
   def get(blockId: BlockId): Option[BlockResult] = {
-    val local = getLocal(blockId)
+    val local = getLocalValues(blockId)
     if (local.isDefined) {
       logInfo(s"Found block $blockId locally")
       return local
     }
-    val remote = getRemote(blockId)
+    val remote = getRemoteValues(blockId)
     if (remote.isDefined) {
       logInfo(s"Found block $blockId remotely")
       return remote
@@ -673,24 +662,26 @@ private[spark] class BlockManager(
       level: StorageLevel,
       makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = 
{
     // Initially we hold no locks on this block.
-    doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) 
match {
-      case DoPutSucceeded =>
+    doPutIterator(blockId, makeIterator, level, keepReadLock = true) match {
+      case None =>
         // doPut() didn't hand work back to us, so the block already existed 
or was successfully
         // stored. Therefore, we now hold a read lock on the block.
-        val blockResult = get(blockId).getOrElse {
+        val blockResult = getLocalValues(blockId).getOrElse {
           // Since we held a read lock between the doPut() and get() calls, 
the block should not
           // have been evicted, so get() not returning the block indicates 
some internal error.
           releaseLock(blockId)
           throw new SparkException(s"get() failed for block $blockId even 
though we held a lock")
         }
+        // We already hold a read lock on the block from the doPut() call and 
getLocalValues()
+        // acquires the lock again, so we need to call releaseLock() here so 
that the net number
+        // of lock acquisitions is 1 (since the caller will only call 
release() once).
+        releaseLock(blockId)
         Left(blockResult)
-      case DoPutIteratorFailed(iter) =>
+      case Some(iter) =>
         // The put failed, likely because the data was too large to fit in 
memory and could not be
         // dropped to disk. Therefore, we need to pass the input iterator back 
to the caller so
         // that they can decide what to do with the values (e.g. process them 
without caching).
        Right(iter)
-      case DoPutBytesFailed =>
-        throw new SparkException("doPut returned an invalid failure response")
     }
   }
 
@@ -701,16 +692,10 @@ private[spark] class BlockManager(
       blockId: BlockId,
       values: Iterator[Any],
       level: StorageLevel,
-      tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+      tellMaster: Boolean = true): Boolean = {
     require(values != null, "Values is null")
-    val result = doPut(
-      blockId,
-      IteratorValues(() => values),
-      level,
-      tellMaster,
-      effectiveStorageLevel)
-    result == DoPutSucceeded
+    // If doPut() didn't hand work back to us, then block already existed or 
was successfully stored
+    doPutIterator(blockId, () => values, level, tellMaster).isEmpty
   }
 
   /**
@@ -739,46 +724,105 @@ private[spark] class BlockManager(
       blockId: BlockId,
       bytes: ByteBuffer,
       level: StorageLevel,
-      tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
+      tellMaster: Boolean = true): Boolean = {
     require(bytes != null, "Bytes is null")
-    val result = doPut(blockId, ByteBufferValues(bytes), level, tellMaster, 
effectiveStorageLevel)
-    result == DoPutSucceeded
+    doPutBytes(blockId, bytes, level, tellMaster)
   }
 
   /**
-   * Put the given block according to the given level in one of the block 
stores, replicating
+   * Put the given bytes according to the given level in one of the block 
stores, replicating
    * the values if necessary.
    *
    * If the block already exists, this method will not overwrite it.
    *
-   * @param effectiveStorageLevel the level according to which the block will 
actually be handled.
-   *                              This allows the caller to specify an 
alternate behavior of doPut
-   *                              while preserving the original level 
specified by the user.
    * @param keepReadLock if true, this method will hold the read lock when it 
returns (even if the
    *                     block already exists). If false, this method will 
hold no locks when it
    *                     returns.
-   * @return [[DoPutSucceeded]] if the block was already present or if the put 
succeeded, or
-   *        [[DoPutBytesFailed]] if the put failed and we were storing bytes, 
or
-   *        [[DoPutIteratorFailed]] if the put failed and we were storing an 
iterator.
+   * @return true if the block was already present or if the put succeeded, 
false otherwise.
    */
-  private def doPut(
+  private def doPutBytes(
       blockId: BlockId,
-      data: BlockValues,
+      bytes: ByteBuffer,
       level: StorageLevel,
       tellMaster: Boolean = true,
-      effectiveStorageLevel: Option[StorageLevel] = None,
-      keepReadLock: Boolean = false): DoPutResult = {
+      keepReadLock: Boolean = false): Boolean = {
+    doPut(blockId, level, tellMaster = tellMaster, keepReadLock = 
keepReadLock) { putBlockInfo =>
+      val startTimeMs = System.currentTimeMillis
+      // Since we're storing bytes, initiate the replication before storing 
them locally.
+      // This is faster as data is already serialized and ready to send.
+      val replicationFuture = if (level.replication > 1) {
+        // Duplicate doesn't copy the bytes, but just creates a wrapper
+        val bufferView = bytes.duplicate()
+        Future {
+          // This is a blocking action and should run in 
futureExecutionContext which is a cached
+          // thread pool
+          replicate(blockId, bufferView, level)
+        }(futureExecutionContext)
+      } else {
+        null
+      }
+
+      bytes.rewind()
+      val size = bytes.limit()
+
+      if (level.useMemory) {
+        // Put it in memory first, even if it also has useDisk set to true;
+        // We will drop it to disk later if the memory store can't hold it.
+        val putSucceeded = if (level.deserialized) {
+          val values = dataDeserialize(blockId, bytes.duplicate())
+          memoryStore.putIterator(blockId, values, level).isRight
+        } else {
+          memoryStore.putBytes(blockId, size, () => bytes)
+        }
+        if (!putSucceeded && level.useDisk) {
+          logWarning(s"Persisting block $blockId to disk instead.")
+          diskStore.putBytes(blockId, bytes)
+        }
+      } else if (level.useDisk) {
+        diskStore.putBytes(blockId, bytes)
+      }
+
+      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+      if (blockWasSuccessfullyStored) {
+        // Now that the block is in either the memory, externalBlockStore, or 
disk store,
+        // tell the master about it.
+        putBlockInfo.size = size
+        if (tellMaster) {
+          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+        }
+        Option(TaskContext.get()).foreach { c =>
+          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, 
putBlockStatus)))
+        }
+      }
+      logDebug("Put block %s locally took %s".format(blockId, 
Utils.getUsedTimeMs(startTimeMs)))
+      if (level.replication > 1) {
+        // Wait for asynchronous replication to finish
+        Await.ready(replicationFuture, Duration.Inf)
+      }
+      if (blockWasSuccessfullyStored) {
+        None
+      } else {
+        Some(bytes)
+      }
+    }.isEmpty
+  }
+
+  /**
+   * Helper method used to abstract common code from [[doPutBytes()]] and 
[[doPutIterator()]].
+   *
+   * @param putBody a function which attempts the actual put() and returns 
None on success
+   *                or Some on failure.
+   */
+  private def doPut[T](
+      blockId: BlockId,
+      level: StorageLevel,
+      tellMaster: Boolean,
+      keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T] = {
 
     require(blockId != null, "BlockId is null")
     require(level != null && level.isValid, "StorageLevel is null or invalid")
-    effectiveStorageLevel.foreach { level =>
-      require(level != null && level.isValid, "Effective StorageLevel is null 
or invalid")
-    }
 
-    /* Remember the block's storage level so that we can correctly drop it to 
disk if it needs
-     * to be dropped right after it got put into memory. Note, however, that 
other threads will
-     * not be able to get() this block until we call markReady on its 
BlockInfo. */
     val putBlockInfo = {
       val newInfo = new BlockInfo(level, tellMaster)
       if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
@@ -789,138 +833,113 @@ private[spark] class BlockManager(
           // lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
           releaseLock(blockId)
         }
-        return DoPutSucceeded
+        return None
       }
     }
 
     val startTimeMs = System.currentTimeMillis
-
-    // Size of the block in bytes
-    var size = 0L
-
-    // The level we actually use to put the block
-    val putLevel = effectiveStorageLevel.getOrElse(level)
-
-    // If we're storing bytes, then initiate the replication before storing 
them locally.
-    // This is faster as data is already serialized and ready to send.
-    val replicationFuture = data match {
-      case b: ByteBufferValues if putLevel.replication > 1 =>
-        // Duplicate doesn't copy the bytes, but just creates a wrapper
-        val bufferView = b.buffer.duplicate()
-        Future {
-          // This is a blocking action and should run in 
futureExecutionContext which is a cached
-          // thread pool
-          replicate(blockId, bufferView, putLevel)
-        }(futureExecutionContext)
-      case _ => null
-    }
-
-    var blockWasSuccessfullyStored = false
-    var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
-
-    putBlockInfo.synchronized {
-      logTrace("Put for block %s took %s to get into synchronized block"
-        .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
-
-      try {
-        if (putLevel.useMemory) {
-          // Put it in memory first, even if it also has useDisk set to true;
-          // We will drop it to disk later if the memory store can't hold it.
-          data match {
-            case IteratorValues(iterator) =>
-              memoryStore.putIterator(blockId, iterator(), putLevel) match {
-                case Right(s) =>
-                  size = s
-                case Left(iter) =>
-                  iteratorFromFailedMemoryStorePut = Some(iter)
-              }
-            case ByteBufferValues(bytes) =>
-              bytes.rewind()
-              size = bytes.limit()
-              memoryStore.putBytes(blockId, bytes, putLevel)
-          }
-        } else if (putLevel.useDisk) {
-          data match {
-            case IteratorValues(iterator) =>
-              diskStore.putIterator(blockId, iterator(), putLevel) match {
-                case Right(s) =>
-                  size = s
-                // putIterator() will never return Left (see its return type).
-              }
-            case ByteBufferValues(bytes) =>
-              bytes.rewind()
-              size = bytes.limit()
-              diskStore.putBytes(blockId, bytes, putLevel)
-          }
+    var blockWasSuccessfullyStored: Boolean = false
+    val result: Option[T] = try {
+      val res = putBody(putBlockInfo)
+      blockWasSuccessfullyStored = res.isEmpty
+      res
+    } finally {
+      if (blockWasSuccessfullyStored) {
+        if (keepReadLock) {
+          blockInfoManager.downgradeLock(blockId)
         } else {
-          assert(putLevel == StorageLevel.NONE)
-          throw new BlockException(
-            blockId, s"Attempted to put block $blockId without specifying 
storage level!")
+          blockInfoManager.unlock(blockId)
         }
-
-        val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
-        blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
-        if (blockWasSuccessfullyStored) {
-          // Now that the block is in either the memory, externalBlockStore, 
or disk store,
-          // let other threads read it, and tell the master about it.
-          putBlockInfo.size = size
-          if (tellMaster) {
-            reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
-          }
-          Option(TaskContext.get()).foreach { c =>
-            c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, 
putBlockStatus)))
-          }
-        }
-      } finally {
-        if (blockWasSuccessfullyStored) {
-          if (keepReadLock) {
-            blockInfoManager.downgradeLock(blockId)
-          } else {
-            blockInfoManager.unlock(blockId)
-          }
-        } else {
-          blockInfoManager.removeBlock(blockId)
-          logWarning(s"Putting block $blockId failed")
-        }
-      }
-    }
-    logDebug("Put block %s locally took %s".format(blockId, 
Utils.getUsedTimeMs(startTimeMs)))
-
-    if (replicationFuture != null) {
-      // Wait for asynchronous replication to finish
-      Await.ready(replicationFuture, Duration.Inf)
-    } else if (putLevel.replication > 1 && blockWasSuccessfullyStored) {
-      val remoteStartTime = System.currentTimeMillis
-      val bytesToReplicate: ByteBuffer = {
-        doGetLocal(blockId, putBlockInfo, asBlockResult = false)
-          .map(_.asInstanceOf[ByteBuffer])
-          .getOrElse {
-            throw new SparkException(s"Block $blockId was not found even 
though it was just stored")
-          }
-      }
-      try {
-        replicate(blockId, bytesToReplicate, putLevel)
-      } finally {
-        BlockManager.dispose(bytesToReplicate)
+      } else {
+        blockInfoManager.removeBlock(blockId)
+        logWarning(s"Putting block $blockId failed")
       }
-      logDebug("Put block %s remotely took %s"
-        .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
     }
-
-    if (putLevel.replication > 1) {
+    if (level.replication > 1) {
       logDebug("Putting block %s with replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     } else {
       logDebug("Putting block %s without replication took %s"
         .format(blockId, Utils.getUsedTimeMs(startTimeMs)))
     }
+    result
+  }
 
-    if (blockWasSuccessfullyStored) {
-      DoPutSucceeded
-    } else if (iteratorFromFailedMemoryStorePut.isDefined) {
-      DoPutIteratorFailed(iteratorFromFailedMemoryStorePut.get)
-    } else {
-      DoPutBytesFailed
+  /**
+   * Put the given block according to the given level in one of the block stores, replicating
+   * the values if necessary.
+   *
+   * If the block already exists, this method will not overwrite it.
+   *
+   * @param keepReadLock if true, this method will hold the read lock when it returns (even if the
+   *                     block already exists). If false, this method will hold no locks when it
+   *                     returns.
+   * @return None if the block was already present or if the put succeeded, or Some(iterator)
+   *         if the put failed.
+   */
+  private def doPutIterator(
+      blockId: BlockId,
+      iterator: () => Iterator[Any],
+      level: StorageLevel,
+      tellMaster: Boolean = true,
+      keepReadLock: Boolean = false): Option[Iterator[Any]] = {
+    doPut(blockId, level, tellMaster = tellMaster, keepReadLock = keepReadLock) { putBlockInfo =>
+      val startTimeMs = System.currentTimeMillis
+      var iteratorFromFailedMemoryStorePut: Option[Iterator[Any]] = None
+      // Size of the block in bytes
+      var size = 0L
+      if (level.useMemory) {
+        // Put it in memory first, even if it also has useDisk set to true;
+        // We will drop it to disk later if the memory store can't hold it.
+        memoryStore.putIterator(blockId, iterator(), level) match {
+          case Right(s) =>
+            size = s
+          case Left(iter) =>
+            // Not enough space to unroll this block; drop to disk if applicable
+            if (level.useDisk) {
+              logWarning(s"Persisting block $blockId to disk instead.")
+              diskStore.put(blockId) { fileOutputStream =>
+                dataSerializeStream(blockId, fileOutputStream, iter)
+              }
+              size = diskStore.getSize(blockId)
+            } else {
+              iteratorFromFailedMemoryStorePut = Some(iter)
+            }
+        }
+      } else if (level.useDisk) {
+        diskStore.put(blockId) { fileOutputStream =>
+          dataSerializeStream(blockId, fileOutputStream, iterator())
+        }
+        size = diskStore.getSize(blockId)
+      }
+
+      val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)
+      val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
+      if (blockWasSuccessfullyStored) {
+        // Now that the block is in either the memory, externalBlockStore, or disk store,
+        // tell the master about it.
+        putBlockInfo.size = size
+        if (tellMaster) {
+          reportBlockStatus(blockId, putBlockInfo, putBlockStatus)
+        }
+        Option(TaskContext.get()).foreach { c =>
+          c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus)))
+        }
+        logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
+        if (level.replication > 1) {
+          val remoteStartTime = System.currentTimeMillis
+          val bytesToReplicate = doGetLocalBytes(blockId, putBlockInfo)
+          try {
+            replicate(blockId, bytesToReplicate, level)
+          } finally {
+            BlockManager.dispose(bytesToReplicate)
+          }
+          logDebug("Put block %s remotely took %s"
+            .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
+        }
+      }
+      assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
+      iteratorFromFailedMemoryStorePut
     }
   }
 
@@ -1077,9 +1096,11 @@ private[spark] class BlockManager(
       logInfo(s"Writing block $blockId to disk")
       data() match {
         case Left(elements) =>
-          diskStore.putIterator(blockId, elements.toIterator, level)
+          diskStore.put(blockId) { fileOutputStream =>
+            dataSerializeStream(blockId, fileOutputStream, elements.toIterator)
+          }
         case Right(bytes) =>
-          diskStore.putBytes(blockId, bytes, level)
+          diskStore.putBytes(blockId, bytes)
       }
       blockIsUpdated = true
     }
@@ -1229,7 +1250,6 @@ private[spark] class BlockManager(
     rpcEnv.stop(slaveEndpoint)
     blockInfoManager.clear()
     memoryStore.clear()
-    diskStore.clear()
     futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala b/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
deleted file mode 100644
index b069918..0000000
--- a/core/src/main/scala/org/apache/spark/storage/BlockStore.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.Logging
-
-/**
- * Abstract class to store blocks.
- */
-private[spark] abstract class BlockStore(val blockManager: BlockManager) 
extends Logging {
-
-  def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel): Unit
-
-  /**
-   * Attempt to store an iterator of values.
-   *
-   * @return an iterator of values (in case the put failed), or the estimated 
size of the stored
-   *         values if the put succeeded.
-   */
-  def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Either[Iterator[Any], Long]
-
-  /**
-   * Return the size of a block in bytes.
-   */
-  def getSize(blockId: BlockId): Long
-
-  def getBytes(blockId: BlockId): Option[ByteBuffer]
-
-  def getValues(blockId: BlockId): Option[Iterator[Any]]
-
-  /**
-   * Remove a block, if it exists.
-   *
-   * @param blockId the block to remove.
-   * @return True if the block was found and removed, False otherwise.
-   * @throws IllegalStateException if the block is pinned by a task.
-   */
-  def remove(blockId: BlockId): Boolean
-
-  def contains(blockId: BlockId): Boolean
-
-  def clear() { }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 4daf22f..e51d96e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -26,18 +26,14 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * Creates and maintains the logical mapping between logical blocks and physical on-disk
- * locations. By default, one block is mapped to one file with a name given by 
its BlockId.
- * However, it is also possible to have a block map to only a segment of a 
file, by calling
- * mapBlockToFileSegment().
+ * locations. One block is mapped to one file with a name given by its BlockId.
  *
  * Block files are hashed among the directories listed in spark.local.dir (or in
  * SPARK_LOCAL_DIRS, if it's set).
  */
-private[spark] class DiskBlockManager(blockManager: BlockManager, conf: 
SparkConf)
-  extends Logging {
+private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
 
-  private[spark]
-  val subDirsPerLocalDir = 
blockManager.conf.getInt("spark.diskStore.subDirectories", 64)
+  private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)
 
   /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
    * directory, create multiple subdirectories that we will hash files into, in order to avoid
@@ -163,10 +159,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
 
   private def doStop(): Unit = {
-    // Only perform cleanup if an external service is not serving our shuffle 
files.
-    // Also blockManagerId could be null if block manager is not initialized 
properly.
-    if (!blockManager.externalShuffleServiceEnabled ||
-      (blockManager.blockManagerId != null && 
blockManager.blockManagerId.isDriver)) {
+    if (deleteFilesOnStop) {
       localDirs.foreach { localDir =>
         if (localDir.isDirectory() && localDir.exists()) {
           try {

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index e35aa1b..caecd97 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -17,112 +17,100 @@
 
 package org.apache.spark.storage
 
-import java.io.{File, FileOutputStream, IOException, RandomAccessFile}
+import java.io.{FileOutputStream, IOException, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
-import org.apache.spark.Logging
+import com.google.common.io.Closeables
+
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.util.Utils
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(blockManager: BlockManager, diskManager: 
DiskBlockManager)
-  extends BlockStore(blockManager) with Logging {
+private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
 
-  val minMemoryMapBytes = 
blockManager.conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
 
-  override def getSize(blockId: BlockId): Long = {
+  def getSize(blockId: BlockId): Long = {
     diskManager.getFile(blockId.name).length
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: 
StorageLevel): Unit = {
-    // So that we do not modify the input offsets !
-    // duplicate does not copy buffer, so inexpensive
-    val bytes = _bytes.duplicate()
+  /**
+   * Invokes the provided callback function to write the specific block.
+   *
+   * @throws IllegalStateException if the block already exists in the disk store.
+   */
+  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+    if (contains(blockId)) {
+      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
+    }
     logDebug(s"Attempting to put block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
-    val channel = new FileOutputStream(file).getChannel
-    Utils.tryWithSafeFinally {
-      while (bytes.remaining > 0) {
-        channel.write(bytes)
+    val fileOutputStream = new FileOutputStream(file)
+    var threwException: Boolean = true
+    try {
+      writeFunc(fileOutputStream)
+      threwException = false
+    } finally {
+      try {
+        Closeables.close(fileOutputStream, threwException)
+      } finally {
+         if (threwException) {
+          remove(blockId)
+        }
       }
-    } {
-      channel.close()
     }
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
-      file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))
+      file.getName,
+      Utils.bytesToString(file.length()),
+      finishTime - startTime))
   }
 
-  override def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Right[Iterator[Any], Long] = {
-    logDebug(s"Attempting to write values for block $blockId")
-    val startTime = System.currentTimeMillis
-    val file = diskManager.getFile(blockId)
-    val outputStream = new FileOutputStream(file)
-    try {
+  def putBytes(blockId: BlockId, _bytes: ByteBuffer): Unit = {
+    // So that we do not modify the input offsets !
+    // duplicate does not copy buffer, so inexpensive
+    val bytes = _bytes.duplicate()
+    put(blockId) { fileOutputStream =>
+      val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        blockManager.dataSerializeStream(blockId, outputStream, values)
+        while (bytes.remaining > 0) {
+          channel.write(bytes)
+        }
       } {
-        // Close outputStream here because it should be closed before file is 
deleted.
-        outputStream.close()
+        channel.close()
       }
-    } catch {
-      case e: Throwable =>
-        if (file.exists()) {
-          if (!file.delete()) {
-            logWarning(s"Error deleting ${file}")
-          }
-        }
-        throw e
     }
-
-    val length = file.length
-
-    val timeTaken = System.currentTimeMillis - startTime
-    logDebug("Block %s stored as %s file on disk in %d ms".format(
-      file.getName, Utils.bytesToString(length), timeTaken))
-
-    Right(length)
   }
 
-  private def getBytes(file: File, offset: Long, length: Long): 
Option[ByteBuffer] = {
+  def getBytes(blockId: BlockId): ByteBuffer = {
+    val file = diskManager.getFile(blockId.name)
     val channel = new RandomAccessFile(file, "r").getChannel
     Utils.tryWithSafeFinally {
       // For small files, directly read rather than memory map
-      if (length < minMemoryMapBytes) {
-        val buf = ByteBuffer.allocate(length.toInt)
-        channel.position(offset)
+      if (file.length < minMemoryMapBytes) {
+        val buf = ByteBuffer.allocate(file.length.toInt)
+        channel.position(0)
         while (buf.remaining() != 0) {
           if (channel.read(buf) == -1) {
             throw new IOException("Reached EOF before filling buffer\n" +
-              
s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
           }
         }
         buf.flip()
-        Some(buf)
+        buf
       } else {
-        Some(channel.map(MapMode.READ_ONLY, offset, length))
+        channel.map(MapMode.READ_ONLY, 0, file.length)
       }
     } {
       channel.close()
     }
   }
 
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val file = diskManager.getFile(blockId.name)
-    getBytes(file, 0, file.length)
-  }
-
-  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    getBytes(blockId).map(buffer => blockManager.dataDeserialize(blockId, 
buffer))
-  }
-
-  override def remove(blockId: BlockId): Boolean = {
+  def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     if (file.exists()) {
       val ret = file.delete()
@@ -135,7 +123,7 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
     }
   }
 
-  override def contains(blockId: BlockId): Boolean = {
+  def contains(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     file.exists()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/81d48532/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
deleted file mode 100644
index bb72fe4..0000000
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ /dev/null
@@ -1,566 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.nio.ByteBuffer
-import java.util.LinkedHashMap
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.TaskContext
-import org.apache.spark.memory.MemoryManager
-import org.apache.spark.util.{SizeEstimator, Utils}
-import org.apache.spark.util.collection.SizeTrackingVector
-
-private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
-
-/**
- * Stores blocks in memory, either as Arrays of deserialized Java objects or as
- * serialized ByteBuffers.
- */
-private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: 
MemoryManager)
-  extends BlockStore(blockManager) {
-
-  // Note: all changes to memory allocations, notably putting blocks, evicting 
blocks, and
-  // acquiring or releasing unroll memory, must be synchronized on 
`memoryManager`!
-
-  private val conf = blockManager.conf
-  private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, 
true)
-
-  // A mapping from taskAttemptId to amount of memory used for unrolling a 
block (in bytes)
-  // All accesses of this map are assumed to have manually synchronized on 
`memoryManager`
-  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
-  // Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
-  // Pending unroll memory refers to the intermediate memory occupied by a task
-  // after the unroll but before the actual putting of the block in the cache.
-  // This chunk of memory is expected to be released *as soon as* we finish
-  // caching the corresponding block as opposed to until after the task 
finishes.
-  // This is only used if a block is successfully unrolled in its entirety in
-  // memory (SPARK-4777).
-  private val pendingUnrollMemoryMap = mutable.HashMap[Long, Long]()
-
-  // Initial memory to request before unrolling any block
-  private val unrollMemoryThreshold: Long =
-    conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
-
-  /** Total amount of memory available for storage, in bytes. */
-  private def maxMemory: Long = memoryManager.maxStorageMemory
-
-  if (maxMemory < unrollMemoryThreshold) {
-    logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the 
initial memory " +
-      s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to 
store a block in " +
-      s"memory. Please configure Spark with more memory.")
-  }
-
-  logInfo("MemoryStore started with capacity 
%s".format(Utils.bytesToString(maxMemory)))
-
-  /** Total storage memory used including unroll memory, in bytes. */
-  private def memoryUsed: Long = memoryManager.storageMemoryUsed
-
-  /**
-   * Amount of storage memory, in bytes, used for caching blocks.
-   * This does not include memory used for unrolling.
-   */
-  private def blocksMemoryUsed: Long = memoryManager.synchronized {
-    memoryUsed - currentUnrollMemory
-  }
-
-  override def getSize(blockId: BlockId): Long = {
-    entries.synchronized {
-      entries.get(blockId).size
-    }
-  }
-
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: 
StorageLevel): Unit = {
-    require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
-    // Work on a duplicate - since the original input might be used elsewhere.
-    val bytes = _bytes.duplicate()
-    bytes.rewind()
-    if (level.deserialized) {
-      val values = blockManager.dataDeserialize(blockId, bytes)
-      putIterator(blockId, values, level)
-    } else {
-      tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-    }
-  }
-
-  /**
-   * Use `size` to test if there is enough space in MemoryStore. If so, create 
the ByteBuffer and
-   * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
-   *
-   * The caller should guarantee that `size` is correct.
-   */
-  def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): Unit = 
{
-    require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
-    // Work on a duplicate - since the original input might be used elsewhere.
-    lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
-    val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false)
-    if (putSuccess) {
-      assert(bytes.limit == size)
-    }
-  }
-
-  override def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel): Either[Iterator[Any], Long] = {
-    require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
-    putIterator(blockId, values, level, allowPersistToDisk = true)
-  }
-
-  /**
-   * Attempt to put the given block in memory store.
-   *
-   * There may not be enough space to fully unroll the iterator in memory, in 
which case we
-   * optionally drop the values to disk if
-   *   (1) the block's storage level specifies useDisk, and
-   *   (2) `allowPersistToDisk` is true.
-   *
-   * One scenario in which `allowPersistToDisk` is false is when the 
BlockManager reads a block
-   * back from disk and attempts to cache it in memory. In this case, we 
should not persist the
-   * block back on disk again, as it is already in disk store.
-   */
-  private[storage] def putIterator(
-      blockId: BlockId,
-      values: Iterator[Any],
-      level: StorageLevel,
-      allowPersistToDisk: Boolean): Either[Iterator[Any], Long] = {
-    require(!contains(blockId), s"Block $blockId is already present in the 
MemoryStore")
-    val unrolledValues = unrollSafely(blockId, values)
-    unrolledValues match {
-      case Left(arrayValues) =>
-        // Values are fully unrolled in memory, so store them as an array
-        val size = {
-          if (level.deserialized) {
-            val sizeEstimate = 
SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
-            tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = 
true)
-            sizeEstimate
-          } else {
-            val bytes = blockManager.dataSerialize(blockId, 
arrayValues.iterator)
-            tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
-            bytes.limit()
-          }
-        }
-        Right(size)
-      case Right(iteratorValues) =>
-        // Not enough space to unroll this block; drop to disk if applicable
-        if (level.useDisk && allowPersistToDisk) {
-          logWarning(s"Persisting block $blockId to disk instead.")
-          blockManager.diskStore.putIterator(blockId, iteratorValues, level)
-        } else {
-          Left(iteratorValues)
-        }
-    }
-  }
-
-  override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
-    val entry = entries.synchronized {
-      entries.get(blockId)
-    }
-    if (entry == null) {
-      None
-    } else if (entry.deserialized) {
-      Some(blockManager.dataSerialize(blockId, 
entry.value.asInstanceOf[Array[Any]].iterator))
-    } else {
-      Some(entry.value.asInstanceOf[ByteBuffer].duplicate()) // Doesn't 
actually copy the data
-    }
-  }
-
-  override def getValues(blockId: BlockId): Option[Iterator[Any]] = {
-    val entry = entries.synchronized {
-      entries.get(blockId)
-    }
-    if (entry == null) {
-      None
-    } else if (entry.deserialized) {
-      Some(entry.value.asInstanceOf[Array[Any]].iterator)
-    } else {
-      val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't 
actually copy data
-      Some(blockManager.dataDeserialize(blockId, buffer))
-    }
-  }
-
-  override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
-    val entry = entries.synchronized {
-      entries.remove(blockId)
-    }
-    if (entry != null) {
-      memoryManager.releaseStorageMemory(entry.size)
-      logDebug(s"Block $blockId of size ${entry.size} dropped " +
-        s"from memory (free ${maxMemory - blocksMemoryUsed})")
-      true
-    } else {
-      false
-    }
-  }
-
-  override def clear(): Unit = memoryManager.synchronized {
-    entries.synchronized {
-      entries.clear()
-    }
-    unrollMemoryMap.clear()
-    pendingUnrollMemoryMap.clear()
-    memoryManager.releaseAllStorageMemory()
-    logInfo("MemoryStore cleared")
-  }
-
-  /**
-   * Unroll the given block in memory safely.
-   *
-   * The safety of this operation refers to avoiding potential OOM exceptions caused by
-   * unrolling the entirety of the block in memory at once. This is achieved by periodically
-   * checking whether the memory restrictions for unrolling blocks are still satisfied,
-   * stopping immediately if not. This check is a safeguard against the scenario in which
-   * there is not enough free memory to accommodate the entirety of a single block.
-   *
-   * This method returns either an array with the contents of the entire block or an iterator
-   * containing the values of the block (if the array would have exceeded available memory).
-   */
-  def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = {
-
-    // Number of elements unrolled so far
-    var elementsUnrolled = 0
-    // Whether there is still enough memory for us to continue unrolling this block
-    var keepUnrolling = true
-    // Initial per-task memory to request for unrolling blocks (bytes). Exposed for testing.
-    val initialMemoryThreshold = unrollMemoryThreshold
-    // How often to check whether we need to request more memory
-    val memoryCheckPeriod = 16
-    // Memory currently reserved by this task for this particular unrolling operation
-    var memoryThreshold = initialMemoryThreshold
-    // Memory to request as a multiple of current vector size
-    val memoryGrowthFactor = 1.5
-    // Keep track of pending unroll memory reserved by this method.
-    var pendingMemoryReserved = 0L
-    // Underlying vector for unrolling the block
-    var vector = new SizeTrackingVector[Any]
-
-    // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
-
-    if (!keepUnrolling) {
-      logWarning(s"Failed to reserve initial memory threshold of " +
-        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
-    } else {
-      pendingMemoryReserved += initialMemoryThreshold
-    }
-
-    // Unroll this block safely, checking whether we have exceeded our threshold periodically
-    try {
-      while (values.hasNext && keepUnrolling) {
-        vector += values.next()
-        if (elementsUnrolled % memoryCheckPeriod == 0) {
-          // If our vector's size has exceeded the threshold, request more memory
-          val currentSize = vector.estimateSize()
-          if (currentSize >= memoryThreshold) {
-            val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
-            keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
-            if (keepUnrolling) {
-              pendingMemoryReserved += amountToRequest
-            }
-            // New threshold is currentSize * memoryGrowthFactor
-            memoryThreshold += amountToRequest
-          }
-        }
-        elementsUnrolled += 1
-      }
-
-      if (keepUnrolling) {
-        // We successfully unrolled the entirety of this block
-        Left(vector.toArray)
-      } else {
-        // We ran out of space while unrolling the values for this block
-        logUnrollFailureMessage(blockId, vector.estimateSize())
-        Right(vector.iterator ++ values)
-      }
-
-    } finally {
-      // If we return an array, the values returned here will be cached in `tryToPut` later.
-      // In this case, we should release the memory only after we cache the block there.
-      if (keepUnrolling) {
-        val taskAttemptId = currentTaskAttemptId()
-        memoryManager.synchronized {
-          // Since we continue to hold onto the array until we actually cache it, we cannot
-          // release the unroll memory yet. Instead, we transfer it to pending unroll memory
-          // so `tryToPut` can further transfer it to normal storage memory later.
-          // TODO: we can probably express this without pending unroll memory (SPARK-10907)
-          unrollMemoryMap(taskAttemptId) -= pendingMemoryReserved
-          pendingUnrollMemoryMap(taskAttemptId) =
-            pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + pendingMemoryReserved
-        }
-      } else {
-        // Otherwise, if we return an iterator, we can only release the unroll memory when
-        // the task finishes since we don't know when the iterator will be consumed.
-      }
-    }
-  }
-
-  /**
-   * Return the RDD ID that a given block ID is from, or None if it is not an RDD block.
-   */
-  private def getRddId(blockId: BlockId): Option[Int] = {
-    blockId.asRDDId.map(_.rddId)
-  }
-
-  /**
-   * Try to put in a set of values, if we can free up enough space. The value should either be
-   * an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
-   * must also be passed by the caller.
-   *
-   * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
-   * created to avoid OOM since it may be a big ByteBuffer.
-   *
-   * Synchronize on `memoryManager` to ensure that all the put requests and its associated block
-   * dropping is done by only on thread at a time. Otherwise while one thread is dropping
-   * blocks to free memory for one block, another thread may use up the freed space for
-   * another block.
-   *
-   * @return whether put was successful.
-   */
-  private def tryToPut(
-      blockId: BlockId,
-      value: () => Any,
-      size: Long,
-      deserialized: Boolean): Boolean = {
-
-    /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks
-     * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has
-     * been released, it must be ensured that those to-be-dropped blocks are not double counted
-     * for freeing up more space for another block that needs to be put. Only then the actually
-     * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
-
-    memoryManager.synchronized {
-      // Note: if we have previously unrolled this block successfully, then pending unroll
-      // memory should be non-zero. This is the amount that we already reserved during the
-      // unrolling process. In this case, we can just reuse this space to cache our block.
-      // The synchronization on `memoryManager` here guarantees that the release and acquire
-      // happen atomically. This relies on the assumption that all memory acquisitions are
-      // synchronized on the same lock.
-      releasePendingUnrollMemoryForThisTask()
-      val enoughMemory = memoryManager.acquireStorageMemory(blockId, size)
-      if (enoughMemory) {
-        // We acquired enough memory for the block, so go ahead and put it
-        val entry = new MemoryEntry(value(), size, deserialized)
-        entries.synchronized {
-          entries.put(blockId, entry)
-        }
-        val valuesOrBytes = if (deserialized) "values" else "bytes"
-        logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(
-          blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(blocksMemoryUsed)))
-      } else {
-        // Tell the block manager that we couldn't put it in memory so that it can drop it to
-        // disk if the block allows disk storage.
-        lazy val data = if (deserialized) {
-          Left(value().asInstanceOf[Array[Any]])
-        } else {
-          Right(value().asInstanceOf[ByteBuffer].duplicate())
-        }
-        blockManager.dropFromMemory(blockId, () => data)
-      }
-      enoughMemory
-    }
-  }
-
-  /**
-    * Try to evict blocks to free up a given amount of space to store a particular block.
-    * Can fail if either the block is bigger than our memory or it would require replacing
-    * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
-    * RDDs that don't fit into memory that we want to avoid).
-    *
-    * @param blockId the ID of the block we are freeing space for, if any
-    * @param space the size of this block
-    * @return the amount of memory (in bytes) freed by eviction
-    */
-  private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = {
-    assert(space > 0)
-    memoryManager.synchronized {
-      var freedMemory = 0L
-      val rddToAdd = blockId.flatMap(getRddId)
-      val selectedBlocks = new ArrayBuffer[BlockId]
-      def blockIsEvictable(blockId: BlockId): Boolean = {
-        rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
-      }
-      // This is synchronized to ensure that the set of entries is not changed
-      // (because of getValue or getBytes) while traversing the iterator, as that
-      // can lead to exceptions.
-      entries.synchronized {
-        val iterator = entries.entrySet().iterator()
-        while (freedMemory < space && iterator.hasNext) {
-          val pair = iterator.next()
-          val blockId = pair.getKey
-          if (blockIsEvictable(blockId)) {
-            // We don't want to evict blocks which are currently being read, so we need to obtain
-            // an exclusive write lock on blocks which are candidates for eviction. We perform a
-            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
-            if (blockManager.blockInfoManager.lockForWriting(blockId, blocking = false).isDefined) {
-              selectedBlocks += blockId
-              freedMemory += pair.getValue.size
-            }
-          }
-        }
-      }
-
-      if (freedMemory >= space) {
-        logInfo(s"${selectedBlocks.size} blocks selected for dropping")
-        for (blockId <- selectedBlocks) {
-          val entry = entries.synchronized { entries.get(blockId) }
-          // This should never be null as only one task should be dropping
-          // blocks and removing entries. However the check is still here for
-          // future safety.
-          if (entry != null) {
-            val data = if (entry.deserialized) {
-              Left(entry.value.asInstanceOf[Array[Any]])
-            } else {
-              Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
-            }
-            val newEffectiveStorageLevel = blockManager.dropFromMemory(blockId, () => data)
-            if (newEffectiveStorageLevel.isValid) {
-              // The block is still present in at least one store, so release the lock
-              // but don't delete the block info
-              blockManager.releaseLock(blockId)
-            } else {
-              // The block isn't present in any store, so delete the block info so that the
-              // block can be stored again
-              blockManager.blockInfoManager.removeBlock(blockId)
-            }
-          }
-        }
-        freedMemory
-      } else {
-        blockId.foreach { id =>
-          logInfo(s"Will not store $id as it would require dropping another block " +
-            "from the same RDD")
-        }
-        selectedBlocks.foreach { id =>
-          blockManager.releaseLock(id)
-        }
-        0L
-      }
-    }
-  }
-
-  override def contains(blockId: BlockId): Boolean = {
-    entries.synchronized { entries.containsKey(blockId) }
-  }
-
-  private def currentTaskAttemptId(): Long = {
-    // In case this is called on the driver, return an invalid task attempt id.
-    Option(TaskContext.get()).map(_.taskAttemptId()).getOrElse(-1L)
-  }
-
-  /**
-   * Reserve memory for unrolling the given block for this task.
-   *
-   * @return whether the request is granted.
-   */
-  def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
-    memoryManager.synchronized {
-      val success = memoryManager.acquireUnrollMemory(blockId, memory)
-      if (success) {
-        val taskAttemptId = currentTaskAttemptId()
-        unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
-      }
-      success
-    }
-  }
-
-  /**
-   * Release memory used by this task for unrolling blocks.
-   * If the amount is not specified, remove the current task's allocation altogether.
-   */
-  def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
-    val taskAttemptId = currentTaskAttemptId()
-    memoryManager.synchronized {
-      if (unrollMemoryMap.contains(taskAttemptId)) {
-        val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
-        if (memoryToRelease > 0) {
-          unrollMemoryMap(taskAttemptId) -= memoryToRelease
-          if (unrollMemoryMap(taskAttemptId) == 0) {
-            unrollMemoryMap.remove(taskAttemptId)
-          }
-          memoryManager.releaseUnrollMemory(memoryToRelease)
-        }
-      }
-    }
-  }
-
-  /**
-   * Release pending unroll memory of current unroll successful block used by this task
-   */
-  def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
-    val taskAttemptId = currentTaskAttemptId()
-    memoryManager.synchronized {
-      if (pendingUnrollMemoryMap.contains(taskAttemptId)) {
-        val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
-        if (memoryToRelease > 0) {
-          pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
-          if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
-            pendingUnrollMemoryMap.remove(taskAttemptId)
-          }
-          memoryManager.releaseUnrollMemory(memoryToRelease)
-        }
-      }
-    }
-  }
-
-  /**
-   * Return the amount of memory currently occupied for unrolling blocks across all tasks.
-   */
-  def currentUnrollMemory: Long = memoryManager.synchronized {
-    unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
-  }
-
-  /**
-   * Return the amount of memory currently occupied for unrolling blocks by this task.
-   */
-  def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
-    unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
-  }
-
-  /**
-   * Return the number of tasks currently unrolling blocks.
-   */
-  private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size }
-
-  /**
-   * Log information about current memory usage.
-   */
-  private def logMemoryUsage(): Unit = {
-    logInfo(
-      s"Memory use = ${Utils.bytesToString(blocksMemoryUsed)} (blocks) + " +
-      s"${Utils.bytesToString(currentUnrollMemory)} (scratch space shared across " +
-      s"$numTasksUnrolling tasks(s)) = ${Utils.bytesToString(memoryUsed)}. " +
-      s"Storage limit = ${Utils.bytesToString(maxMemory)}."
-    )
-  }
-
-  /**
-   * Log a warning for failing to unroll a block.
-   *
-   * @param blockId ID of the block we are trying to unroll.
-   * @param finalVectorSize Final size of the vector before unrolling failed.
-   */
-  private def logUnrollFailureMessage(blockId: BlockId, finalVectorSize: Long): Unit = {
-    logWarning(
-      s"Not enough space to cache $blockId in memory! " +
-      s"(computed ${Utils.bytesToString(finalVectorSize)} so far)"
-    )
-    logMemoryUsage()
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to