prakharjain09 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager URL: https://github.com/apache/spark/pull/27539#discussion_r380460550
########## File path: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala ##########
@@ -255,6 +257,43 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }
+  Seq(false, true).foreach { stream =>
+    test(s"test block replication failures when block is received " +
+      s"by remote block manager but putBlock fails (stream = $stream)") {
+      // Retry replication logic for 1 failure
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 1)
+      // Custom block replication policy which prioritizes BlockManagers as per hostnames
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To use upload block stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+
+      // Create 2 normal block manager
+      val store1 = makeBlockManager(10000, "host-1")
+      val store3 = makeBlockManager(10000, "host-3")
+
+      // create 1 faulty block manager by injecting faulty memory manager
+      val memManager = UnifiedMemoryManager(conf, numCores = 1)
+      val mockedMemoryManager = spy(memManager)
+      doAnswer(_ => false).when(mockedMemoryManager).acquireStorageMemory(any(), any(), any())
+      val store2 = makeBlockManager(10000, "host-2", Some(mockedMemoryManager))
+
+      assert(master.getPeers(store1.blockManagerId).toSet ===
+        Set(store2.blockManagerId, store3.blockManagerId))
+
+      val blockId = "blockId"
+      val message = new Array[Byte](1000)
+
+      // Replication will be tried by store1 in this order: store2, store3
+      // store2 is faulty block manager, so it won't be able to put block
+      // Then store1 will try to replicate block on store3
+      store1.putSingle(blockId, message, StorageLevel.MEMORY_ONLY_SER_2)
+
+      val blockLocations = master.getLocations(blockId).toSet
+      assert(blockLocations === Set(store1.blockManagerId, store3.blockManagerId))

Review comment:
@Ngone51 Previously I was asserting a)
store1 and store3 have the block, and b) store2 doesn't have the block. Now we ask the master endpoint for all locations of the block and expect them to be exactly [store1, store3]. That one assertion covers both a) and b), so we no longer need to assert explicitly that store2 doesn't have the block.
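A minimal, self-contained Scala sketch of the reasoning above. It is not Spark code: `FakeBlockManagerId`, `separateChecks` and `equalityCheck` are hypothetical names, and a plain `Set` stands in for what `master.getLocations(blockId).toSet` returns. It compares the old pair of assertions a) and b) with the single set-equality assertion.

```scala
// Hypothetical stand-in for Spark's BlockManagerId; only the host matters here.
final case class FakeBlockManagerId(host: String)

object LocationAssertionSketch {
  val store1 = FakeBlockManagerId("host-1")
  val store2 = FakeBlockManagerId("host-2")
  val store3 = FakeBlockManagerId("host-3")

  // Old style: a) store1 and store3 hold the block, b) store2 does not.
  def separateChecks(locations: Set[FakeBlockManagerId]): Boolean = {
    val a = locations.contains(store1) && locations.contains(store3)
    val b = !locations.contains(store2)
    a && b
  }

  // New style: one exact-equality check on the full set of reported locations.
  def equalityCheck(locations: Set[FakeBlockManagerId]): Boolean =
    locations == Set(store1, store3)

  def main(args: Array[String]): Unit = {
    val scenarios = Seq(
      Set(store1, store3),                               // expected outcome
      Set(store1, store2, store3),                       // faulty store2 kept the block
      Set(store1),                                       // replication to store3 missing
      Set(store1, store3, FakeBlockManagerId("host-4"))  // unexpected extra location
    )
    scenarios.foreach { locs =>
      println(s"$locs separate=${separateChecks(locs)} equality=${equalityCheck(locs)}")
    }
  }
}
```

If the reported locations are exactly {store1, store3}, both styles pass; if store2 also held the block, or store3 never got it, both fail. The equality check is slightly stricter than a) plus b): it also fails when the block shows up at an unexpected extra location (the `host-4` case), which the two separate checks would miss.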