SongYadong commented on issue #23497: [SPARK-26578][CORE] Synchronize putBytes's memory allocation and putting block on memoryManager
URL: https://github.com/apache/spark/pull/23497#issuecomment-457238274

Sorry for my late feedback. I ran an experiment that widens the gap between acquiring memory and adding the block to `entries`, like this (in the `putBytes()` method):

```scala
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
  // scalastyle:off println
  println(System.currentTimeMillis() + " : putBytes() enter sleep after acquired memory")
  Thread.sleep(20000L) // wait for another thread to get memoryManager lock
  println(System.currentTimeMillis() + " : putBytes() leave sleep")
  // scalastyle:on println
  // We acquired enough memory for the block, so go ahead and put it
  val bytes = _bytes()
  assert(bytes.size == size)
  val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
  entries.synchronized {
    entries.put(blockId, entry)
  }
  // ... (rest of putBytes() unchanged)
```

Then I wrote a test:

```scala
test("two threads put ByteBuffer to MemoryStore concurrently") {
  val (memoryStore, blockInfoManager) = makeMemoryStore(12000)
  val blockId1 = BlockId("rdd_3_1")
  val blockId2 = BlockId("rdd_5_2")

  import java.util.concurrent.{Executors, ExecutorService, CountDownLatch}
  val latch: CountDownLatch = new CountDownLatch(2)
  val threadPool: ExecutorService = Executors.newFixedThreadPool(2)
  try {
    threadPool.execute(new Runnable {
      override def run(): Unit = {
        try {
          var bytes: ChunkedByteBuffer = null
          // scalastyle:off println
          println(System.currentTimeMillis() + " : task 1 start")
          println(System.currentTimeMillis() + " : task 1 begin putBytes")
          blockInfoManager.lockNewBlockForWriting(
            blockId1,
            new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false))
          memoryStore.putBytes(blockId1, 10000, MemoryMode.ON_HEAP, () => {
            bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
            bytes
          })
          blockInfoManager.unlock(blockId1)
          println(System.currentTimeMillis() + " : task 1 finish putBytes")
          assert(memoryStore.getSize(blockId1) === 10000)
          assert(!memoryStore.contains(blockId2))
          println(System.currentTimeMillis() + " : task 1 assert finish")
          // scalastyle:on println
        } finally {
          latch.countDown() // count down even if an assertion fails, so latch.await() cannot hang
        }
      }
    })
    threadPool.execute(new Runnable {
      override def run(): Unit = {
        try {
          var bytes: ChunkedByteBuffer = null
          // scalastyle:off println
          println(System.currentTimeMillis() + " : task 2 start")
          Thread.sleep(10000L) // wait for task 1 to acquire memory
          println(System.currentTimeMillis() + " : task 2 begin putBytes")
          blockInfoManager.lockNewBlockForWriting(
            blockId2,
            new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false))
          memoryStore.putBytes(blockId2, 10000, MemoryMode.ON_HEAP, () => {
            bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
            bytes
          })
          blockInfoManager.unlock(blockId2)
          println(System.currentTimeMillis() + " : task 2 finish putBytes")
          assert(!memoryStore.contains(blockId1))
          assert(!memoryStore.contains(blockId2))
          println(System.currentTimeMillis() + " : task 2 assert finish")
          // scalastyle:on println
        } finally {
          latch.countDown()
        }
      }
    })
  } finally {
    threadPool.shutdown()
  }
  latch.await()
}
```

Output:

```
1548342414701 : task 1 start
1548342414701 : task 1 begin putBytes
1548342414709 : task 2 start
1548342414755 : putBytes() enter sleep after acquired memory
1548342424709 : task 2 begin putBytes
1548342424713 : evictBlocksToFreeSpace(): before eviction: entries.size: 0
1548342424721 : task 2 finish putBytes
1548342424742 : task 2 assert finish
1548342434754 : putBytes() leave sleep
1548342434946 : task 1 finish putBytes
1548342434981 : task 1 assert finish
```

In this experiment I found the following problem:

1. In thread 1, `putBytes` acquires memory for block1 but has not yet added it to `entries`.
2. In thread 1, `putBytes` releases the lock on `memoryManager`.
3. In thread 2, `putBytes` tries to acquire memory for block2, but there isn't enough free space.
4. In thread 2, `evictBlocksToFreeSpace` evicts all blocks in `entries` (but not block1, since it is not in `entries` yet).
5. In thread 2, `putBytes` still cannot get enough space and fails to store block2. Had block1 been evicted, there might have been enough space for block2.

Admittedly this is very rare, but the experiment shows it can happen. It requires:

1. a block1 that has acquired memory but has not yet been added to `entries`;
2. a big block2 that can only be stored if it gets block1's space;
3. most importantly, both blocks being put into `memoryStore` at almost the same time.

When this does occur, it can be puzzling: we evicted all entries but still cannot get enough memory, even though we know the memory pool is big enough.

`putIterator` may have the same problem, since its logic is similar to `putBytes` (I haven't written experiment code for `putIterator`).

@srowen @kiszk
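To make the interleaving above easier to follow, here is a toy, single-threaded replay of it. `ToyMemoryStore`, `reserve`, `publish` and `putAtomic` are hypothetical names, not Spark APIs; the model ignores `BlockInfoManager` write locks and per-RDD eviction rules, and it replays the two threads' steps in a fixed order instead of running real threads. Calling `reserve` and then a separate `publish` mirrors the window between `acquireStorageMemory` returning and `entries.put`. `putAtomic` does both steps under one lock, which is one possible reading of "synchronize memory allocation and putting block on memoryManager".

```scala
import scala.collection.mutable

// Hypothetical toy model, loosely inspired by MemoryStore; NOT Spark's API.
// It ignores BlockInfoManager write locks and per-RDD eviction rules.
class ToyMemoryStore(maxMemory: Long) {
  private val lock = new Object // stands in for the memoryManager lock
  private var used = 0L         // bytes reserved, whether or not the block is in `entries`
  private val entries = mutable.LinkedHashMap[String, Long]() // blocks visible to eviction

  // Reserve `size` bytes, evicting blocks from `entries` if needed.
  // Evicts nothing unless evicting would actually free enough space.
  private def acquire(size: Long): Boolean = lock.synchronized {
    val reclaimable = entries.values.sum
    if (maxMemory - used + reclaimable < size) {
      false // even evicting every visible block would not make room
    } else {
      // Evict in insertion order until the request fits.
      while (maxMemory - used < size) {
        val (victim, victimSize) = entries.head
        entries.remove(victim)
        used -= victimSize
      }
      used += size
      true
    }
  }

  // Racy two-step put: reserve now, make the block visible later.
  // Between the two calls the bytes are counted in `used` but cannot be evicted.
  def reserve(size: Long): Boolean = acquire(size)

  def publish(blockId: String, size: Long): Unit = lock.synchronized {
    entries.put(blockId, size)
  }

  // Atomic put: reserve and publish in one critical section on the same lock.
  def putAtomic(blockId: String, size: Long): Boolean = lock.synchronized {
    val ok = acquire(size) // nested synchronized is fine: JVM monitors are re-entrant
    if (ok) entries.put(blockId, size)
    ok
  }

  def contains(blockId: String): Boolean = lock.synchronized(entries.contains(blockId))
}

object RaceDemo {
  def main(args: Array[String]): Unit = {
    // Replay the experiment's interleaving: 12000-byte store, two 10000-byte blocks.
    val racy = new ToyMemoryStore(12000)
    val r1 = racy.reserve(10000)   // thread 1 acquires memory for block1
    val r2 = racy.reserve(10000)   // thread 2: block1 is not evictable yet
    racy.publish("rdd_3_1", 10000) // thread 1 finally adds block1
    println(s"racy: block1 reserved=$r1, block2 stored=$r2")
    // prints "racy: block1 reserved=true, block2 stored=false"

    // Same order with the atomic variant: block1 is visible when block2 arrives.
    val atomic = new ToyMemoryStore(12000)
    val a1 = atomic.putAtomic("rdd_3_1", 10000)
    val a2 = atomic.putAtomic("rdd_5_2", 10000)
    val evicted = !atomic.contains("rdd_3_1")
    println(s"atomic: block1 stored=$a1, block2 stored=$a2, block1 evicted=$evicted")
    // prints "atomic: block1 stored=true, block2 stored=true, block1 evicted=true"
  }
}
```

One likely trade-off of the atomic variant: in the real `putBytes`, `_bytes()` runs between acquiring memory and inserting the entry, so doing both steps under one lock would probably also keep the lock held while the bytes are produced.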
