srowen closed pull request #23457: [SPARK-26539][CORE] Remove
spark.memory.useLegacyMode and StaticMemoryManager
URL: https://github.com/apache/spark/pull/23457
This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based pull request once it has been
merged, the diff is reproduced below for the sake of provenance:
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 681e4378a4dd5..f27df505fa5ff 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -532,38 +532,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable
with Logging with Seria
}
// Validate memory fractions
- val deprecatedMemoryKeys = Seq(
- "spark.storage.memoryFraction",
- "spark.shuffle.memoryFraction",
- "spark.shuffle.safetyFraction",
- "spark.storage.unrollFraction",
- "spark.storage.safetyFraction")
- val memoryKeys = Seq(
- "spark.memory.fraction",
- "spark.memory.storageFraction") ++
- deprecatedMemoryKeys
- for (key <- memoryKeys) {
+ for (key <- Seq("spark.memory.fraction", "spark.memory.storageFraction")) {
val value = getDouble(key, 0.5)
if (value > 1 || value < 0) {
throw new IllegalArgumentException(s"$key should be between 0 and 1
(was '$value').")
}
}
- // Warn against deprecated memory fractions (unless legacy memory
management mode is enabled)
- val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
- val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
- if (!legacyMemoryManagement) {
- val keyset = deprecatedMemoryKeys.toSet
- val detected = settings.keys().asScala.filter(keyset.contains)
- if (detected.nonEmpty) {
- logWarning("Detected deprecated memory fraction settings: " +
- detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution
and storage " +
- "memory management are unified. All memory fractions used in the old
model are " +
- "now deprecated and no longer read. If you wish to use the old
memory management, " +
- s"you may explicitly enable `$legacyMemoryManagementKey` (not
recommended).")
- }
- }
-
if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
val warning = s"spark.master ${get("spark.master")} is deprecated in
Spark 2.0+, please " +
"instead use \"yarn\" with specified deploy mode."
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 9222781fa0833..ba5ed8ab1f302 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,7 +31,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager,
UnifiedMemoryManager}
+import org.apache.spark.memory.{MemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
@@ -322,13 +322,7 @@ object SparkEnv extends Logging {
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT),
shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
- val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode",
false)
- val memoryManager: MemoryManager =
- if (useLegacyMemoryManager) {
- new StaticMemoryManager(conf, numUsableCores)
- } else {
- UnifiedMemoryManager(conf, numUsableCores)
- }
+ val memoryManager: MemoryManager = UnifiedMemoryManager(conf,
numUsableCores)
val blockManagerPort = if (isDriver) {
conf.get(DRIVER_BLOCK_MANAGER_PORT)
diff --git
a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
deleted file mode 100644
index 7e052c02c9376..0000000000000
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.memory
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config
-import org.apache.spark.internal.config.Tests.TEST_MEMORY
-import org.apache.spark.storage.BlockId
-
-/**
- * A [[MemoryManager]] that statically partitions the heap space into disjoint
regions.
- *
- * The sizes of the execution and storage regions are determined through
- * `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction`
respectively. The two
- * regions are cleanly separated such that neither usage can borrow memory
from the other.
- */
-private[spark] class StaticMemoryManager(
- conf: SparkConf,
- maxOnHeapExecutionMemory: Long,
- override val maxOnHeapStorageMemory: Long,
- numCores: Int)
- extends MemoryManager(
- conf,
- numCores,
- maxOnHeapStorageMemory,
- maxOnHeapExecutionMemory) {
-
- def this(conf: SparkConf, numCores: Int) {
- this(
- conf,
- StaticMemoryManager.getMaxExecutionMemory(conf),
- StaticMemoryManager.getMaxStorageMemory(conf),
- numCores)
- }
-
- // The StaticMemoryManager does not support off-heap storage memory:
-
offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
- offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
-
- // Max number of bytes worth of blocks to evict when unrolling
- private val maxUnrollMemory: Long = {
- (maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction",
0.2)).toLong
- }
-
- override def maxOffHeapStorageMemory: Long = 0L
-
- override def acquireStorageMemory(
- blockId: BlockId,
- numBytes: Long,
- memoryMode: MemoryMode): Boolean = synchronized {
- require(memoryMode != MemoryMode.OFF_HEAP,
- "StaticMemoryManager does not support off-heap storage memory")
- if (numBytes > maxOnHeapStorageMemory) {
- // Fail fast if the block simply won't fit
- logInfo(s"Will not store $blockId as the required space ($numBytes
bytes) exceeds our " +
- s"memory limit ($maxOnHeapStorageMemory bytes)")
- false
- } else {
- onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
- }
- }
-
- override def acquireUnrollMemory(
- blockId: BlockId,
- numBytes: Long,
- memoryMode: MemoryMode): Boolean = synchronized {
- require(memoryMode != MemoryMode.OFF_HEAP,
- "StaticMemoryManager does not support off-heap unroll memory")
- if (numBytes > maxOnHeapStorageMemory) {
- // Fail fast if the block simply won't fit
- logInfo(s"Will not store $blockId as the required space ($numBytes
bytes) exceeds our " +
- s"memory limit ($maxOnHeapStorageMemory bytes)")
- false
- } else {
- val currentUnrollMemory =
onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
- val freeMemory = onHeapStorageMemoryPool.memoryFree
- // When unrolling, we will use all of the existing free memory, and, if
necessary,
- // some extra space freed from evicting cached blocks. We must place a
cap on the
- // amount of memory to be evicted by unrolling, however, otherwise
unrolling one
- // big block can blow away the entire cache.
- val maxNumBytesToFree = math.max(0, maxUnrollMemory -
currentUnrollMemory - freeMemory)
- // Keep it within the range 0 <= X <= maxNumBytesToFree
- val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes -
freeMemory))
- onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
- }
- }
-
- private[memory]
- override def acquireExecutionMemory(
- numBytes: Long,
- taskAttemptId: Long,
- memoryMode: MemoryMode): Long = synchronized {
- memoryMode match {
- case MemoryMode.ON_HEAP =>
onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
- case MemoryMode.OFF_HEAP =>
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
- }
- }
-}
-
-
-private[spark] object StaticMemoryManager {
-
- private val MIN_MEMORY_BYTES = 32 * 1024 * 1024
-
- /**
- * Return the total amount of memory available for the storage region, in
bytes.
- */
- private def getMaxStorageMemory(conf: SparkConf): Long = {
- val systemMaxMemory = conf.get(TEST_MEMORY)
- val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
- val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
- (systemMaxMemory * memoryFraction * safetyFraction).toLong
- }
-
- /**
- * Return the total amount of memory available for the execution region, in
bytes.
- */
- private def getMaxExecutionMemory(conf: SparkConf): Long = {
- val systemMaxMemory = conf.get(TEST_MEMORY)
-
- if (systemMaxMemory < MIN_MEMORY_BYTES) {
- throw new IllegalArgumentException(s"System memory $systemMaxMemory must
" +
- s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the
--driver-memory " +
- s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
- }
- if (conf.contains(config.EXECUTOR_MEMORY)) {
- val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
- if (executorMemory < MIN_MEMORY_BYTES) {
- throw new IllegalArgumentException(s"Executor memory $executorMemory
must be at least " +
- s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
- s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark
configuration.")
- }
- }
- val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
- val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
- (systemMaxMemory * memoryFraction * safetyFraction).toLong
- }
-
-}
diff --git
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 7801bb87050f6..a0fbbbdebd028 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -46,7 +46,7 @@ import org.apache.spark.storage.BlockId
* it if necessary. Cached blocks can be evicted only
if actual
* storage memory usage exceeds this region.
*/
-private[spark] class UnifiedMemoryManager private[memory] (
+private[spark] class UnifiedMemoryManager(
conf: SparkConf,
val maxHeapMemory: Long,
onHeapStorageRegionSize: Long,
diff --git a/core/src/main/scala/org/apache/spark/memory/package.scala
b/core/src/main/scala/org/apache/spark/memory/package.scala
index 3d00cd9cb6377..7f782193f246f 100644
--- a/core/src/main/scala/org/apache/spark/memory/package.scala
+++ b/core/src/main/scala/org/apache/spark/memory/package.scala
@@ -61,15 +61,10 @@ package org.apache.spark
* }}}
*
*
- * There are two implementations of [[org.apache.spark.memory.MemoryManager]]
which vary in how
- * they handle the sizing of their memory pools:
+ * There is one implementation of [[org.apache.spark.memory.MemoryManager]]:
*
- * - [[org.apache.spark.memory.UnifiedMemoryManager]], the default in Spark
1.6+, enforces soft
+ * - [[org.apache.spark.memory.UnifiedMemoryManager]] enforces soft
* boundaries between storage and execution memory, allowing requests for
memory in one region
* to be fulfilled by borrowing memory from the other.
- * - [[org.apache.spark.memory.StaticMemoryManager]] enforces hard boundaries
between storage
- * and execution memory by statically partitioning Spark's memory and
preventing storage and
- * execution from borrowing memory from each other. This mode is retained
only for legacy
- * compatibility purposes.
*/
package object memory
diff --git
a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index a0664b30d6cc2..dc1fe774f7961 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -29,7 +29,7 @@
@Test
public void leakedPageMemoryIsDetected() {
final TaskMemoryManager manager = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set("spark.memory.offHeap.enabled", "false"),
Long.MAX_VALUE,
Long.MAX_VALUE,
diff --git
a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 9ad2e9a5e74ac..6976464e8ab5d 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -145,8 +145,7 @@ class BroadcastSuite extends SparkFunSuite with
LocalSparkContext with Encryptio
encryptionTest("Cache broadcast to disk") { conf =>
conf.setMaster("local")
.setAppName("test")
- .set("spark.memory.useLegacyMode", "true")
- .set("spark.storage.memoryFraction", "0.0")
+ .set("spark.memory.storageFraction", "0.0")
sc = new SparkContext(conf)
val list = List[Int](1, 2, 3, 4)
val broadcast = sc.broadcast(list)
@@ -173,8 +172,7 @@ class BroadcastSuite extends SparkFunSuite with
LocalSparkContext with Encryptio
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("test")
- .set("spark.memory.useLegacyMode", "true")
- .set("spark.storage.memoryFraction", "0.0")
+ .set("spark.memory.storageFraction", "0.0")
sc = new SparkContext(conf)
val list = List[Int](1, 2, 3, 4)
diff --git
a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
deleted file mode 100644
index c3275add50f48..0000000000000
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.memory
-
-import org.mockito.Mockito.when
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
-import org.apache.spark.internal.config.Tests.TEST_MEMORY
-import org.apache.spark.storage.TestBlockId
-import org.apache.spark.storage.memory.MemoryStore
-
-class StaticMemoryManagerSuite extends MemoryManagerSuite {
- private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
-
- /**
- * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class
dependencies.
- */
- private def makeThings(
- maxExecutionMem: Long,
- maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
- val mm = new StaticMemoryManager(
- conf,
- maxOnHeapExecutionMemory = maxExecutionMem,
- maxOnHeapStorageMemory = maxStorageMem,
- numCores = 1)
- val ms = makeMemoryStore(mm)
- (mm, ms)
- }
-
- override protected def createMemoryManager(
- maxOnHeapExecutionMemory: Long,
- maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
- new StaticMemoryManager(
- conf.clone
- .set("spark.memory.fraction", "1")
- .set(TEST_MEMORY, maxOnHeapExecutionMemory)
- .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory),
- maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
- maxOnHeapStorageMemory = 0,
- numCores = 1)
- }
-
- test("basic execution memory") {
- val maxExecutionMem = 1000L
- val taskAttemptId = 0L
- val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
- val memoryMode = MemoryMode.ON_HEAP
- assert(mm.executionMemoryUsed === 0L)
- assert(mm.acquireExecutionMemory(10L, taskAttemptId, memoryMode) === 10L)
- assert(mm.executionMemoryUsed === 10L)
- assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
- // Acquire up to the max
- assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) ===
890L)
- assert(mm.executionMemoryUsed === maxExecutionMem)
- assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 0L)
- assert(mm.executionMemoryUsed === maxExecutionMem)
- mm.releaseExecutionMemory(800L, taskAttemptId, memoryMode)
- assert(mm.executionMemoryUsed === 200L)
- // Acquire after release
- assert(mm.acquireExecutionMemory(1L, taskAttemptId, memoryMode) === 1L)
- assert(mm.executionMemoryUsed === 201L)
- // Release beyond what was acquired
- mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, memoryMode)
- assert(mm.executionMemoryUsed === 0L)
- }
-
- test("basic storage memory") {
- val maxStorageMem = 1000L
- val dummyBlock = TestBlockId("you can see the world you brought to live")
- val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
- val memoryMode = MemoryMode.ON_HEAP
- assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 10L, memoryMode))
- assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(mm.storageMemoryUsed === 10L)
-
- assert(mm.acquireStorageMemory(dummyBlock, 100L, memoryMode))
- assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(mm.storageMemoryUsed === 110L)
- // Acquire more than the max, not granted
- assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L,
memoryMode))
- assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(mm.storageMemoryUsed === 110L)
- // Acquire up to the max, requests after this are still granted due to LRU
eviction
- assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, memoryMode))
- assertEvictBlocksToFreeSpaceCalled(ms, 110L)
- assert(mm.storageMemoryUsed === 1000L)
- assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
- assertEvictBlocksToFreeSpaceCalled(ms, 1L)
- assert(evictedBlocks.nonEmpty)
- evictedBlocks.clear()
- // Note: We evicted 1 byte to put another 1-byte block in, so the storage
memory used remains at
- // 1000 bytes. This is different from real behavior, where the 1-byte
block would have evicted
- // the 1000-byte block entirely. This is set up differently so we can
write finer-grained tests.
- assert(mm.storageMemoryUsed === 1000L)
- mm.releaseStorageMemory(800L, memoryMode)
- assert(mm.storageMemoryUsed === 200L)
- // Acquire after release
- assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
- assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(mm.storageMemoryUsed === 201L)
- mm.releaseAllStorageMemory()
- assert(mm.storageMemoryUsed === 0L)
- assert(mm.acquireStorageMemory(dummyBlock, 1L, memoryMode))
- assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(mm.storageMemoryUsed === 1L)
- // Release beyond what was acquired
- mm.releaseStorageMemory(100L, memoryMode)
- assert(mm.storageMemoryUsed === 0L)
- }
-
- test("execution and storage isolation") {
- val maxExecutionMem = 200L
- val maxStorageMem = 1000L
- val taskAttemptId = 0L
- val dummyBlock = TestBlockId("ain't nobody love like you do")
- val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
- val memoryMode = MemoryMode.ON_HEAP
- // Only execution memory should increase
- assert(mm.acquireExecutionMemory(100L, taskAttemptId, memoryMode) === 100L)
- assert(mm.storageMemoryUsed === 0L)
- assert(mm.executionMemoryUsed === 100L)
- assert(mm.acquireExecutionMemory(1000L, taskAttemptId, memoryMode) ===
100L)
- assert(mm.storageMemoryUsed === 0L)
- assert(mm.executionMemoryUsed === 200L)
- // Only storage memory should increase
- assert(mm.acquireStorageMemory(dummyBlock, 50L, memoryMode))
- assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(mm.storageMemoryUsed === 50L)
- assert(mm.executionMemoryUsed === 200L)
- // Only execution memory should be released
- mm.releaseExecutionMemory(133L, taskAttemptId, memoryMode)
- assert(mm.storageMemoryUsed === 50L)
- assert(mm.executionMemoryUsed === 67L)
- // Only storage memory should be released
- mm.releaseAllStorageMemory()
- assert(mm.storageMemoryUsed === 0L)
- assert(mm.executionMemoryUsed === 67L)
- }
-
- test("unroll memory") {
- val maxStorageMem = 1000L
- val dummyBlock = TestBlockId("lonely water")
- val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
- val memoryMode = MemoryMode.ON_HEAP
- assert(mm.acquireUnrollMemory(dummyBlock, 100L, memoryMode))
- when(ms.currentUnrollMemory).thenReturn(100L)
- assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(mm.storageMemoryUsed === 100L)
- mm.releaseUnrollMemory(40L, memoryMode)
- assert(mm.storageMemoryUsed === 60L)
- when(ms.currentUnrollMemory).thenReturn(60L)
- assert(mm.acquireStorageMemory(dummyBlock, 800L, memoryMode))
- assertEvictBlocksToFreeSpaceNotCalled(ms)
- assert(mm.storageMemoryUsed === 860L)
- // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400
bytes.
- // As of this point, cache memory is 800 bytes and current unroll memory
is 60 bytes.
- // Requesting 240 more bytes of unroll memory will leave our total unroll
memory at
- // 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are
granted.
- assert(mm.acquireUnrollMemory(dummyBlock, 240L, memoryMode))
- assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000
- when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240
- assert(mm.storageMemoryUsed === 1000L)
- evictedBlocks.clear()
- // We already have 300 bytes of unroll memory, so requesting 150 more will
leave us
- // above the 400-byte limit. Since there is not enough free memory, this
request will
- // fail even after evicting as much as we can (400 - 300 = 100 bytes).
- assert(!mm.acquireUnrollMemory(dummyBlock, 150L, memoryMode))
- assertEvictBlocksToFreeSpaceCalled(ms, 100L)
- assert(mm.storageMemoryUsed === 900L)
- // Release beyond what was acquired
- mm.releaseUnrollMemory(maxStorageMem, memoryMode)
- assert(mm.storageMemoryUsed === 0L)
- }
-
-}
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 480e07fb9399a..2250ae2f771ed 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -91,7 +91,6 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
conf.set(IS_TESTING, true)
conf.set("spark.memory.fraction", "1")
conf.set("spark.memory.storageFraction", "1")
- conf.set("spark.storage.unrollFraction", "0.4")
conf.set("spark.storage.unrollMemoryThreshold", "512")
// to make a replication attempt to inactive store fail fast
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index bda81365b0792..c23264191e124 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -120,7 +120,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with BeforeAndAfterE
.set("spark.memory.fraction", "1")
.set("spark.memory.storageFraction", "1")
.set("spark.kryoserializer.buffer", "1m")
- .set("spark.storage.unrollFraction", "0.4")
.set("spark.storage.unrollMemoryThreshold", "512")
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
diff --git
a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index b02af2bfe7acc..7cdcd0fea2ed4 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import org.scalatest._
import org.apache.spark._
-import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
+import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore,
PartiallySerializedBlock, PartiallyUnrolledIterator}
import org.apache.spark.util._
@@ -39,7 +39,6 @@ class MemoryStoreSuite
with ResetSystemProperties {
var conf: SparkConf = new SparkConf(false)
- .set("spark.storage.unrollFraction", "0.4")
.set("spark.storage.unrollMemoryThreshold", "512")
// Reuse a serializer across tests to avoid creating a new thread-local
buffer on each test
@@ -60,7 +59,7 @@ class MemoryStoreSuite
}
def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
- val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem,
numCores = 1)
+ val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem, 1)
val blockInfoManager = new BlockInfoManager
val blockEvictionHandler = new BlockEvictionHandler {
var memoryStore: MemoryStore = _
@@ -239,7 +238,7 @@ class MemoryStoreSuite
}
test("safely unroll blocks through putIteratorAsBytes") {
- val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val (memoryStore, blockInfoManager) = makeMemoryStore(8400)
val smallList = List.fill(40)(new Array[Byte](100))
val bigList = List.fill(40)(new Array[Byte](1000))
def smallIterator: Iterator[Any] =
smallList.iterator.asInstanceOf[Iterator[Any]]
@@ -290,11 +289,11 @@ class MemoryStoreSuite
blockInfoManager.removeBlock("b3")
putIteratorAsBytes("b3", smallIterator, ClassTag.Any)
- // Unroll huge block with not enough space. This should fail.
+ // Unroll huge block with not enough space. This should fail and kick out
b2 in the process.
val result4 = putIteratorAsBytes("b4", bigIterator, ClassTag.Any)
assert(result4.isLeft) // unroll was unsuccessful
assert(!memoryStore.contains("b1"))
- assert(memoryStore.contains("b2"))
+ assert(!memoryStore.contains("b2"))
assert(memoryStore.contains("b3"))
assert(!memoryStore.contains("b4"))
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an
iterator
@@ -417,7 +416,7 @@ class MemoryStoreSuite
val bytesPerSmallBlock = memStoreSize / numInitialBlocks
def testFailureOnNthDrop(numValidBlocks: Int, readLockAfterDrop: Boolean):
Unit = {
val tc = TaskContext.empty()
- val memManager = new StaticMemoryManager(conf, Long.MaxValue,
memStoreSize, numCores = 1)
+ val memManager = new UnifiedMemoryManager(conf, memStoreSize,
memStoreSize.toInt, 1)
val blockInfoManager = new BlockInfoManager
blockInfoManager.registerTask(tc.taskAttemptId)
var droppedSoFar = 0
diff --git
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 6211399005e1a..1e0399809ba87 100644
---
a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++
b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -551,12 +551,11 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
test("force to spill for external aggregation") {
val conf = createSparkConf(loadDefaults = false)
- .set("spark.shuffle.memoryFraction", "0.01")
- .set("spark.memory.useLegacyMode", "true")
- .set(TEST_MEMORY, 100000000L)
+ .set("spark.memory.storageFraction", "0.999")
+ .set(TEST_MEMORY, 471859200L)
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
sc = new SparkContext("local", "test", conf)
- val N = 2e5.toInt
+ val N = 200000
sc.parallelize(1 to N, 2)
.map { i => (i, i) }
.groupByKey()
diff --git
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index aa400dd74e9ca..14148e0e67fa7 100644
---
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -638,12 +638,11 @@ class ExternalSorterSuite extends SparkFunSuite with
LocalSparkContext {
test("force to spill for external sorter") {
val conf = createSparkConf(loadDefaults = false, kryo = false)
- .set("spark.shuffle.memoryFraction", "0.01")
- .set("spark.memory.useLegacyMode", "true")
- .set(TEST_MEMORY, 100000000L)
+ .set("spark.memory.storageFraction", "0.999")
+ .set(TEST_MEMORY, 471859200L)
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
sc = new SparkContext("local", "test", conf)
- val N = 2e5.toInt
+ val N = 200000
val p = new org.apache.spark.HashPartitioner(2)
val p2 = new org.apache.spark.HashPartitioner(3)
sc.parallelize(1 to N, 3)
diff --git a/docs/configuration.md b/docs/configuration.md
index ff9b802617f08..efd74c3add5e2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1198,51 +1198,6 @@ Apart from these, the following properties are also
available, and may be useful
This must be set to a positive value when
<code>spark.memory.offHeap.enabled=true</code>.
</td>
</tr>
-<tr>
- <td><code>spark.memory.useLegacyMode</code></td>
- <td>false</td>
- <td>
- Whether to enable the legacy memory management mode used in Spark 1.5 and
before.
- The legacy mode rigidly partitions the heap space into fixed-size regions,
- potentially leading to excessive spilling if the application was not tuned.
- The following deprecated memory fraction configurations are not read
unless this is enabled:
- <code>spark.shuffle.memoryFraction</code><br>
- <code>spark.storage.memoryFraction</code><br>
- <code>spark.storage.unrollFraction</code>
- </td>
-</tr>
-<tr>
- <td><code>spark.shuffle.memoryFraction</code></td>
- <td>0.2</td>
- <td>
- (deprecated) This is read only if <code>spark.memory.useLegacyMode</code>
is enabled.
- Fraction of Java heap to use for aggregation and cogroups during shuffles.
- At any given time, the collective size of
- all in-memory maps used for shuffles is bounded by this limit, beyond
which the contents will
- begin to spill to disk. If spills are often, consider increasing this
value at the expense of
- <code>spark.storage.memoryFraction</code>.
- </td>
-</tr>
-<tr>
- <td><code>spark.storage.memoryFraction</code></td>
- <td>0.6</td>
- <td>
- (deprecated) This is read only if <code>spark.memory.useLegacyMode</code>
is enabled.
- Fraction of Java heap to use for Spark's memory cache. This should not be
larger than the "old"
- generation of objects in the JVM, which by default is given 0.6 of the
heap, but you can
- increase it if you configure your own old generation size.
- </td>
-</tr>
-<tr>
- <td><code>spark.storage.unrollFraction</code></td>
- <td>0.2</td>
- <td>
- (deprecated) This is read only if <code>spark.memory.useLegacyMode</code>
is enabled.
- Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling
blocks in memory.
- This is dynamically allocated by dropping existing blocks when there is
not enough free
- storage space to unroll the new block in its entirety.
- </td>
-</tr>
<tr>
<td><code>spark.storage.replication.proactive</code></td>
<td>false</td>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 7c21062c4cec3..a708926dd1f85 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -94,7 +94,7 @@ private[execution] object HashedRelation {
taskMemoryManager: TaskMemoryManager = null): HashedRelation = {
val mm = Option(taskMemoryManager).getOrElse {
new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
@@ -227,7 +227,7 @@ private[joins] class UnsafeHashedRelation(
// TODO(josh): This needs to be revisited before we merge this patch;
making this change now
// so that tests compile:
val taskMemoryManager = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
@@ -392,7 +392,7 @@ private[execution] final class LongToUnsafeRowMap(val mm:
TaskMemoryManager, cap
def this() = {
this(
new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index b7d28988274bf..334f0275d4ebf 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -22,7 +22,7 @@ import java.util.HashMap
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.internal.config._
-import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
import org.apache.spark.sql.execution.vectorized.AggregateHashMap
@@ -473,7 +473,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
value.setInt(0, 555)
val taskMemoryManager = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
@@ -504,7 +504,7 @@ object AggregateBenchmark extends SqlBasedBenchmark {
Seq("off", "on").foreach { heap =>
benchmark.addCase(s"BytesToBytesMap ($heap Heap)") { _ =>
val taskMemoryManager = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, s"${heap ==
"off"}")
.set(MEMORY_OFFHEAP_SIZE.key, "102400000"),
Long.MaxValue,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
index bdf753debe62a..0b356a9e34c58 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/HashedRelationMetricsBenchmark.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.benchmark
import org.apache.spark.SparkConf
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
-import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference,
UnsafeProjection}
import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
@@ -43,7 +43,7 @@ object HashedRelationMetricsBenchmark extends
SqlBasedBenchmark {
val benchmark = new Benchmark("LongToUnsafeRowMap metrics", numRows,
output = output)
benchmark.addCase("LongToUnsafeRowMap") { iter =>
val taskMemoryManager = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index d9b34dcd16476..7b55e839e3b4c 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -23,7 +23,7 @@ import scala.util.Random
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.MEMORY_OFFHEAP_ENABLED
-import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
+import org.apache.spark.memory.{TaskMemoryManager, UnifiedMemoryManager}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -36,7 +36,7 @@ import org.apache.spark.util.collection.CompactBuffer
class HashedRelationSuite extends SparkFunSuite with SharedSQLContext {
val mm = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
@@ -85,7 +85,7 @@ class HashedRelationSuite extends SparkFunSuite with
SharedSQLContext {
test("test serialization empty hash map") {
val taskMemoryManager = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
@@ -157,7 +157,7 @@ class HashedRelationSuite extends SparkFunSuite with
SharedSQLContext {
test("LongToUnsafeRowMap with very wide range") {
val taskMemoryManager = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
@@ -202,7 +202,7 @@ class HashedRelationSuite extends SparkFunSuite with
SharedSQLContext {
test("LongToUnsafeRowMap with random keys") {
val taskMemoryManager = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
@@ -256,7 +256,7 @@ class HashedRelationSuite extends SparkFunSuite with
SharedSQLContext {
test("SPARK-24257: insert big values into LongToUnsafeRowMap") {
val taskMemoryManager = new TaskMemoryManager(
- new StaticMemoryManager(
+ new UnifiedMemoryManager(
new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
Long.MaxValue,
Long.MaxValue,
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index fe65353b9d502..d1a6e8a89acce 100644
---
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
-import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
@@ -215,8 +215,6 @@ abstract class
BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
test("Test Block - isFullyConsumed") {
val sparkConf = new SparkConf().set("spark.app.id", "streaming-test")
sparkConf.set("spark.storage.unrollMemoryThreshold", "512")
- // spark.storage.unrollFraction set to 0.4 for BlockManager
- sparkConf.set("spark.storage.unrollFraction", "0.4")
sparkConf.set(IO_ENCRYPTION_ENABLED, enableEncryption)
// Block Manager with 12000 * 0.4 = 4800 bytes of free space for unroll
@@ -282,7 +280,7 @@ abstract class
BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
maxMem: Long,
conf: SparkConf,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
- val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem,
numCores = 1)
+ val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem, 1)
val transfer = new NettyBlockTransferService(conf, securityMgr,
"localhost", "localhost", 0, 1)
val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster,
serializerManager, conf,
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org