yabola commented on code in PR #38560:
URL: https://github.com/apache/spark/pull/38560#discussion_r1030998613


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -327,43 +327,52 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    // Find all shuffle blocks on executors that are no longer running
-    val blocksToDeleteByShuffleService =
-      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    var removeShuffleFromShuffleServicesFutures = Seq.empty[Future[Boolean]]
     if (externalShuffleServiceRemoveShuffleEnabled) {
-      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus 
=>
-        shuffleStatus.withMapStatuses { mapStatuses =>
-          mapStatuses.foreach { mapStatus =>
-            // Check if the executor has been deallocated
-            if 
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
-              val blocksToDel =
-                
shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, 
mapStatus.mapId)
-              if (blocksToDel.nonEmpty) {
-                val blocks = 
blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
-                  new mutable.HashSet[BlockId])
-                blocks ++= blocksToDel
+      val shuffleClient = externalBlockStoreClient.get
+      // Find all shuffle blocks on executors that are no longer running
+      val blocksToDelete = new mutable.HashMap[BlockManagerId, 
mutable.HashSet[BlockId]]
+      mapOutputTracker.shuffleStatuses.get(shuffleId) match {
+        case Some(shuffleStatus) =>
+          shuffleStatus.withMapStatuses { mapStatuses =>
+            mapStatuses.foreach { mapStatus =>
+              // Check if the executor has been deallocated
+              if 
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+                val blocksToDel = shuffleManager.shuffleBlockResolver
+                  .getBlocksForShuffle(shuffleId, mapStatus.mapId)
+                if (blocksToDel.nonEmpty) {
+                  val blocks = 
blocksToDelete.getOrElseUpdate(mapStatus.location,
+                    new mutable.HashSet[BlockId])
+                  blocks ++= blocksToDel
+                }

Review Comment:
   > What if the shuffle statuses are not exists ?

   I think it will not; please see the [case match code](https://github.com/apache/spark/pull/38560/files#diff-21bb40987f7d21c8d3d3de3212f388c210fb04a636c96bdef53cab6070c6aa46R336).



##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -327,43 +327,52 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    // Find all shuffle blocks on executors that are no longer running
-    val blocksToDeleteByShuffleService =
-      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    var removeShuffleFromShuffleServicesFutures = Seq.empty[Future[Boolean]]
     if (externalShuffleServiceRemoveShuffleEnabled) {
-      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus 
=>
-        shuffleStatus.withMapStatuses { mapStatuses =>
-          mapStatuses.foreach { mapStatus =>
-            // Check if the executor has been deallocated
-            if 
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
-              val blocksToDel =
-                
shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId, 
mapStatus.mapId)
-              if (blocksToDel.nonEmpty) {
-                val blocks = 
blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
-                  new mutable.HashSet[BlockId])
-                blocks ++= blocksToDel
+      val shuffleClient = externalBlockStoreClient.get
+      // Find all shuffle blocks on executors that are no longer running
+      val blocksToDelete = new mutable.HashMap[BlockManagerId, 
mutable.HashSet[BlockId]]
+      mapOutputTracker.shuffleStatuses.get(shuffleId) match {
+        case Some(shuffleStatus) =>
+          shuffleStatus.withMapStatuses { mapStatuses =>
+            mapStatuses.foreach { mapStatus =>
+              // Check if the executor has been deallocated
+              if 
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+                val blocksToDel = shuffleManager.shuffleBlockResolver
+                  .getBlocksForShuffle(shuffleId, mapStatus.mapId)
+                if (blocksToDel.nonEmpty) {
+                  val blocks = 
blocksToDelete.getOrElseUpdate(mapStatus.location,
+                    new mutable.HashSet[BlockId])
+                  blocks ++= blocksToDel
+                }

Review Comment:
   > What if the shuffle statuses are not exists ?
   
   I think it will not; please see the [case match code](https://github.com/apache/spark/pull/38560/files#diff-21bb40987f7d21c8d3d3de3212f388c210fb04a636c96bdef53cab6070c6aa46R336).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to