holdenk commented on a change in pull request #28924:
URL: https://github.com/apache/spark/pull/28924#discussion_r447303794



##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -95,6 +97,13 @@ class BlockManagerMasterEndpoint(
   private val externalShuffleServiceRddFetchEnabled: Boolean = 
externalBlockStoreClient.isDefined
   private val externalShuffleServicePort: Int = 
StorageUtils.externalShuffleServicePort(conf)
 
+  private lazy val driverEndpoint = {

Review comment:
       Would `makeDriverRef` in `RpcUtils` be appropriate here?

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -168,6 +177,37 @@ class BlockManagerMasterEndpoint(
       stop()
   }
 
+  private def handleFailure[T](

Review comment:
       I think this function could use a docstring.

##########
File path: core/src/main/scala/org/apache/spark/util/RpcUtils.scala
##########
@@ -54,6 +56,12 @@ private[spark] object RpcUtils {
     RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
   }
 
+  /**
+   * Infinite timeout is used internally, so there's no actual timeout 
property controls it.
+   * And timeout property should never be accessed since infinite means we 
never timeout.

Review comment:
       I'm not sure I follow this sentence; could you try rewording it?

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -350,11 +388,13 @@ class BlockManagerMasterEndpoint(
     if (locations != null) {
       locations.foreach { blockManagerId: BlockManagerId =>
         val blockManager = blockManagerInfo.get(blockManagerId)
-        if (blockManager.isDefined) {
+        blockManager.foreach { bm =>
           // Remove the block from the slave's BlockManager.
           // Doesn't actually wait for a confirmation and the message might 
get lost.
           // If message loss becomes frequent, we should add retry logic here.
-          blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId))
+          bm.slaveEndpoint.ask[Boolean](RemoveBlock(blockId)).recover {
+            handleFailure("block", blockId.toString, bm.blockManagerId, false)

Review comment:
       same comment as before.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -177,6 +180,95 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     blockManager.stop()
   }
 
+  private def setupBlockManagerMasterWithBlocks(withLost: Boolean): Unit = {
+    // set up a simple DriverEndpoint which simply adds executorIds and
+    // check whether a certain executorId has been added before.

Review comment:
       nit: s/check/checks/

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -235,7 +273,9 @@ class BlockManagerMasterEndpoint(
     val removeMsg = RemoveShuffle(shuffleId)
     Future.sequence(
       blockManagerInfo.values.map { bm =>
-        bm.slaveEndpoint.ask[Boolean](removeMsg)
+        bm.slaveEndpoint.ask[Boolean](removeMsg).recover {
+          handleFailure("shuffle", shuffleId.toString, bm.blockManagerId, 
false)

Review comment:
       either add the comment as in the previous call, or pass by name here for 
clarity.

##########
File path: core/src/main/scala/org/apache/spark/util/RpcUtils.scala
##########
@@ -54,6 +56,12 @@ private[spark] object RpcUtils {
     RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
   }
 
+  /**
+   * Infinite timeout is used internally, so there's no actual timeout 
property controls it.
+   * And timeout property should never be accessed since infinite means we 
never timeout.
+   * */

Review comment:
       nit: `* */`, we use `*/` more commonly in spark.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -93,6 +94,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterE
       .set(MEMORY_STORAGE_FRACTION, 0.999)
       .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
+      .set(Network.RPC_ASK_TIMEOUT, "5s")

Review comment:
       Any particular reason why 5?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to