attilapiros commented on a change in pull request #24499: [SPARK-27677][Core] Serve local disk persisted blocks by the external service after releasing executor by dynamic allocation
URL: https://github.com/apache/spark/pull/24499#discussion_r285769161
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
 ##########
 @@ -149,29 +162,70 @@ class BlockManagerMasterEndpoint(
     // First remove the metadata for the given RDD, and then asynchronously remove the blocks
     // from the slaves.
 
+    // The message sent to the slaves to remove the RDD
+    val removeMsg = RemoveRdd(rddId)
+
     // Find all blocks for the given RDD, remove the block from both blockLocations and
-    // the blockManagerInfo that is tracking the blocks.
+    // the blockManagerInfo that is tracking the blocks and create the futures which asynchronously
+    // remove the blocks from slaves and gives back the number of removed blocks
     val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[RDDBlockId]]
+
     blocks.foreach { blockId =>
-      val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
-      bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
-      blockLocations.remove(blockId)
+      val bms: mutable.HashSet[BlockManagerId] = blockLocations.remove(blockId)
+
+      val (bmIdExtShuffle, bmIdExecutor) = bms.partition(_.port == externalShuffleServicePort)
+      if (bmIdExecutor.isEmpty && bmIdExtShuffle.nonEmpty) {
 
 Review comment:
   Very good catch, I missed this. I can extend `blockStatusByShuffleService` with a flag (stored for each blockId) indicating that the cleanup must be done via the shuffle service, and update this flag when the executor is removed.
   
   This seems to me the right time to extract `JHashMap[BlockId, BlockStatus]` from
   
   ~~~scala
   private val blockStatusByShuffleService =
       new mutable.HashMap[BlockManagerId, JHashMap[BlockId, BlockStatus]]
   ~~~
   
   into a new class, such as `ShuffleServiceBlockInfo`.
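
A rough sketch of what such a class could look like, only to make the idea concrete. It is not the code in this PR: `BlockId`, `BlockStatus` and `BlockManagerId` below are simplified stand-ins for the Spark types, and `ShuffleServiceBlockEntry`, `cleanupViaShuffleService`, `markCleanupViaShuffleService` and `blocksToCleanViaShuffleService` are hypothetical names. How the blocks of a removed executor are identified is left to the caller here.

```scala
import scala.collection.mutable

// Simplified stand-ins for the Spark types (assumptions made so that the
// sketch compiles on its own; the real classes carry more information).
final case class BlockId(name: String)
final case class BlockStatus(diskSize: Long)
final case class BlockManagerId(executorId: String, host: String, port: Int)

// Hypothetical per-block entry: the block status plus the proposed flag.
final case class ShuffleServiceBlockEntry(
    status: BlockStatus,
    cleanupViaShuffleService: Boolean)

// Hypothetical replacement for the raw JHashMap[BlockId, BlockStatus].
final class ShuffleServiceBlockInfo {
  private val entries = new mutable.HashMap[BlockId, ShuffleServiceBlockEntry]

  // Adds or updates the status of a block, keeping an already set flag.
  def update(blockId: BlockId, status: BlockStatus): Unit = {
    val flag = entries.get(blockId).exists(_.cleanupViaShuffleService)
    entries(blockId) = ShuffleServiceBlockEntry(status, flag)
  }

  // Marks the given blocks as "must be cleaned up via the shuffle service",
  // e.g. when the executor which wrote them has been removed.
  // Unknown block ids are ignored.
  def markCleanupViaShuffleService(blockIds: Iterable[BlockId]): Unit = {
    blockIds.foreach { id =>
      entries.get(id).foreach { entry =>
        entries(id) = entry.copy(cleanupViaShuffleService = true)
      }
    }
  }

  // Removes the block and gives back its last known status, if any.
  def remove(blockId: BlockId): Option[BlockStatus] =
    entries.remove(blockId).map(_.status)

  // The blocks which can only be removed by asking the shuffle service.
  def blocksToCleanViaShuffleService: Set[BlockId] =
    entries.collect { case (id, e) if e.cleanupViaShuffleService => id }.toSet
}
```

With something like this the field would become `new mutable.HashMap[BlockManagerId, ShuffleServiceBlockInfo]`, and the RDD removal could consult the flag instead of inferring the state from `bmIdExecutor.isEmpty`.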
