Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/11899#discussion_r57078012
--- Diff:
core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import scala.language.implicitConversions
+import scala.language.postfixOps
+import scala.language.reflectiveCalls
+import scala.reflect.ClassTag
+
+import org.scalatest._
+
+import org.apache.spark._
+import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
+import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore,
PartiallyUnrolledIterator}
+import org.apache.spark.util._
+import org.apache.spark.util.io.ChunkedByteBuffer
+
+class MemoryStoreSuite
+ extends SparkFunSuite
+ with PrivateMethodTester
+ with BeforeAndAfterEach
+ with ResetSystemProperties {
+
+ var conf: SparkConf = new SparkConf(false)
+ .set("spark.test.useCompressedOops", "true")
+ .set("spark.storage.unrollFraction", "0.4")
+ .set("spark.storage.unrollMemoryThreshold", "512")
+
+ // Reuse a serializer across tests to avoid creating a new thread-local
buffer on each test
+ val serializer = new KryoSerializer(new
SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+
+ // Implicitly convert strings to BlockIds for test clarity.
+ implicit def StringToBlockId(value: String): BlockId = new
TestBlockId(value)
+ def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId,
splitId)
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ // Set the arch to 64-bit and compressedOops to true to get a
deterministic test-case
+ System.setProperty("os.arch", "amd64")
+ val initialize = PrivateMethod[Unit]('initialize)
+ SizeEstimator invokePrivate initialize()
+ }
+
+ def makeMemoryStore(maxMem: Long): (MemoryStore, BlockInfoManager) = {
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem,
numCores = 1)
+ val serializerManager = new SerializerManager(serializer, conf)
+ val blockInfoManager = new BlockInfoManager
+ val blockEvictionHandler = new BlockEvictionHandler {
+ var memoryStore: MemoryStore = _
+ override private[storage] def dropFromMemory[T: ClassTag](
+ blockId: BlockId,
+ data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel =
{
+ memoryStore.remove(blockId)
+ StorageLevel.NONE
+ }
+ }
+ val memoryStore =
+ new MemoryStore(conf, blockInfoManager, serializerManager,
memManager, blockEvictionHandler)
+ memManager.setMemoryStore(memoryStore)
+ blockEvictionHandler.memoryStore = memoryStore
+ (memoryStore, blockInfoManager)
+ }
+
+ test("reserve/release unroll memory") {
+ val (memoryStore, _) = makeMemoryStore(12000)
+ assert(memoryStore.currentUnrollMemory === 0)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
+ memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory)
+ }
+
+ // Reserve
+ assert(reserveUnrollMemoryForThisTask(100))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 100)
+ assert(reserveUnrollMemoryForThisTask(200))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 300)
+ assert(reserveUnrollMemoryForThisTask(500))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 800)
+ assert(!reserveUnrollMemoryForThisTask(1000000))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not
granted
+ // Release
+ memoryStore.releaseUnrollMemoryForThisTask(100)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 700)
+ memoryStore.releaseUnrollMemoryForThisTask(100)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 600)
+ // Reserve again
+ assert(reserveUnrollMemoryForThisTask(4400))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 5000)
+ assert(!reserveUnrollMemoryForThisTask(20000))
+ assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not
granted
+ // Release again
+ memoryStore.releaseUnrollMemoryForThisTask(1000)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 4000)
+ memoryStore.releaseUnrollMemoryForThisTask() // release all
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ }
+
+ test("safely unroll blocks") {
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ val ct = implicitly[ClassTag[Array[Byte]]]
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ def putIterator[T](
+ blockId: BlockId,
+ iter: Iterator[T],
+ classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]
= {
+ assert(blockInfoManager.lockNewBlockForWriting(
+ blockId,
+ new BlockInfo(StorageLevel.MEMORY_ONLY, classTag, tellMaster =
false)))
+ val res = memoryStore.putIterator(blockId, iter,
StorageLevel.MEMORY_ONLY, classTag)
+ blockInfoManager.unlock(blockId)
+ res
+ }
+
+ // Unroll with all the space in the world. This should succeed.
+ var putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ assert(putResult.isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach {
case (e, a) =>
+ assert(e === a, "getValues() did not return original values!")
+ }
+ blockInfoManager.lockForWriting("unroll")
+ assert(memoryStore.remove("unroll"))
+ blockInfoManager.removeBlock("unroll")
+
+ // Unroll with not enough space. This should succeed after kicking out
someBlock1.
+ assert(putIterator("someBlock1", smallList.iterator, ct).isRight)
+ assert(putIterator("someBlock2", smallList.iterator, ct).isRight)
+ putResult = putIterator("unroll", smallList.iterator, ClassTag.Any)
+ assert(putResult.isRight)
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ assert(memoryStore.contains("someBlock2"))
+ assert(!memoryStore.contains("someBlock1"))
+ smallList.iterator.zip(memoryStore.getValues("unroll").get).foreach {
case (e, a) =>
+ assert(e === a, "getValues() did not return original values!")
+ }
+ blockInfoManager.lockForWriting("unroll")
+ assert(memoryStore.remove("unroll"))
+ blockInfoManager.removeBlock("unroll")
+
+ // Unroll huge block with not enough space. Even after ensuring free
space of 12000 * 0.4 =
+ // 4800 bytes, there is still not enough room to unroll this block.
This returns an iterator.
+ // In the meantime, however, we kicked out someBlock2 before giving up.
+ assert(putIterator("someBlock3", smallList.iterator, ct).isRight)
+ putResult = putIterator("unroll", bigList.iterator, ClassTag.Any)
+ assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned
an iterator
+ assert(!memoryStore.contains("someBlock2"))
+ assert(putResult.isLeft)
+ bigList.iterator.zip(putResult.left.get).foreach { case (e, a) =>
+ assert(e === a, "putIterator() did not return original values!")
+ }
+ // The unroll memory was freed once the iterator returned by
putIterator() was fully traversed.
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ }
+
+ test("safely unroll blocks through putIterator") {
+ val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
+ val smallList = List.fill(40)(new Array[Byte](100))
+ val bigList = List.fill(40)(new Array[Byte](1000))
+ def smallIterator: Iterator[Any] =
smallList.iterator.asInstanceOf[Iterator[Any]]
+ def bigIterator: Iterator[Any] =
bigList.iterator.asInstanceOf[Iterator[Any]]
+ assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+
+ def putIterator[T](
+ blockId: BlockId,
--- End diff --
nit: the indentation of these parameters is off — please fix.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]