[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propagation issue in BlockManager

2020-02-18 Thread GitBox
URL: https://github.com/apache/spark/pull/27539#discussion_r380647954
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 ##
 @@ -255,6 +257,43 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
 }
   }
 
+  Seq(false, true).foreach { stream =>
+    test(s"test block replication failures when block is received " +
+      s"by remote block manager but putBlock fails (stream = $stream)") {
+      // Retry replication logic for 1 failure
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 1)
+      // Custom block replication policy which prioritizes BlockManagers by hostname
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To use the upload-block-stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+
+      // Create 2 normal block managers
+      val store1 = makeBlockManager(1, "host-1")
+      val store3 = makeBlockManager(1, "host-3")
+
+      // Create 1 faulty block manager by injecting a faulty memory manager
+      val memManager = UnifiedMemoryManager(conf, numCores = 1)
+      val mockedMemoryManager = spy(memManager)
+      doAnswer(_ => false).when(mockedMemoryManager).acquireStorageMemory(any(), any(), any())
+      val store2 = makeBlockManager(1, "host-2", Some(mockedMemoryManager))
+
+      assert(master.getPeers(store1.blockManagerId).toSet ===
+        Set(store2.blockManagerId, store3.blockManagerId))
+
+      val blockId = "blockId"
+      val message = new Array[Byte](1000)
+
+      // Replication will be tried by store1 in this order: store2, store3.
+      // store2 is the faulty block manager, so it won't be able to put the block;
+      // store1 will then try to replicate the block on store3.
+      store1.putSingle(blockId, message, StorageLevel.MEMORY_ONLY_SER_2)
+
+      val blockLocations = master.getLocations(blockId).toSet
+      assert(blockLocations === Set(store1.blockManagerId, store3.blockManagerId))
 
 Review comment:
   I see. Makes sense.



[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propagation issue in BlockManager

2020-02-17 Thread GitBox
URL: https://github.com/apache/spark/pull/27539#discussion_r380419076
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 ##
 @@ -509,3 +548,17 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
 classOf[DummyTopologyMapper].getName)
 }
 
+// BlockReplicationPolicy to prioritize BlockManagers based on hostnames
+// Examples - for BM-x(host-2), BM-y(host-1), BM-z(host-3), it will prioritize them as
+// BM-y(host-1), BM-x(host-2), BM-z(host-3)
+class SortOnHostNameBlockReplicationPolicy
+  extends BlockReplicationPolicy {
+  override def prioritize(
+   blockManagerId: BlockManagerId,
+   peers: Seq[BlockManagerId],
+   peersReplicatedTo: mutable.HashSet[BlockManagerId],
+   blockId: BlockId,
+   numReplicas: Int): List[BlockManagerId] = {
 
 Review comment:
   nit: style.
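   
   For reference, a guess at what the nit refers to, sketched with Spark's usual 4-space continuation indent for multi-line parameter lists (the diff uses a 3-space hanging indent); the method body below is only a plausible filler consistent with the class comment, not the PR's actual code:
   
       class SortOnHostNameBlockReplicationPolicy extends BlockReplicationPolicy {
         override def prioritize(
             blockManagerId: BlockManagerId,
             peers: Seq[BlockManagerId],
             peersReplicatedTo: mutable.HashSet[BlockManagerId],
             blockId: BlockId,
             numReplicas: Int): List[BlockManagerId] = {
           // Order candidate peers by hostname, as the class comment describes.
           peers.sortBy(_.host).toList
         }
       }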





[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propagation issue in BlockManager

2020-02-17 Thread GitBox
URL: https://github.com/apache/spark/pull/27539#discussion_r380419021
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
 ##
 @@ -255,6 +257,43 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
 }
   }
 
+  Seq(false, true).foreach { stream =>
+    test(s"test block replication failures when block is received " +
+      s"by remote block manager but putBlock fails (stream = $stream)") {
+      // Retry replication logic for 1 failure
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 1)
+      // Custom block replication policy which prioritizes BlockManagers by hostname
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To use the upload-block-stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+
+      // Create 2 normal block managers
+      val store1 = makeBlockManager(1, "host-1")
+      val store3 = makeBlockManager(1, "host-3")
+
+      // Create 1 faulty block manager by injecting a faulty memory manager
+      val memManager = UnifiedMemoryManager(conf, numCores = 1)
+      val mockedMemoryManager = spy(memManager)
+      doAnswer(_ => false).when(mockedMemoryManager).acquireStorageMemory(any(), any(), any())
+      val store2 = makeBlockManager(1, "host-2", Some(mockedMemoryManager))
+
+      assert(master.getPeers(store1.blockManagerId).toSet ===
+        Set(store2.blockManagerId, store3.blockManagerId))
+
+      val blockId = "blockId"
+      val message = new Array[Byte](1000)
+
+      // Replication will be tried by store1 in this order: store2, store3.
+      // store2 is the faulty block manager, so it won't be able to put the block;
+      // store1 will then try to replicate the block on store3.
+      store1.putSingle(blockId, message, StorageLevel.MEMORY_ONLY_SER_2)
+
+      val blockLocations = master.getLocations(blockId).toSet
+      assert(blockLocations === Set(store1.blockManagerId, store3.blockManagerId))
 
 Review comment:
   You may also need to assert that `store2` doesn't contain `blockId`, as you did previously.
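   
   For concreteness, a minimal sketch of that assertion (assuming the suite's implicit String-to-BlockId conversion is in scope, as the surrounding test already relies on it):
   
       // store2's put failed, so it should neither hold the bytes locally
       // nor be registered as a location with the master.
       assert(store2.getLocalBytes(blockId).isEmpty)
       assert(!master.getLocations(blockId).contains(store2.blockManagerId))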





[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propagation issue in BlockManager

2020-02-14 Thread GitBox
URL: https://github.com/apache/spark/pull/27539#discussion_r379293717
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
 ##
 @@ -624,6 +624,49 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 assert(store.getRemoteBytes("list1").isEmpty)
   }
 
+  Seq(false, true).foreach { stream =>
+    test(s"test for Block replication retry logic (stream = $stream)") {
+      // Retry replication logic for 2 failures
+      conf.set(STORAGE_MAX_REPLICATION_FAILURE, 2)
+      // Custom block replication policy which prioritizes BlockManagers by hostname
+      conf.set(STORAGE_REPLICATION_POLICY, classOf[SortOnHostNameBlockReplicationPolicy].getName)
+      // To use the upload-block-stream flow, set maxRemoteBlockSizeFetchToMem to 0
+      val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
+      conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
+      val blockManagers = (0 to 6).map(index => makeBlockManager(7800, s"host-$index", master))
+      val a1 = new Array[Byte](4000)
+      val a2 = new Array[Byte](4000)
+      val a3 = new Array[Byte](4000)
+      // Put 4000-byte RDD blocks in block managers 1, 2 and 4, so that they
+      // won't have space for another 4000-byte block.
+      blockManagers(1).putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
+      blockManagers(2).putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
+      blockManagers(4).putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
 
 Review comment:
   Since the blocks are stored in MEMORY_ONLY mode, it's possible that they get evicted by a newer block.
   
   Actually, personally, I prefer the original way, which sets low memory for these BlockManagers. But I do notice the flakiness too, though I haven't looked into the details.
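   
   A rough sketch of that "original way", with purely illustrative capacities that are not taken from the PR: size the target BlockManagers below the block size up front, so their puts fail deterministically instead of depending on eviction order:
   
       // Hypothetical sizes: stores 1, 2 and 4 are too small for a 4000-byte
       // block, so replication to them fails without any pre-filled blocks
       // that MEMORY_ONLY eviction could reclaim.
       val blockManagers = (0 to 6).map { index =>
         val maxMem = if (Set(1, 2, 4).contains(index)) 1000 else 7800
         makeBlockManager(maxMem, s"host-$index", master)
       }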





[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propagation issue in BlockManager

2020-02-11 Thread GitBox
URL: https://github.com/apache/spark/pull/27539#discussion_r377678687
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
 ##
 @@ -624,6 +624,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 assert(store.getRemoteBytes("list1").isEmpty)
   }
 
+  Seq(0, Int.MaxValue - 512).foreach { maxRemoteBlockSizeFetchToMem =>
 
 Review comment:
   We could use `Seq(true, false)` instead and name the test as `test for Block replication retry logic with stream = true/false`. And in the test, we could have a variable, e.g. `maxRemoteBlockSizeFetchToMem`, to set the size (0 or Int.MaxValue - 512). Thus, the test name would be more understandable.
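   
   Concretely, the suggested shape would be something like the following (the later revision of this test, quoted in the 2020-02-14 entry above, adopts essentially this structure):
   
       Seq(false, true).foreach { stream =>
         test(s"test for Block replication retry logic (stream = $stream)") {
           // 0 forces the upload-block-stream path; a large value keeps the
           // fetch-to-memory path.
           val maxRemoteBlockSizeFetchToMem = if (stream) 0 else Int.MaxValue - 512
           conf.set(MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM, maxRemoteBlockSizeFetchToMem.toLong)
           // ... rest of the test body unchanged ...
         }
       }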





[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propagation issue in BlockManager

2020-02-11 Thread GitBox
URL: https://github.com/apache/spark/pull/27539#discussion_r377664031
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
 ##
 @@ -624,6 +624,46 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 assert(store.getRemoteBytes("list1").isEmpty)
   }
 
+  Seq(0, Int.MaxValue - 512).foreach { maxRemoteBlockSizeFetchToMem =>
+    // Trying different values of maxRemoteBlockSizeFetchToMem to test both
+    // non-streaming and streaming flows of Block replication
 
 Review comment:
   nit: `streaming` -> `stream`





[GitHub] [spark] Ngone51 commented on a change in pull request #27539: [SPARK-30786] [CORE] Fix Block replication failure propagation issue in BlockManager

2020-02-11 Thread GitBox
URL: https://github.com/apache/spark/pull/27539#discussion_r377650369
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala
 ##
 @@ -105,7 +105,11 @@ class NettyBlockRpcServer(
     val blockId = BlockId(uploadBlock.blockId)
     logDebug(s"Receiving replicated block $blockId with level ${level} " +
       s"from ${client.getSocketAddress}")
-    blockManager.putBlockData(blockId, data, level, classTag)
+    val blockStored = blockManager.putBlockData(blockId, data, level, classTag)
+    if (!blockStored) {
+      throw new Exception(s"Upload block for $blockId failed. This " +
+        s"mostly happens when there is not sufficient space available to store the block.")
+    }
 
 Review comment:
   > Should we use responseContext.onFailure instead of exception here?
   
   Yeah. I'd prefer to only use it here. But I don't expect to use it in `putBlockDataAsStream`.
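   
   For illustration, a sketch of what routing the failure through the RPC callback could look like here; it reuses `responseContext`, the `RpcResponseCallback` already passed to `receive`, whose `onFailure(Throwable)` is the network layer's existing failure channel. This is a hedged sketch, not the exact code from the PR:
   
       case uploadBlock: UploadBlock =>
         // ... deserialize level/classTag and wrap the block data as before ...
         val blockStored = blockManager.putBlockData(blockId, data, level, classTag)
         if (blockStored) {
           responseContext.onSuccess(ByteBuffer.allocate(0))
         } else {
           // Propagate the failure to the uploading client instead of throwing locally.
           responseContext.onFailure(new Exception(s"Upload block for $blockId failed. " +
             "This mostly happens when there is not sufficient space available to store the block."))
         }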

