yabola commented on code in PR #38560:
URL: https://github.com/apache/spark/pull/38560#discussion_r1031346915
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -319,51 +319,68 @@ class BlockManagerMasterEndpoint(
}
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
- val removeMsg = RemoveShuffle(shuffleId)
- val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
- bm.storageEndpoint.ask[Boolean](removeMsg).recover {
- // use false as default value means no shuffle data were removed
- handleBlockRemovalFailure("shuffle", shuffleId.toString,
bm.blockManagerId, false)
+ var removeShuffleFromShuffleServicesFutures = Seq.empty[Future[Boolean]]
+ if (pushBasedShuffleEnabled && externalBlockStoreClient.isDefined) {
+ val shuffleClient = externalBlockStoreClient.get
+ mapOutputTracker.shuffleStatuses.get(shuffleId) match {
+ case Some(shuffleStatus) =>
+ val shuffleMergerLocations =
shuffleStatus.getShufflePushMergerLocations
+ removeShuffleFromShuffleServicesFutures ++=
shuffleMergerLocations.map(bmId => {
+ Future[Boolean] {
+ shuffleClient.removeShuffleMerge(bmId.host, bmId.port, shuffleId)
+ }
+ })
+ case None =>
+ logWarning(s"Asked to remove merge shuffle blocks from " +
+ s"shuffle service for unknown shuffle ${shuffleId}")
}
- }.toSeq
+ }
- // Find all shuffle blocks on executors that are no longer running
- val blocksToDeleteByShuffleService =
- new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
if (externalShuffleServiceRemoveShuffleEnabled) {
- mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus
=>
- shuffleStatus.withMapStatuses { mapStatuses =>
- mapStatuses.foreach { mapStatus =>
- // Check if the executor has been deallocated
- if
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
- val blocksToDel =
-
shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId,
mapStatus.mapId)
- if (blocksToDel.nonEmpty) {
- val blocks =
blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
- new mutable.HashSet[BlockId])
- blocks ++= blocksToDel
+ val shuffleClient = externalBlockStoreClient.get
+ // Find all shuffle blocks on executors that are no longer running
+ val blocksToDelete = new mutable.HashMap[BlockManagerId,
mutable.HashSet[BlockId]]
+ mapOutputTracker.shuffleStatuses.get(shuffleId) match {
+ case Some(shuffleStatus) =>
+ shuffleStatus.withMapStatuses { mapStatuses =>
+ mapStatuses.foreach { mapStatus =>
+ // Check if the executor has been deallocated
+ if
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+ val blocksToDel = shuffleManager.shuffleBlockResolver
+ .getBlocksForShuffle(shuffleId, mapStatus.mapId)
+ if (blocksToDel.nonEmpty) {
+ val blocks =
blocksToDelete.getOrElseUpdate(mapStatus.location,
+ new mutable.HashSet[BlockId])
+ blocks ++= blocksToDel
+ }
}
}
}
- }
+ removeShuffleFromShuffleServicesFutures ++= blocksToDelete.map {
case (bmId, blockIds) =>
+ Future[Boolean] {
+ val numRemovedBlocks = shuffleClient.removeBlocks(
+ bmId.host,
+ bmId.port,
+ bmId.executorId,
+ blockIds.map(_.toString).toArray)
+ numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
+ TimeUnit.SECONDS) == blockIds.size
+ }
+ }.toSeq
+ case None =>
+ logDebug(s"Asked to remove shuffle blocks from " +
+ s"shuffle service for unknown shuffle ${shuffleId}")
}
}
-
- val removeShuffleFromShuffleServicesFutures =
- externalBlockStoreClient.map { shuffleClient =>
- blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
- Future[Boolean] {
- val numRemovedBlocks = shuffleClient.removeBlocks(
- bmId.host,
- bmId.port,
- bmId.executorId,
- blockIds.map(_.toString).toArray)
- numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
- TimeUnit.SECONDS) == blockIds.size
- }
- }
- }.getOrElse(Seq.empty)
-
+ // It needs to be invoked at last to avoid cleaning up shuffleStatuses in
mapOutputTracker
+ // too early before used by [[removeShuffleFromShuffleServicesFutures]]
+ val removeMsg = RemoveShuffle(shuffleId)
+ val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+ bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+ // use false as default value means no shuffle data were removed
+ handleBlockRemovalFailure("shuffle", shuffleId.toString,
bm.blockManagerId, false)
Review Comment:
I only moved `removeShuffleFromExecutorsFutures` to the end.
It must be invoked last; otherwise `shuffleStatuses` in `mapOutputTracker` can be
cleaned up too early (by the executors' handling of `RemoveShuffle`), and
`mapOutputTracker.shuffleStatuses.get(shuffleId)` may then sometimes return `None`.
Please refer to the [unregisterShuffle
code](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala#L59).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]