srowen closed pull request #23457: [SPARK-26539][CORE] Remove spark.memory.useLegacyMode and StaticMemoryManager
URL: https://github.com/apache/spark/pull/23457
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 681e4378a4dd5..f27df505fa5ff 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -532,38 +532,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     }
 
     // Validate memory fractions
-    val deprecatedMemoryKeys = Seq(
-      "spark.storage.memoryFraction",
-      "spark.shuffle.memoryFraction",
-      "spark.shuffle.safetyFraction",
-      "spark.storage.unrollFraction",
-      "spark.storage.safetyFraction")
-    val memoryKeys = Seq(
-      "spark.memory.fraction",
-      "spark.memory.storageFraction") ++
-      deprecatedMemoryKeys
-    for (key <- memoryKeys) {
+    for (key <- Seq("spark.memory.fraction", "spark.memory.storageFraction")) {
       val value = getDouble(key, 0.5)
       if (value > 1 || value < 0) {
         throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
       }
     }
 
-    // Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
-    val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
-    val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
-    if (!legacyMemoryManagement) {
-      val keyset = deprecatedMemoryKeys.toSet
-      val detected = settings.keys().asScala.filter(keyset.contains)
-      if (detected.nonEmpty) {
-        logWarning("Detected deprecated memory fraction settings: " +
-          detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " +
-          "memory management are unified. All memory fractions used in the old model are " +
-          "now deprecated and no longer read. If you wish to use the old memory management, " +
-          s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).")
-      }
-    }
-
     if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
       val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
         "instead use \"yarn\" with specified deploy mode."
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 9222781fa0833..ba5ed8ab1f302 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,7 +31,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
+import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -322,13 +322,7 @@ object SparkEnv extends Logging {
       shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
     val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
 
-    val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
-    val memoryManager: MemoryManager =
-      if (useLegacyMemoryManager) {
-        new StaticMemoryManager(conf, numUsableCores)
-      } else {
-        UnifiedMemoryManager(conf, numUsableCores)
-      }
+    val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)
 
     val blockManagerPort = if (isDriver) {
       conf.get(DRIVER_BLOCK_MANAGER_PORT)
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
deleted file mode 100644
index 7e052c02c9376..0000000000000
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.memory
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config
-import org.apache.spark.internal.config.Tests.TEST_MEMORY
-import org.apache.spark.storage.BlockId
-
-/**
- * A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
- *
- * The sizes of the execution and storage regions are determined through
- * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two
- * regions are cleanly separated such that neither usage can borrow memory from the other.
- */
-private[spark] class StaticMemoryManager(
-    conf: SparkConf,
-    maxOnHeapExecutionMemory: Long,
-    override val maxOnHeapStorageMemory: Long,
-    numCores: Int)
-  extends MemoryManager(
-    conf,
-    numCores,
-    maxOnHeapStorageMemory,
-    maxOnHeapExecutionMemory) {
-
-  def this(conf: SparkConf, numCores: Int) {
-    this(
-      conf,
-      StaticMemoryManager.getMaxExecutionMemory(conf),
-      StaticMemoryManager.getMaxStorageMemory(conf),
-      numCores)
-  }
-
-  // The StaticMemoryManager does not support off-heap storage memory:
-  offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
-  offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
-
-  // Max number of bytes worth of blocks to evict when unrolling
-  private val maxUnrollMemory: Long = {
-    (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
-  }
-
-  override def maxOffHeapStorageMemory: Long = 0L
-
-  override def acquireStorageMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      memoryMode: MemoryMode): Boolean = synchronized {
-    require(memoryMode != MemoryMode.OFF_HEAP,
-      "StaticMemoryManager does not support off-heap storage memory")
-    if (numBytes > maxOnHeapStorageMemory) {
-      // Fail fast if the block simply won't fit
-      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
-        s"memory limit ($maxOnHeapStorageMemory bytes)")
-      false
-    } else {
-      onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
-    }
-  }
-
-  override def acquireUnrollMemory(
-      blockId: BlockId,
-      numBytes: Long,
-      memoryMode: MemoryMode): Boolean = synchronized {
-    require(memoryMode != MemoryMode.OFF_HEAP,
-      "StaticMemoryManager does not support off-heap unroll memory")
-    if (numBytes > maxOnHeapStorageMemory) {
-      // Fail fast if the block simply won't fit
-      logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
-        s"memory limit ($maxOnHeapStorageMemory bytes)")
-      false
-    } else {
-      val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
-      val freeMemory = onHeapStorageMemoryPool.memoryFree
-      // When unrolling, we will use all of the existing free memory, and, if necessary,
-      // some extra space freed from evicting cached blocks. We must place a cap on the
-      // amount of memory to be evicted by unrolling, however, otherwise unrolling one
-      // big block can blow away the entire cache.
-      val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
-      // Keep it within the range 0 <= X <= maxNumBytesToFree
-      val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
-      onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
-    }
-  }
-
-  private[memory]
-  override def acquireExecutionMemory(
-      numBytes: Long,
-      taskAttemptId: Long,
-      memoryMode: MemoryMode): Long = synchronized {
-    memoryMode match {
-      case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
-      case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
-    }
-  }
-}
-
-
-private[spark] object StaticMemoryManager {
-
-  private val MIN_MEMORY_BYTES = 32 * 1024 * 1024
-
-  /**
-   * Return the total amount of memory available for the storage region, in bytes.
-   */
-  private def getMaxStorageMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.get(TEST_MEMORY)
-    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
-    val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
-    (systemMaxMemory * memoryFraction * safetyFraction).toLong
-  }
-
-  /**
-   * Return the total amount of memory available for the execution region, in bytes.
-   */
-  private def getMaxExecutionMemory(conf: SparkConf): Long = {
-    val systemMaxMemory = conf.get(TEST_MEMORY)
-
-    if (systemMaxMemory < MIN_MEMORY_BYTES) {
-      throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
-        s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
-        s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
-    }
-    if (conf.contains(config.EXECUTOR_MEMORY)) {
-      val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
-      if (executorMemory < MIN_MEMORY_BYTES) {
-        throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
-          s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
-          s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.")
-      }
-    }
-    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
-    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
-    (systemMaxMemory * memoryFraction * safetyFraction).toLong
-  }
-
-}
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 7801bb87050f6..a0fbbbdebd028 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -46,7 +46,7 @@ import org.apache.spark.storage.BlockId
  *                          it if necessary. Cached blocks can be evicted only if actual
  *                          storage memory usage exceeds this region.
  */
-private[spark] class UnifiedMemoryManager private[memory] (
+private[spark] class UnifiedMemoryManager(
     conf: SparkConf,
     val maxHeapMemory: Long,
     onHeapStorageRegionSize: Long,
diff --git a/core/src/main/scala/org/apache/spark/memory/package.scala b/core/src/main/scala/org/apache/spark/memory/package.scala
index 3d00cd9cb6377..7f782193f246f 100644
--- a/core/src/main/scala/org/apache/spark/memory/package.scala
+++ b/core/src/main/scala/org/apache/spark/memory/package.scala
@@ -61,15 +61,10 @@ package org.apache.spark
  * }}}
  *
  *
- * There are two implementations of [[org.apache.spark.memory.MemoryManager]] which vary in how
- * they handle the sizing of their memory pools:
+ * There is one implementation of [[org.apache.spark.memory.MemoryManager]]:
  *
- *  - [[org.apache.spark.memory.UnifiedMemoryManager]], the default in Spark 1.6+, enforces soft
+ *  - [[org.apache.spark.memory.UnifiedMemoryManager]] enforces soft
  *    boundaries between storage and execution memory, allowing requests for memory in one region
  *    to be fulfilled by borrowing memory from the other.
- *  - [[org.apache.spark.memory.StaticMemoryManager]] enforces hard boundaries between storage
- *    and execution memory by statically partitioning Spark's memory and preventing storage and
- *    execution from borrowing memory from each other. This mode is retained only for legacy
- *    compatibility purposes.
  */
 package object memory
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index a0664b30d6cc2..dc1fe774f7961 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -29,7 +29,7 @@
   @Test
   public void leakedPageMemoryIsDetected() {
     final TaskMemoryManager manager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set("spark.memory.offHeap.enabled", "false"),
         Long.MAX_VALUE,
         Long.MAX_VALUE,
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 9ad2e9a5e74ac..6976464e8ab5d 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -145,8 +145,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
   encryptionTest("Cache broadcast to disk") { conf =>
     conf.setMaster("local")
       .setAppName("test")
-      .set("spark.memory.useLegacyMode", "true")
-      .set("spark.storage.memoryFraction", "0.0")
+      .set("spark.memory.storageFraction", "0.0")
     sc = new SparkContext(conf)
     val list = List[Int](1, 2, 3, 4)
     val broadcast = sc.broadcast(list)
@@ -173,8 +172,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName("test")
-      .set("spark.memory.useLegacyMode", "true")
-      .set("spark.storage.memoryFraction", "0.0")
+      .set("spark.memory.storageFraction", "0.0")
 
     sc = new SparkContext(conf)
     val list = List[Int](1, 2, 3, 4)
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
deleted file mode 100644
index c3275add50f48..0000000000000
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.memory
-
-import org.mockito.Mockito.when
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
-import org.apache.spark.internal.config.Tests.TEST_MEMORY
-import org.apache.spark.storage.TestBlockId
-import org.apache.spark.storage.memory.MemoryStore
-
-class StaticMemoryManagerSuite extends MemoryManagerSuite {
-  private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
-
-  /**
-   * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
-   */
-  private def makeThings(
-      maxExecutionMem: Long,
-      maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
-    val mm = new StaticMemoryManager(
-      conf,
-      maxOnHeapExecutionMemory = maxExecutionMem,
-      maxOnHeapStorageMemory = maxStorageMem,
-      numCores = 1)
-    val ms = makeMemoryStore(mm)
-    (mm, ms)
-  }
-
-  override protected def createMemoryManager(
-      maxOnHeapExecutionMemory: Long,
-      maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
-    new StaticMemoryManager(
-      conf.clone
-        .set("spark.memory.fraction", "1")
-        .set(TEST_MEMORY, maxOnHeapExecutionMemory)
-        .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory),
-      maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
-      maxOnHeapStorageMemory = 0,
-      numCores = 1)
-  }
-
-  test("basic execution memory") {
-    val maxExecutionMem = 1000L
-    val taskAttemptId = 0L
-    val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
-    val memoryMode = MemoryMode.ON_HEAP
-    assert(mm.executionMemoryUsed === 0L)
-    assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L)
-    assert(mm.executionMemoryUsed === 10L)
-    assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
-    // Acquire up to the max
-    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 890L)
-    assert(mm.executionMemoryUsed === maxExecutionMem)
-    assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L)
-    assert(mm.executionMemoryUsed === maxExecutionMem)
-    mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode)
-    assert(mm.executionMemoryUsed === 200L)
-    // Acquire after release
-    assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L)
-    assert(mm.executionMemoryUsed === 201L)
-    // Release beyond what was acquired
-    mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, memoryMode)
-    assert(mm.executionMemoryUsed === 0L)
-  }
-
-  test("basic storage memory") {
-    val maxStorageMem = 1000L
-    val dummyBlock = TestBlockId("you can see the world you brought to live")
-    val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
-    val memoryMode = MemoryMode.ON_HEAP
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 10L)
-
-    assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 110L)
-    // Acquire more than the max, not granted
-    assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 110L)
-    // Acquire up to the max, requests after this are still granted due to LRU eviction
-    assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, memoryMode))
-    assertEvictBlocksToFreeSpaceCalled(ms, 110L)
-    assert(mm.storageMemoryUsed === 1000L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
-    assertEvictBlocksToFreeSpaceCalled(ms, 1L)
-    assert(evictedBlocks.nonEmpty)
-    evictedBlocks.clear()
-    // Note: We evicted 1 byte to put another 1-byte block in, so the storage memory used remains at
-    // 1000 bytes. This is different from real behavior, where the 1-byte block would have evicted
-    // the 1000-byte block entirely. This is set up differently so we can write finer-grained tests.
-    assert(mm.storageMemoryUsed === 1000L)
-    mm.releaseStorageMemory(800L, memoryMode)
-    assert(mm.storageMemoryUsed === 200L)
-    // Acquire after release
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 201L)
-    mm.releaseAllStorageMemory()
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 1L)
-    // Release beyond what was acquired
-    mm.releaseStorageMemory(100L, memoryMode)
-    assert(mm.storageMemoryUsed === 0L)
-  }
-
-  test("execution and storage isolation") {
-    val maxExecutionMem = 200L
-    val maxStorageMem = 1000L
-    val taskAttemptId = 0L
-    val dummyBlock = TestBlockId("ain't nobody love like you do")
-    val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
-    val memoryMode = MemoryMode.ON_HEAP
-    // Only execution memory should increase
-    assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.executionMemoryUsed === 100L)
-    assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) === 100L)
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.executionMemoryUsed === 200L)
-    // Only storage memory should increase
-    assert(mm.acquireStorageMemory(dummyBlock, 50L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 50L)
-    assert(mm.executionMemoryUsed === 200L)
-    // Only execution memory should be released
-    mm.releaseExecutionMemory(133L, taskAttemptId, memoryMode)
-    assert(mm.storageMemoryUsed === 50L)
-    assert(mm.executionMemoryUsed === 67L)
-    // Only storage memory should be released
-    mm.releaseAllStorageMemory()
-    assert(mm.storageMemoryUsed === 0L)
-    assert(mm.executionMemoryUsed === 67L)
-  }
-
-  test("unroll memory") {
-    val maxStorageMem = 1000L
-    val dummyBlock = TestBlockId("lonely water")
-    val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
-    val memoryMode = MemoryMode.ON_HEAP
-    assert(mm.acquireUnrollMemory(dummyBlock, 100L, memoryMode))
-    when(ms.currentUnrollMemory).thenReturn(100L)
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 100L)
-    mm.releaseUnrollMemory(40L, memoryMode)
-    assert(mm.storageMemoryUsed === 60L)
-    when(ms.currentUnrollMemory).thenReturn(60L)
-    assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode))
-    assertEvictBlocksToFreeSpaceNotCalled(ms)
-    assert(mm.storageMemoryUsed === 860L)
-    // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
-    // As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes.
-    // Requesting 240 more bytes of unroll memory will leave our total unroll memory at
-    // 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are granted.
-    assert(mm.acquireUnrollMemory(dummyBlock, 240L, memoryMode))
-    assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000
-    when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
-    assert(mm.storageMemoryUsed === 1000L)
-    evictedBlocks.clear()
-    // We already have 300 bytes of unroll memory, so requesting 150 more will leave us
-    // above the 400-byte limit. Since there is not enough free memory, this request will
-    // fail even after evicting as much as we can (400 - 300 = 100 bytes).
-    assert(!mm.acquireUnrollMemory(dummyBlock, 150L, memoryMode))
-    assertEvictBlocksToFreeSpaceCalled(ms, 100L)
-    assert(mm.storageMemoryUsed === 900L)
-    // Release beyond what was acquired
-    mm.releaseUnrollMemory(maxStorageMem, memoryMode)
-    assert(mm.storageMemoryUsed === 0L)
-  }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 480e07fb9399a..2250ae2f771ed 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -91,7 +91,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     conf.set(IS_TESTING, true)
     conf.set("spark.memory.fraction", "1")
     conf.set("spark.memory.storageFraction", "1")
-    conf.set("spark.storage.unrollFraction", "0.4")
     conf.set("spark.storage.unrollMemoryThreshold", "512")
 
     // to make a replication attempt to inactive store fail fast
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index bda81365b0792..c23264191e124 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -120,7 +120,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set("spark.memory.fraction", "1")
       .set("spark.memory.storageFraction", "1")
       .set("spark.kryoserializer.buffer", "1m")
-      .set("spark.storage.unrollFraction", "0.4")
       .set("spark.storage.unrollMemoryThreshold", "512")
 
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index b02af2bfe7acc..7cdcd0fea2ed4 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import org.scalatest._
 
 import org.apache.spark._
-import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
+import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
 import org.apache.spark.util._
@@ -39,7 +39,6 @@ class MemoryStoreSuite
   with ResetSystemProperties {
 
   var conf: SparkConf = new SparkConf(false)
-    .set("spark.storage.unrollFraction", "0.4")
     .set("spark.storage.unrollMemoryThreshold", "512")
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
@@ -60,7 +59,7 @@ class MemoryStoreSuite
   }
 
   def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
+    val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem, 1)
     val blockInfoManager = new BlockInfoManager
     val blockEvictionHandler = new BlockEvictionHandler {
       var memoryStore: MemoryStore = _
@@ -239,7 +238,7 @@ class MemoryStoreSuite
   }
 
   test("safely unroll blocks through putIteratorAsBytes") {
-    val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+    val (memoryStore, blockInfoManager) = makeMemoryStore(8400)
     val smallList = List.fill(40)(new Array[Byte](100))
     val bigList = List.fill(40)(new Array[Byte](1000))
     def smallIterator: Iterator[Any] = smallList.iterator.asInstanceOf[Iterator[Any]]
@@ -290,11 +289,11 @@ class MemoryStoreSuite
     blockInfoManager.removeBlock("b3")
     putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
 
-    // Unroll huge block with not enough space. This should fail.
+    // Unroll huge block with not enough space. This should fail and kick out b2 in the process.
     val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
     assert(result4.isLeft) // unroll was unsuccessful
     assert(!memoryStore.contains("b1"))
-    assert(memoryStore.contains("b2"))
+    assert(!memoryStore.contains("b2"))
     assert(memoryStore.contains("b3"))
     assert(!memoryStore.contains("b4"))
     assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
@@ -417,7 +416,7 @@ class MemoryStoreSuite
     val bytesPerSmallBlock = memStoreSize / numInitialBlocks
     def testFailureOnNthDrop(numValidBlocks: Int, readLockAfterDrop: Boolean): Unit = {
       val tc = TaskContext.empty()
-      val memManager = new StaticMemoryManager(conf, Long.MaxValue, memStoreSize, numCores = 1)
+      val memManager = new UnifiedMemoryManager(conf, memStoreSize, memStoreSize.toInt, 1)
       val blockInfoManager = new BlockInfoManager
       blockInfoManager.registerTask(tc.taskAttemptId)
       var droppedSoFar = 0
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 6211399005e1a..1e0399809ba87 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -551,12 +551,11 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
 
   test("force to spill for external aggregation") {
     val conf = createSparkConf(loadDefaults = false)
-      .set("spark.shuffle.memoryFraction", "0.01")
-      .set("spark.memory.useLegacyMode", "true")
-      .set(TEST_MEMORY, 100000000L)
+      .set("spark.memory.storageFraction", "0.999")
+      .set(TEST_MEMORY, 471859200L)
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
-    val N = 2e5.toInt
+    val N = 200000
     sc.parallelize(1 to N, 2)
       .map { i => (i, i) }
       .groupByKey()
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index aa400dd74e9ca..14148e0e67fa7 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -638,12 +638,11 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
 
   test("force to spill for external sorter") {
     val conf = createSparkConf(loadDefaults = false, kryo = false)
-      .set("spark.shuffle.memoryFraction", "0.01")
-      .set("spark.memory.useLegacyMode", "true")
-      .set(TEST_MEMORY, 100000000L)
+      .set("spark.memory.storageFraction", "0.999")
+      .set(TEST_MEMORY, 471859200L)
       .set("spark.shuffle.sort.bypassMergeThreshold", "0")
     sc = new SparkContext("local", "test", conf)
-    val N = 2e5.toInt
+    val N = 200000
     val p = new org.apache.spark.HashPartitioner(2)
     val p2 = new org.apache.spark.HashPartitioner(3)
     sc.parallelize(1 to N, 3)
diff --git a/docs/configuration.md b/docs/configuration.md
index ff9b802617f08..efd74c3add5e2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1198,51 +1198,6 @@ Apart from these, the following properties are also available, and may be useful
     This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.
   </td>
 </tr>
-<tr>
-  <td><code>spark.memory.useLegacyMode</code></td>
-  <td>false</td>
-  <td>
-    Whether to enable the legacy memory management mode used in Spark 1.5 and before.
-    The legacy mode rigidly partitions the heap space into fixed-size regions,
-    potentially leading to excessive spilling if the application was not tuned.
-    The following deprecated memory fraction configurations are not read unless this is enabled:
-    <code>spark.shuffle.memoryFraction</code><br>
-    <code>spark.storage.memoryFraction</code><br>
-    <code>spark.storage.unrollFraction</code>
-  </td>
-</tr>
-<tr>
-  <td><code>spark.shuffle.memoryFraction</code></td>
-  <td>0.2</td>
-  <td>
-    (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
-    Fraction of Java heap to use for aggregation and cogroups during shuffles.
-    At any given time, the collective size of
-    all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
-    begin to spill to disk. If spills are often, consider increasing this value at the expense of
-    <code>spark.storage.memoryFraction</code>.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.storage.memoryFraction</code></td>
-  <td>0.6</td>
-  <td>
-    (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
-    Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
-    generation of objects in the JVM, which by default is given 0.6 of the heap, but you can
-    increase it if you configure your own old generation size.
-  </td>
-</tr>
-<tr>
-  <td><code>spark.storage.unrollFraction</code></td>
-  <td>0.2</td>
-  <td>
-    (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
-    Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
-    This is dynamically allocated by dropping existing blocks when there is not enough free
-    storage space to unroll the new block in its entirety.
-  </td>
-</tr>
 <tr>
   <td><code>spark.storage.replication.proactive</code></td>
   <td>false</td>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 7c21062c4cec3..a708926dd1f85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -94,7 +94,7 @@ private[execution] object HashedRelation {
       taskMemoryManager: TaskMemoryManager = null): HashedRelation = {
     val mm = Option(taskMemoryManager).getOrElse {
       new TaskMemoryManager(
-        new StaticMemoryManager(
+        new UnifiedMemoryManager(
           new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
           Long.MaxValue,
           Long.MaxValue,
@@ -227,7 +227,7 @@ private[joins] class UnsafeHashedRelation(
     // TODO(josh): This needs to be revisited before we merge this patch; making this change now
     // so that tests compile:
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
@@ -392,7 +392,7 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   def this() = {
     this(
       new TaskMemoryManager(
-        new StaticMemoryManager(
+        new UnifiedMemoryManager(
           new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
           Long.MaxValue,
           Long.MaxValue,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index b7d28988274bf..334f0275d4ebf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -22,7 +22,7 @@ import java.util.HashMap
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.internal.config._
-import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
 import org.apache.spark.sql.execution.vectorized.AggregateHashMap
@@ -473,7 +473,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
           value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
           value.setInt(0, 555)
           val taskMemoryManager = new TaskMemoryManager(
-            new StaticMemoryManager(
+            new UnifiedMemoryManager(
               new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
               Long.MaxValue,
               Long.MaxValue,
@@ -504,7 +504,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
       Seq("off", "on").foreach { heap =>
         benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { _ =>
           val taskMemoryManager = new TaskMemoryManager(
-            new StaticMemoryManager(
+            new UnifiedMemoryManager(
               new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap == "off"}")
                 .set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
               Long.MaxValue,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
index bdf753debe62a..0b356a9e34c58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.benchmark
 import org.apache.spark.SparkConf
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
-import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
 import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
@@ -43,7 +43,7 @@ object HashedRelationMetricsBenchmark extends SqlBasedBenchmark {
       val benchmark = new Benchmark("LongToUnsafeRowMap metrics", numRows, output = output)
       benchmark.addCase("LongToUnsafeRowMap") { iter =>
         val taskMemoryManager = new TaskMemoryManager(
-          new StaticMemoryManager(
+          new UnifiedMemoryManager(
             new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
             Long.MaxValue,
             Long.MaxValue,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index d9b34dcd16476..7b55e839e3b4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -23,7 +23,7 @@ import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
-import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -36,7 +36,7 @@ import org.apache.spark.util.collection.CompactBuffer
 class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   val mm = new TaskMemoryManager(
-    new StaticMemoryManager(
+    new UnifiedMemoryManager(
       new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
       Long.MaxValue,
       Long.MaxValue,
@@ -85,7 +85,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("test serialization empty hash map") {
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
@@ -157,7 +157,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("LongToUnsafeRowMap with very wide range") {
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
@@ -202,7 +202,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("LongToUnsafeRowMap with random keys") {
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
@@ -256,7 +256,7 @@ class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
 
   test("SPARK-24257: insert big values into LongToUnsafeRowMap") {
     val taskMemoryManager = new TaskMemoryManager(
-      new StaticMemoryManager(
+      new UnifiedMemoryManager(
         new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
         Long.MaxValue,
         Long.MaxValue,
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index fe65353b9d502..d1a6e8a89acce 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
@@ -215,8 +215,6 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
   test("Test Block - isFullyConsumed") {
     val sparkConf = new SparkConf().set("spark.app.id", "streaming-test")
     sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
-    // spark.storage.unrollFraction set to 0.4 for BlockManager
-    sparkConf.set("spark.storage.unrollFraction", "0.4")
 
     sparkConf.set(IO_ENCRYPTION_ENABLED, enableEncryption)
     // Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
@@ -282,7 +280,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
       maxMem: Long,
       conf: SparkConf,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
+    val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem, 1)
     val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
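
For readers skimming the archived diff above: the net user-facing effect of SPARK-26539 is that spark.memory.useLegacyMode and the legacy fraction settings disappear, and UnifiedMemoryManager is always used. Below is a minimal, illustrative Scala sketch of an application configured purely against the unified model; the object name, app name, and values shown are made-up examples, not defaults taken from this patch.

    import org.apache.spark.SparkConf

    // Hypothetical example, not part of the patch: only the unified keys remain
    // meaningful after this change; the removed keys (spark.memory.useLegacyMode,
    // spark.storage.memoryFraction, spark.shuffle.memoryFraction,
    // spark.storage.unrollFraction) are simply no longer read.
    object UnifiedMemoryExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("unified-memory-example")        // made-up name
          .set("spark.memory.fraction", "0.6")         // example value
          .set("spark.memory.storageFraction", "0.5")  // example value
        // Print the resulting configuration for inspection
        conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
      }
    }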


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
