[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r380647954

File path: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

@@ -255,6 +257,43 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }

+  Seq(false, true).foreach { stream =>
+    test(s"test block replication failures when block is received " +
+        s"by remote block manager but putBlock fails (stream = $stream)") {
+      // Retry replication logic for 1 failure
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 1)
+      // Custom block replication policy which prioritizes BlockManagers by hostname
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To exercise the upload-block-stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+
+      // Create 2 normal block managers
+      val store1 = makeBlockManager(1, "host-1")
+      val store3 = makeBlockManager(1, "host-3")
+
+      // Create 1 faulty block manager by injecting a faulty memory manager
+      val memManager = UnifiedMemoryManager(conf, numCores = 1)
+      val mockedMemoryManager = spy(memManager)
+      doAnswer(_ => false).when(mockedMemoryManager).acquireStorageMemory(any(), any(), any())
+      val store2 = makeBlockManager(1, "host-2", Some(mockedMemoryManager))
+
+      assert(master.getPeers(store1.blockManagerId).toSet ===
+        Set(store2.blockManagerId, store3.blockManagerId))
+
+      val blockId = "blockId"
+      val message = new Array[Byte](1000)
+
+      // Replication will be tried by store1 in this order: store2, store3.
+      // store2 is the faulty block manager, so it won't be able to put the block.
+      // store1 will then try to replicate the block on store3.
+      store1.putSingle(blockId, message, StorageLevel.MEMORY_ONLY_SER_2)
+
+      val blockLocations = master.getLocations(blockId).toSet
+      assert(blockLocations === Set(store1.blockManagerId, store3.blockManagerId))

Review comment: I see. Makes sense.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r380419076

File path: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

@@ -509,3 +548,17 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
     classOf[DummyTopologyMapper].getName)
 }

+// BlockReplicationPolicy that prioritizes BlockManagers based on hostnames.
+// Example: for BM-x(host-2), BM-y(host-1), BM-z(host-3), it will prioritize them as
+// BM-y(host-1), BM-x(host-2), BM-z(host-3)
+class SortOnHostNameBlockReplicationPolicy
+  extends BlockReplicationPolicy {
+  override def prioritize(
+    blockManagerId: BlockManagerId,
+    peers: Seq[BlockManagerId],
+    peersReplicatedTo: mutable.HashSet[BlockManagerId],
+    blockId: BlockId,
+    numReplicas: Int): List[BlockManagerId] = {

Review comment: nit: style.
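The hunk above cuts off before the method body. A minimal sketch of what a hostname-sorting `prioritize` body could look like (the body is hypothetical, assuming `BlockManagerId.host` carries the hostname):

```scala
// Hypothetical sketch of the SortOnHostNameBlockReplicationPolicy body:
// order candidate peers lexicographically by hostname so that replication
// targets are deterministic (host-1 before host-2 before host-3).
override def prioritize(
    blockManagerId: BlockManagerId,
    peers: Seq[BlockManagerId],
    peersReplicatedTo: mutable.HashSet[BlockManagerId],
    blockId: BlockId,
    numReplicas: Int): List[BlockManagerId] = {
  peers.sortBy(_.host).toList
}
```

A deterministic ordering like this is what lets the tests assert exactly which BlockManager is tried first.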
[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r380419021

File path: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala

@@ -255,6 +257,43 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }

+  Seq(false, true).foreach { stream =>
+    test(s"test block replication failures when block is received " +
+        s"by remote block manager but putBlock fails (stream = $stream)") {
+      // Retry replication logic for 1 failure
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 1)
+      // Custom block replication policy which prioritizes BlockManagers by hostname
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To exercise the upload-block-stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+
+      // Create 2 normal block managers
+      val store1 = makeBlockManager(1, "host-1")
+      val store3 = makeBlockManager(1, "host-3")
+
+      // Create 1 faulty block manager by injecting a faulty memory manager
+      val memManager = UnifiedMemoryManager(conf, numCores = 1)
+      val mockedMemoryManager = spy(memManager)
+      doAnswer(_ => false).when(mockedMemoryManager).acquireStorageMemory(any(), any(), any())
+      val store2 = makeBlockManager(1, "host-2", Some(mockedMemoryManager))
+
+      assert(master.getPeers(store1.blockManagerId).toSet ===
+        Set(store2.blockManagerId, store3.blockManagerId))
+
+      val blockId = "blockId"
+      val message = new Array[Byte](1000)
+
+      // Replication will be tried by store1 in this order: store2, store3.
+      // store2 is the faulty block manager, so it won't be able to put the block.
+      // store1 will then try to replicate the block on store3.
+      store1.putSingle(blockId, message, StorageLevel.MEMORY_ONLY_SER_2)
+
+      val blockLocations = master.getLocations(blockId).toSet
+      assert(blockLocations === Set(store1.blockManagerId, store3.blockManagerId))

Review comment: You may also need to assert that `store2` doesn't contain `blockId`, as you did previously.
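The extra assertion suggested in this comment could be sketched as follows (assuming the `blockLocations` value from the hunk above and `BlockManager.getStatus`; the exact helper used in the final test may differ):

```scala
// Verify the faulty store2 never ended up with the block: neither in the
// master's view of block locations, nor in store2's own local state.
assert(!blockLocations.contains(store2.blockManagerId))
assert(store2.getStatus(blockId).isEmpty)
```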
[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r379293717

File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

@@ -624,6 +624,49 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getRemoteBytes("list1").isEmpty)
   }

+  Seq(false, true).foreach { stream =>
+    test(s"test for Block replication retry logic (stream = $stream)") {
+      // Retry replication logic for 2 failures
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 2)
+      // Custom block replication policy which prioritizes BlockManagers by hostname
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To exercise the upload-block-stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+      val blockManagers = (0 to 6).map(index => makeBlockManager(7800, s"host-$index", master))
+      val a1 = new Array[Byte](4000)
+      val a2 = new Array[Byte](4000)
+      val a3 = new Array[Byte](4000)
+      // Put 4000-byte RDD blocks in block managers 1, 2 and 4, so that they
+      // won't have space for another 4000-byte block.
+      blockManagers(1).putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+      blockManagers(2).putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+      blockManagers(4).putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)

Review comment: Since the block is stored in MEMORY_ONLY mode, it's possible for it to be evicted by a newer block. Personally, I prefer the original way, which is setting low memory for these BlockManagers. But I do notice the flakiness too, though I haven't looked into the details.
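The "original way" the reviewer prefers could be sketched like this (capacities are illustrative; this assumes `makeBlockManager`'s first argument is the store's maximum memory, as in the hunk above):

```scala
// Illustrative sketch: instead of pre-filling MEMORY_ONLY blocks (which can
// be evicted by newer blocks), give block managers 1, 2 and 4 too little
// memory to ever accept a 4000-byte replica, removing the eviction race.
val blockManagers = (0 to 6).map { index =>
  val maxMem = if (Set(1, 2, 4).contains(index)) 1000 else 7800
  makeBlockManager(maxMem, s"host-$index", master)
}
```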
[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r377678687

File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

@@ -624,6 +624,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getRemoteBytes("list1").isEmpty)
   }

+  Seq(0, Int.MaxValue - 512).foreach { maxRemoteBlockSizeFetchToMem =>

Review comment: We could use `Seq(true, false)` instead and name the test `test for Block replication retry logic with stream = true/false`. Inside the test we could then have a variable, e.g. `maxRemoteBlockSizeFetchToMem`, to set the size (0 or Int.MaxValue - 512). That would make the test name easier to understand.
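The refactor suggested in this comment (and visible in the later revisions quoted elsewhere in this thread) would look roughly like:

```scala
Seq(false, true).foreach { stream =>
  test(s"test for Block replication retry logic (stream = $stream)") {
    // 0 forces the upload-block-stream path; a large value keeps the
    // fetch-to-memory path.
    val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
    conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
    // ... rest of the test body ...
  }
}
```

Parameterizing on a boolean and echoing it in the test name makes failures self-describing, while the magic sizes stay as an internal detail.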
[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r377664031

File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

@@ -624,6 +624,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(store.getRemoteBytes("list1").isEmpty)
   }

+  Seq(0, Int.MaxValue - 512).foreach { maxRemoteBlockSizeFetchToMem =>
+    // Trying different values of maxRemoteBlockSizeFetchToMem to test both
+    // non-streaming and streaming flows of Block replication

Review comment: nit: `streaming` -> `stream`
[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propogation issue in BlockManager
URL: https://github.com/apache/spark/pull/27539#discussion_r377650369

File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala

@@ -105,7 +105,11 @@ class NettyBlockRpcServer(
         val blockId = BlockId(uploadBlock.blockId)
         logDebug(s"Receiving replicated block $blockId with level ${level} " +
           s"from ${client.getSocketAddress}")
-        blockManager.putBlockData(blockId, data, level, classTag)
+        val blockStored = blockManager.putBlockData(blockId, data, level, classTag)
+        if (!blockStored) {
+          throw new Exception(s"Upload block for $blockId failed. This " +
+            s"mostly happens when there is not sufficient space available to store the block.")
+        }

Review comment:

> Should we use responseContext.onFailure instead of exception here?

Yeah, I'd prefer to only use it here. But I don't expect to use it in `putBlockDataAsStream`.
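The `responseContext.onFailure` alternative discussed here could be sketched as follows (a sketch only; it assumes the surrounding `receive` handler's `responseContext: RpcResponseCallback`, whose `onSuccess`/`onFailure` methods come from Spark's RPC layer):

```scala
val blockStored = blockManager.putBlockData(blockId, data, level, classTag)
if (blockStored) {
  responseContext.onSuccess(ByteBuffer.allocate(0))
} else {
  // Report the failure through the RPC callback so the replicating side
  // observes it, rather than throwing inside the server handler.
  responseContext.onFailure(new Exception(
    s"Upload block for $blockId failed. This mostly happens when there " +
      s"is not sufficient space available to store the block."))
}
```

Either way the client sees a failed RPC; the callback route just avoids relying on the server's exception handling to translate the throw into a failure response.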