Github user mridulm commented on a diff in the pull request:
https://github.com/apache/spark/pull/19311#discussion_r140651741
--- Diff:
core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
})
assert(memoryStore.getSize(blockId) === 10000)
}
+
+ test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+ // Setup a memory store with many blocks cached, and then one request
which leads to multiple
+ // blocks getting evicted. We'll make the eviction throw an
exception, and make sure that
+ // all locks are released.
+ val ct = implicitly[ClassTag[Array[Byte]]]
+ def testFailureOnNthDrop(failAfterDroppingNBlocks: Int,
readLockAfterDrop: Boolean): Unit = {
+ val tc = TaskContext.empty()
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100,
numCores = 1)
+ val blockInfoManager = new BlockInfoManager
+ blockInfoManager.registerTask(tc.taskAttemptId)
+ var droppedSoFar = 0
+ val blockEvictionHandler = new BlockEvictionHandler {
+ var memoryStore: MemoryStore = _
+
+ override private[storage] def dropFromMemory[T: ClassTag](
+ blockId: BlockId,
+ data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel
= {
+ if (droppedSoFar < failAfterDroppingNBlocks) {
+ droppedSoFar += 1
+ memoryStore.remove(blockId)
+ if (readLockAfterDrop) {
+ // for testing purposes, we act like another thread gets the
read lock on the new
+ // block
+ StorageLevel.DISK_ONLY
+ } else {
+ StorageLevel.NONE
+ }
+ } else {
+ throw new RuntimeException(s"Mock error dropping block
$droppedSoFar")
+ }
+ }
+ }
+ val memoryStore = new MemoryStore(conf, blockInfoManager,
serializerManager, memManager,
+ blockEvictionHandler) {
+ override def afterDropAction(blockId: BlockId): Unit = {
+ if (readLockAfterDrop) {
+ // pretend that we get a read lock on the block (now on disk)
in another thread
+ TaskContext.setTaskContext(tc)
+ blockInfoManager.lockForReading(blockId)
+ TaskContext.unset()
+ }
+ }
+ }
+
+ blockEvictionHandler.memoryStore = memoryStore
+ memManager.setMemoryStore(memoryStore)
+
+ // Put in some small blocks to fill up the memory store
+ val initialBlocks = (1 to 10).map { id =>
+ val blockId = BlockId(s"rdd_1_$id")
+ val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct,
tellMaster = false)
+ val initialWriteLock =
blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
+ assert(initialWriteLock)
+ val success = memoryStore.putBytes(blockId, 10,
MemoryMode.ON_HEAP, () => {
+ new ChunkedByteBuffer(ByteBuffer.allocate(10))
+ })
+ assert(success)
+ blockInfoManager.unlock(blockId, None)
+ }
+ assert(blockInfoManager.size === 10)
+
+
+ // Add one big block, which will require evicting everything in the
memorystore. However our
+ // mock BlockEvictionHandler will throw an exception -- make sure
all locks are cleared.
+ val largeBlockId = BlockId(s"rdd_2_1")
+ val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct,
tellMaster = false)
+ val initialWriteLock =
blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
+ assert(initialWriteLock)
+ if (failAfterDroppingNBlocks < 10) {
+ val exc = intercept[RuntimeException] {
+ memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, ()
=> {
+ new ChunkedByteBuffer(ByteBuffer.allocate(100))
+ })
+ }
+ assert(exc.getMessage().startsWith("Mock error dropping block"),
exc)
+ // BlockManager.doPut takes care of releasing the lock for the
newly written block -- not
+ // testing that here, so do it manually
+ blockInfoManager.removeBlock(largeBlockId)
+ } else {
+ memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
+ new ChunkedByteBuffer(ByteBuffer.allocate(100))
+ })
+ // BlockManager.doPut takes care of releasing the lock for the
newly written block -- not
+ // testing that here, so do it manually
+ blockInfoManager.unlock(largeBlockId)
+ }
+
+ val largeBlockInMemory = if (failAfterDroppingNBlocks == 10) 1 else 0
+ val expBlocks = 10 +
+ (if (readLockAfterDrop) 0 else -failAfterDroppingNBlocks) +
+ largeBlockInMemory
+ assert(blockInfoManager.size === expBlocks)
+
+ val blocksStillInMemory = blockInfoManager.entries.filter { case
(id, info) =>
+ assert(info.writerTask === BlockInfo.NO_WRITER, id)
+ // in this test, all the blocks in memory have no reader, but
everything dropped to disk
+ // had another thread read the block. We shouldn't lose the other
thread's reader lock.
--- End diff --
I am curious about this part of the test.
Why are we checking for this (and so, why do we need `afterDropAction` in the test
case)? Even without it, the change and the test case look fine to me.
Am I missing something?
Are we also testing whether releasing the write lock results in read locks held by other
tasks being released?
(To nitpick: another read or write lock acquisition could be interleaved between the write
lock release and the read lock acquisition -- though of course not in this test.)
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]