attilapiros commented on a change in pull request #24499: [SPARK-27677][Core]
Serve local disk persisted blocks by the external service after releasing
executor by dynamic allocation
URL: https://github.com/apache/spark/pull/24499#discussion_r286907575
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -149,29 +162,70 @@ class BlockManagerMasterEndpoint(
// First remove the metadata for the given RDD, and then asynchronously
remove the blocks
// from the slaves.
+ // The message sent to the slaves to remove the RDD
+ val removeMsg = RemoveRdd(rddId)
+
// Find all blocks for the given RDD, remove the block from both
blockLocations and
- // the blockManagerInfo that is tracking the blocks.
+ // the blockManagerInfo that is tracking the blocks and create the futures
which asynchronously
+ // remove the blocks from slaves and gives back the number of removed
blocks
val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId
== rddId)
+ val blocksToDeleteByShuffleService =
+ new mutable.HashMap[BlockManagerId, mutable.HashSet[RDDBlockId]]
+
blocks.foreach { blockId =>
- val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
- bms.foreach(bm =>
blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
- blockLocations.remove(blockId)
+ val bms: mutable.HashSet[BlockManagerId] = blockLocations.remove(blockId)
+
+ val (bmIdExtShuffle, bmIdExecutor) = bms.partition(_.port ==
externalShuffleServicePort)
+ if (bmIdExecutor.isEmpty && bmIdExtShuffle.nonEmpty) {
+ // when original executor is already removed use the shuffle service
to remove the blocks
+ bmIdExtShuffle.foreach { bmIdForShuffleService =>
+ val blockIdsToDel =
blocksToDeleteByShuffleService.getOrElseUpdate(bmIdForShuffleService,
+ new mutable.HashSet[RDDBlockId]())
+ blockIdsToDel += blockId
+ blockStatusByShuffleService.get(bmIdForShuffleService).foreach {
blockStatus =>
+ blockStatus.remove(blockId)
+ }
+ }
+ } else {
+ bmIdExecutor.foreach { bm =>
+ blockManagerInfo.get(bm).foreach { bmInfo =>
+ bmInfo.removeBlock(blockId)
+ }
+ }
+ }
}
+ val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
+ askRemoveRddFromExecutor(removeMsg, bmInfo)
+ }.toSeq
- // Ask the slaves to remove the RDD, and put the result in a sequence of
Futures.
- // The dispatcher is used as an implicit argument into the Future sequence
construction.
- val removeMsg = RemoveRdd(rddId)
-
- val futures = blockManagerInfo.values.map { bm =>
- bm.slaveEndpoint.ask[Int](removeMsg).recover {
- case e: IOException =>
- logWarning(s"Error trying to remove RDD $rddId from block manager
${bm.blockManagerId}",
- e)
- 0 // zero blocks were removed
+ val removeRddBlockViaExtShuffleServiceFutures = externalShuffleClient.map
{ shuffleClient =>
+ blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+ Future[Int] {
+ val numRemovedBlocks = shuffleClient.removeBlocks(
+ bmId.host,
+ bmId.port,
+ bmId.executorId,
+ blockIds.map(_.toString).toArray)
+ numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
TimeUnit.SECONDS)
+ }
}
- }.toSeq
+ }.getOrElse(Seq.empty)
- Future.sequence(futures)
+ Future.sequence(removeRddFromExecutorsFutures ++
removeRddBlockViaExtShuffleServiceFutures)
+ }
+
+ /**
+ * Ask the slaves to remove the RDD.
+ */
+ private def askRemoveRddFromExecutor(
+ removeMsg: RemoveRdd,
+ bmInfo: BlockManagerInfo): Future[Int] = {
+ bmInfo.slaveEndpoint.ask[Int](removeMsg).recover {
+ case e: IOException =>
+ logWarning(s"Error trying to remove RDD ${removeMsg.rddId} " +
+ s"from block manager ${bmInfo.blockManagerId}", e)
+ 0 // zero blocks were removed
Review comment:
A Jira issue has been created: https://issues.apache.org/jira/browse/SPARK-27819
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]