holdenk commented on a change in pull request #28924: URL: https://github.com/apache/spark/pull/28924#discussion_r447303794
########## File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ########## @@ -95,6 +97,13 @@ class BlockManagerMasterEndpoint( private val externalShuffleServiceRddFetchEnabled: Boolean = externalBlockStoreClient.isDefined private val externalShuffleServicePort: Int = StorageUtils.externalShuffleServicePort(conf) + private lazy val driverEndpoint = { Review comment: Would `makeDriverRef` in `RpcUtils` be appropriate here? ########## File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ########## @@ -168,6 +177,37 @@ class BlockManagerMasterEndpoint( stop() } + private def handleFailure[T]( Review comment: I think this function could use a docstring. ########## File path: core/src/main/scala/org/apache/spark/util/RpcUtils.scala ########## @@ -54,6 +56,12 @@ private[spark] object RpcUtils { RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s") } + /** + * Infinite timeout is used internally, so there's no actual timeout property controls it. + * And timeout property should never be accessed since infinite means we never timeout. Review comment: I'm not sure I follow this sentence correctly, can you try and reword it? ########## File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ########## @@ -350,11 +388,13 @@ class BlockManagerMasterEndpoint( if (locations != null) { locations.foreach { blockManagerId: BlockManagerId => val blockManager = blockManagerInfo.get(blockManagerId) - if (blockManager.isDefined) { + blockManager.foreach { bm => // Remove the block from the slave's BlockManager. // Doesn't actually wait for a confirmation and the message might get lost. // If message loss becomes frequent, we should add retry logic here. 
- blockManager.get.slaveEndpoint.ask[Boolean](RemoveBlock(blockId)) + bm.slaveEndpoint.ask[Boolean](RemoveBlock(blockId)).recover { + handleFailure("block", blockId.toString, bm.blockManagerId, false) Review comment: same comment as before. ########## File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ########## @@ -177,6 +180,95 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE blockManager.stop() } + private def setupBlockManagerMasterWithBlocks(withLost: Boolean): Unit = { + // set up a simple DriverEndpoint which simply adds executorIds and + // check whether a certain executorId has been added before. Review comment: nit:s/check/cheks/ ########## File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ########## @@ -235,7 +273,9 @@ class BlockManagerMasterEndpoint( val removeMsg = RemoveShuffle(shuffleId) Future.sequence( blockManagerInfo.values.map { bm => - bm.slaveEndpoint.ask[Boolean](removeMsg) + bm.slaveEndpoint.ask[Boolean](removeMsg).recover { + handleFailure("shuffle", shuffleId.toString, bm.blockManagerId, false) Review comment: either add the comment as in the previous call, or pass by name here for clarity. ########## File path: core/src/main/scala/org/apache/spark/util/RpcUtils.scala ########## @@ -54,6 +56,12 @@ private[spark] object RpcUtils { RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s") } + /** + * Infinite timeout is used internally, so there's no actual timeout property controls it. + * And timeout property should never be accessed since infinite means we never timeout. + * */ Review comment: nit: `* */`, we use `*/` more commonly in spark. 
########## File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ########## @@ -93,6 +94,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE .set(MEMORY_STORAGE_FRACTION, 0.999) .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m") .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L) + .set(Network.RPC_ASK_TIMEOUT, "5s") Review comment: Any particular reason why 5? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org