wankunde commented on code in PR #38560:
URL: https://github.com/apache/spark/pull/38560#discussion_r1029992768
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -452,22 +489,69 @@ void removeAppShuffleInfoFromDB(AppShuffleInfo
appShuffleInfo) {
}
/**
- * Clean up all the AppShufflePartitionInfo and the finalized shuffle
partitions in DB for
- * a specific shuffleMergeId. This is done since there is a higher
shuffleMergeId request made
- * for a shuffleId, therefore clean up older shuffleMergeId partitions. The
cleanup will be
- * executed the mergedShuffleCleaner thread.
+ * Clean up the outdated finalized or unfinalized shuffle partitions. The
cleanup will be executed in the
+ * mergedShuffleCleaner thread. Two cases need to clean up the shuffle: 1.
there is a higher shuffleMergeId request
+ * made for a shuffleId, therefore clean up older shuffleMergeId partitions.
2. Application requires to clean up the
+ * expired or unused specific shuffleId partitions
*/
+ @VisibleForTesting
+ void submitRemoveShuffleMergeTask(
+ AppShuffleInfo shuffleInfo, Integer shuffleId,
+ Optional<AppAttemptShuffleMergeId> higherShuffleMergeId) {
+ AppShuffleMergePartitionsInfo mergePartitionsInfo =
shuffleInfo.shuffles.get(shuffleId);
+ AppAttemptShuffleMergeId shuffleMergeId = new AppAttemptShuffleMergeId(
+ shuffleInfo.appId, shuffleInfo.attemptId, shuffleId,
mergePartitionsInfo.shuffleMergeId);
+ if (!mergePartitionsInfo.isFinalized()) {
+ Map<Integer, AppShufflePartitionInfo> partitionsToClean =
mergePartitionsInfo.shuffleMergePartitions;
+ submitCleanupTask(() ->
+ closeAndDeleteOutdatedPartitions(shuffleMergeId, partitionsToClean,
higherShuffleMergeId));
+ } else {
+ int[] partitionsToClean = mergePartitionsInfo.finalizedPartitions;
+ submitCleanupTask(() ->
+ deleteOutdatedFinalizedPartitions(shuffleInfo, shuffleMergeId,
partitionsToClean, higherShuffleMergeId));
+ }
+ }
+
@VisibleForTesting
void closeAndDeleteOutdatedPartitions(
- AppAttemptShuffleMergeId appAttemptShuffleMergeId,
- Map<Integer, AppShufflePartitionInfo> partitions) {
- removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+ AppAttemptShuffleMergeId shuffleMergeId,
+ Map<Integer, AppShufflePartitionInfo> partitions,
+ Optional<AppAttemptShuffleMergeId> higherShuffleMergeId) {
+ removeAppShufflePartitionInfoFromDB(shuffleMergeId);
+ higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB);
partitions
- .forEach((partitionId, partitionInfo) -> {
- synchronized (partitionInfo) {
- partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
- }
- });
+ .forEach((partitionId, partitionInfo) -> {
+ synchronized (partitionInfo) {
+ partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+ }
+ });
+ }
+
+ @VisibleForTesting
+ void deleteOutdatedFinalizedPartitions(
+ AppShuffleInfo shuffleInfo,
+ AppAttemptShuffleMergeId shuffleMergeId,
+ int[] outdatedFinalizedPartitions,
+ Optional<AppAttemptShuffleMergeId> higherShuffleMergeId) {
+ int shuffleId = shuffleMergeId.shuffleId;
+ int mergeId = shuffleMergeId.shuffleMergeId;
+ removeAppShufflePartitionInfoFromDB(shuffleMergeId);
+ higherShuffleMergeId.ifPresent(this::removeAppShufflePartitionInfoFromDB);
+ Arrays.stream(outdatedFinalizedPartitions).forEach(partition -> {
+ try {
+ File metaFile =
+ shuffleInfo.getMergedShuffleMetaFile(shuffleId, mergeId,
partition);
+ File indexFile = new File(
+ shuffleInfo.getMergedShuffleIndexFilePath(shuffleId, mergeId,
partition));
+ File dataFile =
+ shuffleInfo.getMergedShuffleDataFile(shuffleId, mergeId,
partition);
+ metaFile.delete();
+ indexFile.delete();
+ dataFile.delete();
+ } catch (Exception e) {
+ logger.error("Error delete shuffle files for {}", shuffleMergeId, e);
+ }
Review Comment:
   Just like the `closeAllFilesAndDeleteIfNeeded` method, can we continue
deleting the remaining files even if one `delete()` call fails?
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -712,6 +794,7 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
mergeStatuses = new MergeStatuses(msg.shuffleId, msg.shuffleMergeId,
bitmaps.toArray(new RoaringBitmap[bitmaps.size()]),
Ints.toArray(reduceIds),
Longs.toArray(sizes));
+
appShuffleInfo.shuffles.get(msg.shuffleId).setFinalizedPartitions(Ints.toArray(reduceIds));
Review Comment:
   The `FinalizedPartitions` will be empty after the shuffle service restarts,
which will cause the merged shuffle files to leak.
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -327,43 +327,52 @@ class BlockManagerMasterEndpoint(
}
}.toSeq
- // Find all shuffle blocks on executors that are no longer running
- val blocksToDeleteByShuffleService =
- new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+ var removeShuffleFromShuffleServicesFutures = Seq.empty[Future[Boolean]]
if (externalShuffleServiceRemoveShuffleEnabled) {
- mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus
=>
- shuffleStatus.withMapStatuses { mapStatuses =>
- mapStatuses.foreach { mapStatus =>
- // Check if the executor has been deallocated
- if
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
- val blocksToDel =
-
shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId,
mapStatus.mapId)
- if (blocksToDel.nonEmpty) {
- val blocks =
blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
- new mutable.HashSet[BlockId])
- blocks ++= blocksToDel
+ val shuffleClient = externalBlockStoreClient.get
+ // Find all shuffle blocks on executors that are no longer running
+ val blocksToDelete = new mutable.HashMap[BlockManagerId,
mutable.HashSet[BlockId]]
+ mapOutputTracker.shuffleStatuses.get(shuffleId) match {
+ case Some(shuffleStatus) =>
+ shuffleStatus.withMapStatuses { mapStatuses =>
+ mapStatuses.foreach { mapStatus =>
+ // Check if the executor has been deallocated
+ if
(!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
+ val blocksToDel = shuffleManager.shuffleBlockResolver
+ .getBlocksForShuffle(shuffleId, mapStatus.mapId)
+ if (blocksToDel.nonEmpty) {
+ val blocks =
blocksToDelete.getOrElseUpdate(mapStatus.location,
+ new mutable.HashSet[BlockId])
+ blocks ++= blocksToDel
+ }
Review Comment:
   What if the shuffle statuses do not exist?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]