prakharjain09 commented on a change in pull request #27539: [SPARK-30786][CORE] Fix Block replication failure propagation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r377604999
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
 ##########
 @@ -624,6 +624,47 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getRemoteBytes("list1").isEmpty)
   }
 
 +  Seq(0, Int.MaxValue - 512).foreach { maxRemoteBlockSizeFetchToMem =>
 +    // Trying different values of maxRemoteBlockSizeFetchToMem to test both
 +    // non-streaming and streaming flows of Block replication
 +    test(s"test for Block replication retry logic with " +
 +      s"maxRemoteBlockSizeFetchToMem: ${maxRemoteBlockSizeFetchToMem}") {
 +      val storageLevel = StorageLevel(
 +        useDisk = false, useMemory = true, deserialized = true, replication = 3)
 +      // Retry replication logic for 2 failures
 +      conf.set("spark.storage.maxReplicationFailures", "2")
 +      // Custom block replication policy which prioritizes BlockManagers by hostname
 +      conf.set("spark.storage.replication.policy",
 +        classOf[SortOnHostNameBlockReplicationPolicy].getName)
 +      conf.set("spark.network.maxRemoteBlockSizeFetchToMem",
 +        maxRemoteBlockSizeFetchToMem.toString)
 +      val bm1 = makeBlockManager(12000, "host-1", master) // BM with sufficient memory
 +      val bm2 = makeBlockManager(10, "host-2", master) // BM with less memory
 +      val bm3 = makeBlockManager(10, "host-3", master) // BM with less memory
 +      val bm4 = makeBlockManager(12000, "host-4", master) // BM with sufficient memory
 +      val bm5 = makeBlockManager(10, "host-5", master) // BM with less memory
 +      val bm6 = makeBlockManager(12000, "host-6", master) // BM with sufficient memory
 +
 +      val data = (1 to 10).toArray
 +      val serializedData = serializer.newInstance().serialize(data).array()
 +      val blockId = "list"
 +      bm1.putIterator(blockId, List(data).iterator, storageLevel, tellMaster = true)
 +
 +      // Replication will be tried by bm1 in this order: bm2, bm3, bm4, bm5, bm6
 +      // bm2, bm3 and bm5 have low memory, so the block can't be stored on them.
 +      // bm6 has sufficient memory, but maxReplicationFailures is set to 2.
 +      // So no retry will happen on bm6, as 3 failures (from trying bm2, bm3 and bm5) have
 
 Review comment:
   Replication is retried, but only a limited number of times, controlled by the "spark.storage.maxReplicationFailures" config. If the BM is still not able to replicate to the target number of peers after all retries, it will print a warning and return.
   
   1. Spark will store the block locally and then start the replication process. Say replication is set to 3; that means the block will be replicated to 2 more BlockManagers.
   2. The replicate method will choose one of the peers and try to replicate the block to that peer BM.
   3. Step 2 will be repeated (see the sketch after this list) until
     3.a. the replication criteria is met, i.e. the block is replicated to 2 different peers, OR
     3.b. we have hit the maxReplicationFailures threshold.
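   
   To make the loop concrete, here is a minimal, self-contained sketch of that retry logic. This is not the actual BlockManager code: `ReplicationRetrySketch`, `uploadBlock`, and the "low-mem" failure simulation are hypothetical stand-ins for illustration only.
   
   ```scala
   import scala.util.control.NonFatal
   
   object ReplicationRetrySketch {
     // Hypothetical stand-in for the replication RPC. After this PR, the call
     // throws when the remote peer fails to store the block; peers whose name
     // starts with "low-mem" simulate the low-memory BMs in the test.
     def uploadBlock(peer: String, block: Array[Byte]): Unit =
       if (peer.startsWith("low-mem")) {
         throw new RuntimeException(s"$peer: not enough memory to store block")
       }
   
     def replicate(
         block: Array[Byte],
         peers: List[String],
         numReplicas: Int,
         maxReplicationFailures: Int): Set[String] = {
       var remaining = peers
       var replicatedTo = Set.empty[String]
       var numFailures = 0
       // Steps 2 and 3 above: keep picking peers until the target replica
       // count is met or the failure budget is exhausted.
       while (numFailures <= maxReplicationFailures &&
           remaining.nonEmpty && replicatedTo.size < numReplicas) {
         val peer = remaining.head
         remaining = remaining.tail
         try {
           uploadBlock(peer, block)
           replicatedTo += peer
         } catch {
           case NonFatal(_) => numFailures += 1 // counts toward maxReplicationFailures
         }
       }
       if (replicatedTo.size < numReplicas) {
         println(s"Block replicated to only ${replicatedTo.size} peer(s) " +
           s"instead of $numReplicas peers")
       }
       replicatedTo
     }
   }
   ```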
   
   Before the changes in this PR, the replication RPC call response was always successful (although the block might not have actually been replicated). So in the test case scenario, the BlockManager will first try bm2 and then bm3, and both calls will succeed even though the block was not replicated to either block manager because of low memory. No warning will be printed and no retries will happen.
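   
   In terms of the sketch above, the pre-PR behavior is as if `uploadBlock` never threw (a hypothetical illustration, not the real RPC code):
   
   ```scala
   // Pre-PR behavior, sketched: the RPC reports success even when the remote
   // BM silently drops the block, so numFailures never increments and the
   // caller believes replication succeeded.
   def uploadBlockPrePR(peer: String, block: Array[Byte]): Unit = {
     // remote store may fail to allocate memory, but no error propagates back
   }
   ```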
   
   After the changes in this PR, block replication will first be tried on bm2. It will fail. Then it will be tried on bm3; it will also fail. Then it will be tried on bm4; it will succeed. Then it will be tried on bm5; it will fail. Now it won't try to replicate on bm6, since the number of failures [3] has exceeded maxReplicationFailures [2]. So the replicate method will return after printing [this warning](https://github.com/apache/spark/blob/v3.0.0-preview2-rc2/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1661) - "Block replicated to only 1 peer(s) instead of 2 peers".
   
   If "spark.storage.maxReplicationFailures" had been configured as 3, the BM would also try to replicate to bm6, and 2 copies of the block would exist after these changes.
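   
   Replaying both walkthroughs with the sketch above (the peer names are illustrative; "low-mem-*" peers fail, the rest succeed):
   
   ```scala
   val peers = List("low-mem-bm2", "low-mem-bm3", "bm4", "low-mem-bm5", "bm6")
   
   // maxReplicationFailures = 2: bm2 and bm3 fail, bm4 succeeds, bm5 fails
   // (3rd failure), bm6 is never tried -> only 1 copy, warning printed.
   val copies = ReplicationRetrySketch.replicate(
     Array[Byte](1), peers, numReplicas = 2, maxReplicationFailures = 2)
   assert(copies == Set("bm4"))
   
   // maxReplicationFailures = 3: bm6 is also tried -> 2 copies, no warning.
   val copies2 = ReplicationRetrySketch.replicate(
     Array[Byte](1), peers, numReplicas = 2, maxReplicationFailures = 3)
   assert(copies2 == Set("bm4", "bm6"))
   ```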
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
