prakharjain09 commented on a change in pull request #27539: [SPARK-30786] 
[CORE] Fix Block replication failure propagation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r380460550
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 ##########
 @@ -255,6 +257,43 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }
 
+  Seq(false, true).foreach { stream =>
+    test(s"test block replication failures when block is received " +
+      s"by remote block manager but putBlock fails (stream = $stream)") {
+      // Retry replication logic for 1 failure
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 1)
+      // Custom block replication policy which prioritizes BlockManagers as per hostnames
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To use upload block stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+
+      // Create 2 normal block managers
+      val store1 = makeBlockManager(10000, "host-1")
+      val store3 = makeBlockManager(10000, "host-3")
+
+      // create 1 faulty block manager by injecting faulty memory manager
+      val memManager = UnifiedMemoryManager(conf, numCores = 1)
+      val mockedMemoryManager = spy(memManager)
+      doAnswer(_ => false).when(mockedMemoryManager).acquireStorageMemory(any(), any(), any())
+      val store2 = makeBlockManager(10000, "host-2", Some(mockedMemoryManager))
+
+      assert(master.getPeers(store1.blockManagerId).toSet ===
+        Set(store2.blockManagerId, store3.blockManagerId))
+
+      val blockId = "blockId"
+      val message = new Array[Byte](1000)
+
+      // Replication will be tried by store1 in this order: store2, store3
+      // store2 is faulty block manager, so it won't be able to put block
+      // Then store1 will try to replicate block on store3
+      store1.putSingle(blockId, message, StorageLevel.MEMORY_ONLY_SER_2)
+
+      val blockLocations = master.getLocations(blockId).toSet
+      assert(blockLocations === Set(store1.blockManagerId, store3.blockManagerId))
 
 Review comment:
   @Ngone51 Previously I was asserting that a) store1 and store3 have the
block and b) store2 doesn't have the block.
   
   Now we ask the master endpoint for all locations of the block and expect
exactly [store1, store3]. That single check asserts both a) and b), so we
don't need to explicitly assert that store2 doesn't have the block.
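
   A minimal sketch of why one set-equality check covers both assertions. It
uses plain strings as hypothetical stand-ins for the `BlockManagerId` values,
and the object and method names are illustrative only, not part of the test
suite:

```scala
object LocationCheckSketch {
  // Hypothetical stand-ins for store1/store2/store3.blockManagerId.
  val store1 = "host-1"
  val store2 = "host-2"
  val store3 = "host-3"

  // a) store1 and store3 both hold the block.
  def holdersPresent(locations: Set[String]): Boolean =
    locations.contains(store1) && locations.contains(store3)

  // b) the faulty store2 does not hold the block.
  def faultyAbsent(locations: Set[String]): Boolean =
    !locations.contains(store2)

  // The single check: the reported locations are exactly {store1, store3}.
  def exactMatch(locations: Set[String]): Boolean =
    locations == Set(store1, store3)
}
```

   Whenever `exactMatch` holds, `holdersPresent` and `faultyAbsent` hold as
well. If store2 ever showed up in the reported locations, `exactMatch` would
fail.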
