Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix
Block replication failure propagation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r380647954
##########
File path:
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
##########
@@ -255,6 +257,43 @@ trait BlockManagerReplicationBehavior extends
SparkFunSuite
}
}
+ Seq(false, true).foreach { stream =>
+ test(s"test block replication failures when block is received " +
+ s"by remote block manager but putBlock fails (stream = $stream)") {
+ // Retry replication logic for 1 failure
+ conf.set(STORAGE_MAX_REPLICATION_FAILURE, 1)
+ // Custom block replication policy which prioritizes BlockManagers as
per hostnames
+ conf.set(STORAGE_REPLICATION_POLICY,
classOf[SortOnHostNameBlockReplicationPolicy].getName)
+ // To use upload block stream flow, set maxRemoteBlockSizeFetchToMem to 0
+ val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+ conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM,
maxRemoteBlockSizeFetchToMem.toLong)
+
+ // Create 2 normal block manager
+ val store1 = makeBlockManager(10000, "host-1")
+ val store3 = makeBlockManager(10000, "host-3")
+
+ // create 1 faulty block manager by injecting faulty memory manager
+ val memManager = UnifiedMemoryManager(conf, numCores = 1)
+ val mockedMemoryManager = spy(memManager)
+ doAnswer(_ =>
false).when(mockedMemoryManager).acquireStorageMemory(any(), any(), any())
+ val store2 = makeBlockManager(10000, "host-2", Some(mockedMemoryManager))
+
+ assert(master.getPeers(store1.blockManagerId).toSet ===
+ Set(store2.blockManagerId, store3.blockManagerId))
+
+ val blockId = "blockId"
+ val message = new Array[Byte](1000)
+
+ // Replication will be tried by store1 in this order: store2, store3
+ // store2 is faulty block manager, so it won't be able to put block
+ // Then store1 will try to replicate block on store3
+ store1.putSingle(blockId, message, StorageLevel.MEMORY_ONLY_SER_2)
+
+ val blockLocations = master.getLocations(blockId).toSet
+ assert(blockLocations === Set(store1.blockManagerId,
store3.blockManagerId))
Review comment:
I see. Makes sense.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]