Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/19311#discussion_r140612177
--- Diff:
core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
})
assert(memoryStore.getSize(blockId) === 10000)
}
+
+ test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+ // Setup a memory store with many blocks cached, and then one request
which leads to multiple
+ // blocks getting evicted. We'll make the eviction throw an
exception, and make sure that
+ // all locks are released.
+ val ct = implicitly[ClassTag[Array[Byte]]]
+ def testFailureOnNthDrop(failAfterDroppingNBlocks: Int,
readLockAfterDrop: Boolean): Unit = {
+ val tc = TaskContext.empty()
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100,
numCores = 1)
+ val blockInfoManager = new BlockInfoManager
+ blockInfoManager.registerTask(tc.taskAttemptId)
+ var droppedSoFar = 0
+ val blockEvictionHandler = new BlockEvictionHandler {
+ var memoryStore: MemoryStore = _
+
+ override private[storage] def dropFromMemory[T: ClassTag](
+ blockId: BlockId,
+ data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel
= {
+ if (droppedSoFar < failAfterDroppingNBlocks) {
+ droppedSoFar += 1
+ memoryStore.remove(blockId)
+ if (readLockAfterDrop) {
+ // for testing purposes, we act like another thread gets the
read lock on the new
+ // block
+ StorageLevel.DISK_ONLY
+ } else {
+ StorageLevel.NONE
+ }
+ } else {
+ throw new RuntimeException(s"Mock error dropping block
$droppedSoFar")
+ }
+ }
+ }
+ val memoryStore = new MemoryStore(conf, blockInfoManager,
serializerManager, memManager,
+ blockEvictionHandler) {
+ override def afterDropAction(blockId: BlockId): Unit = {
+ if (readLockAfterDrop) {
+ // pretend that we get a read lock on the block (now on disk)
in another thread
+ TaskContext.setTaskContext(tc)
+ blockInfoManager.lockForReading(blockId)
+ TaskContext.unset()
+ }
+ }
+ }
+
+ blockEvictionHandler.memoryStore = memoryStore
+ memManager.setMemoryStore(memoryStore)
+
+ // Put in some small blocks to fill up the memory store
+ val initialBlocks = (1 to 10).map { id =>
--- End diff --
The logic looks fine, but I kinda dislike the magic number ("10") being
used everywhere.
A constant would make this much better (`val initialBlocks = 10`).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]