SongYadong commented on issue #23497: [SPARK-26578][CORE] Synchronize 
putBytes's memory allocation and putting block on memoryManager
URL: https://github.com/apache/spark/pull/23497#issuecomment-457238274
 
 
   Sorry for my late feedback.
   
   I ran an experiment by trying to widen the gap between acquiring memory and adding the block to `entries`, like this (in the `putBytes()` method):
   ```scala
   if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {

     // scalastyle:off println
     println(System.currentTimeMillis() + " : putBytes() enter sleep after acquired memory")
     Thread.sleep(20000L)  // wait for another thread to get the memoryManager lock
     println(System.currentTimeMillis() + " : putBytes() leave sleep")
     // scalastyle:on println

     // We acquired enough memory for the block, so go ahead and put it
     val bytes = _bytes()
     assert(bytes.size == size)
     val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
     entries.synchronized {
       entries.put(blockId, entry)
     }
   ```
   Then I wrote some test code:
   ```scala
   test("two threads put ByteBuffer to MemoryStore concurrently") {
     val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
     val blockId1 = BlockId("rdd_3_1")
     val blockId2 = BlockId("rdd_5_2")

     import java.util.concurrent.{CountDownLatch, ExecutorService, Executors}
     val latch: CountDownLatch = new CountDownLatch(2)
     val threadPool: ExecutorService = Executors.newFixedThreadPool(2)
     try {
       threadPool.execute(new Runnable {
         override def run(): Unit = {
           var bytes: ChunkedByteBuffer = null
           // scalastyle:off println
           println(System.currentTimeMillis() + " : task 1 start")
           println(System.currentTimeMillis() + " : task 1 begin putBytes")
           blockInfoManager.lockNewBlockForWriting(
             blockId1, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false))
           memoryStore.putBytes(blockId1, 10000, MemoryMode.ON_HEAP, () => {
             bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
             bytes
           })
           blockInfoManager.unlock(blockId1)
           println(System.currentTimeMillis() + " : task 1 finish putBytes")
           assert(memoryStore.getSize(blockId1) === 10000)
           assert(!memoryStore.contains(blockId2))
           println(System.currentTimeMillis() + " : task 1 assert finish")
           // scalastyle:on println
           latch.countDown()
         }
       })

       threadPool.execute(new Runnable {
         override def run(): Unit = {
           var bytes: ChunkedByteBuffer = null
           // scalastyle:off println
           println(System.currentTimeMillis() + " : task 2 start")
           Thread.sleep(10000L)  // wait for task 1 to acquire memory
           println(System.currentTimeMillis() + " : task 2 begin putBytes")
           blockInfoManager.lockNewBlockForWriting(
             blockId2, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false))
           memoryStore.putBytes(blockId2, 10000, MemoryMode.ON_HEAP, () => {
             bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
             bytes
           })
           blockInfoManager.unlock(blockId2)
           println(System.currentTimeMillis() + " : task 2 finish putBytes")
           assert(!memoryStore.contains(blockId1))
           assert(!memoryStore.contains(blockId2))
           println(System.currentTimeMillis() + " : task 2 assert finish")
           // scalastyle:on println
           latch.countDown()
         }
       })
     } finally {
       threadPool.shutdown()
     }
     latch.await()
   }
   ```
   
   Output printed:
   ```
   1548342414701 : task 1 start
   1548342414701 : task 1 begin putBytes
   1548342414709 : task 2 start
   1548342414755 : putBytes() enter sleep after acquired memory
   1548342424709 : task 2 begin putBytes
   1548342424713 : evictBlocksToFreeSpace(): before eviction: entries.size: 0
   1548342424721 : task 2 finish putBytes
   1548342424742 : task 2 assert finish
   1548342434754 : putBytes() leave sleep
   1548342434946 : task 1 finish putBytes
   1548342434981 : task 1 assert finish
   ```
   
   In this experiment I observed the following sequence:
   1. In thread 1, `putBytes` allocates memory for block1 (but block1 hasn't been added to `entries` yet).
   2. In thread 1, `putBytes` releases the `memoryManager` lock.
   3. In thread 2, `putBytes` tries to allocate memory for block2, but there isn't enough free space.
   4. In thread 2, `evictBlocksToFreeSpace` evicts all blocks in `entries` (but not block1, since it isn't in `entries` yet).
   5. In thread 2, `putBytes` can't find enough space and fails to store block2. If block1 had been evictable, block2 might have gotten enough space.
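
   To make the pattern concrete independently of Spark internals, here is a self-contained toy model (all names here are hypothetical, not Spark code) in which eviction can only see blocks that are already registered:
   ```scala
   import java.util.concurrent.CountDownLatch
   import scala.collection.mutable

   // Toy model of the race: eviction scans only the `entries` map, mirroring
   // how evictBlocksToFreeSpace only scans MemoryStore.entries.
   object EvictionRaceDemo {
     private val poolSize = 12000L
     private var used = 0L                                       // bytes reserved so far
     private val entries = mutable.LinkedHashMap[String, Long]() // registered blocks only

     // Reserve `size` bytes, evicting registered blocks if the pool is short.
     // A reservation that was acquired but not yet registered is invisible here.
     private def acquire(size: Long): Boolean = synchronized {
       if (used + size > poolSize) {
         entries.keys.toList.foreach(id => used -= entries.remove(id).get)
       }
       if (used + size <= poolSize) { used += size; true } else false
     }

     // Make a previously acquired reservation visible to eviction.
     private def register(id: String, size: Long): Unit = synchronized {
       entries(id) = size
     }

     def main(args: Array[String]): Unit = {
       val done = new CountDownLatch(2)
       val t1 = new Thread(() => {
         assert(acquire(10000))  // block1's reservation succeeds
         Thread.sleep(200)       // the gap: acquired, but not yet in `entries`
         register("block1", 10000)
         done.countDown()
       })
       val t2 = new Thread(() => {
         Thread.sleep(100)            // arrive inside thread 1's gap
         val stored = acquire(10000)  // eviction finds nothing: block1 is invisible
         println("block2 stored: " + stored)
         done.countDown()
       })
       t1.start(); t2.start(); done.await()
     }
   }
   ```
   Running it prints `block2 stored: false`, even though evicting block1 would have left plenty of room for block2 in the 12000-byte pool.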
   
   Admittedly this is very rare, but the experiment shows the window exists. It can happen when:
   1) block1 has acquired memory but has not yet been added to `entries`;
   2) block2 is big enough that it needs block1's space in order to be stored successfully;
   3) most importantly, the two blocks are put into the `MemoryStore` at almost the same time.
   
   When this does occur, the symptom is puzzling: we evicted every entry but still couldn't get enough memory, even though we know the memory pool is big enough.
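
   That window is exactly what this PR closes: hold the `memoryManager` lock across both the acquisition and the `entries` update, so `evictBlocksToFreeSpace` in another thread cannot run in between. Roughly (a sketch; the diff is authoritative):
   ```scala
   // Rough shape of the fix: acquisition and registration happen under one
   // memoryManager lock, so no other thread can evict between the two steps.
   memoryManager.synchronized {
     if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
       // We acquired enough memory for the block, so go ahead and put it
       val bytes = _bytes()
       assert(bytes.size == size)
       val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
       entries.synchronized {
         entries.put(blockId, entry)
       }
       true
     } else {
       false
     }
   }
   ```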
   
   Besides, `putIterator` may have the same problem, since its logic is similar to `putBytes`. (I haven't written experiment code for `putIterator`.)
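
   For reference, the tail of `putIterator` (paraphrased from memory, so details may differ from the actual source) has the same shape: the unroll-to-storage transfer is atomic under `memoryManager.synchronized`, but the block only becomes visible to eviction afterwards:
   ```scala
   // Paraphrased sketch of the end of MemoryStore.putIterator; not an exact quote.
   memoryManager.synchronized {
     // Transfer unroll memory to storage memory atomically.
     releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
     val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
     assert(success, "transferring unroll memory to storage memory failed")
   }
   // ...but only here does the block become evictable, after the
   // memoryManager lock has already been released:
   entries.synchronized {
     entries.put(blockId, entry)
   }
   ```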
   
   @srowen @kiszk 
