Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19311#discussion_r140651608
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
    @@ -407,4 +407,119 @@ class MemoryStoreSuite
         })
         assert(memoryStore.getSize(blockId) === 10000)
       }
    +
    +  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
    +    // Setup a memory store with many blocks cached, and then one request which leads to multiple
    +    // blocks getting evicted.  We'll make the eviction throw an exception, and make sure that
    +    // all locks are released.
    +    val ct = implicitly[ClassTag[Array[Byte]]]
    +    def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = {
    +      val tc = TaskContext.empty()
    +      val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1)
    +      val blockInfoManager = new BlockInfoManager
    +      blockInfoManager.registerTask(tc.taskAttemptId)
    +      var droppedSoFar = 0
    +      val blockEvictionHandler = new BlockEvictionHandler {
    +        var memoryStore: MemoryStore = _
    +
    +        override private[storage] def dropFromMemory[T: ClassTag](
    +            blockId: BlockId,
    +            data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = {
    +          if (droppedSoFar < failAfterDroppingNBlocks) {
    +            droppedSoFar += 1
    +            memoryStore.remove(blockId)
    +            if (readLockAfterDrop) {
    +              // for testing purposes, we act like another thread gets the read lock on the new
    +              // block
    +              StorageLevel.DISK_ONLY
    +            } else {
    +              StorageLevel.NONE
    +            }
    +          } else {
    +            throw new RuntimeException(s"Mock error dropping block $droppedSoFar")
    +          }
    +        }
    +      }
    +      val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager,
    +          blockEvictionHandler) {
    +        override def afterDropAction(blockId: BlockId): Unit = {
    +          if (readLockAfterDrop) {
    +            // pretend that we get a read lock on the block (now on disk) in another thread
    +            TaskContext.setTaskContext(tc)
    +            blockInfoManager.lockForReading(blockId)
    +            TaskContext.unset()
    +          }
    +        }
    +      }
    +
    +      blockEvictionHandler.memoryStore = memoryStore
    +      memManager.setMemoryStore(memoryStore)
    +
    +      // Put in some small blocks to fill up the memory store
    +      val initialBlocks = (1 to 10).map { id =>
    --- End diff --
    
    To piggy-back on @vanzin's comment, could you also pull out a sizePerBlock constant, so that the hard-coded 100 goes away? Thx
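    
    For illustration, a rough sketch of the kind of change being suggested (the constant names and values below are placeholders, not the PR's actual code):
    
        // Hypothetical constants for the test setup; the real sizes may differ.
        val numInitialBlocks = 10
        val sizePerBlock = 10L
    
        // Derive the store capacity from the constants so the literal 100 disappears.
        val memManager = new StaticMemoryManager(
          conf, Long.MaxValue, numInitialBlocks * sizePerBlock, numCores = 1)
    
    The same constants could then be reused when putting the initial blocks and in the later assertions, instead of repeating the magic numbers.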

