mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1057911122
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean
cleanupLocalDirs) {
}
}
+ @Override
+ public void removeShuffleMerge(RemoveShuffleMerge msg) {
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+ if (appShuffleInfo.attemptId != msg.appAttemptId) {
+ throw new IllegalArgumentException(
+ String.format("The attempt id %s in this RemoveShuffleMerge message
does not match "
+ + "with the current attempt id %s stored in shuffle service
for application %s",
+ msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+ }
+ appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ if (mergePartitionsInfo == null) {
+ if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+ return null;
+ } else {
+ writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ }
+ }
+ boolean deleteAllMergedShuffle =
Review Comment:
nit: `deleteAllMergedShuffle` -> `deleteCurrentMergedShuffle`
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean
cleanupLocalDirs) {
}
}
+ @Override
+ public void removeShuffleMerge(RemoveShuffleMerge msg) {
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+ if (appShuffleInfo.attemptId != msg.appAttemptId) {
+ throw new IllegalArgumentException(
+ String.format("The attempt id %s in this RemoveShuffleMerge message
does not match "
+ + "with the current attempt id %s stored in shuffle service
for application %s",
+ msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+ }
+ appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ if (mergePartitionsInfo == null) {
+ if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+ return null;
+ } else {
+ writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ }
+ }
+ boolean deleteAllMergedShuffle =
+ msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+ msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+ int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+ msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+ AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+ new AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId,
mergePartitionsInfo.shuffleMergeId);
Review Comment:
This is `mergePartitionsInfo` itself - replace
`currentAppAttemptShuffleMergeId` with it? (or rename `mergePartitionsInfo` as
`currentAppAttemptShuffleMergeId` if we are going for clarity?)
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean
cleanupLocalDirs) {
}
}
+ @Override
+ public void removeShuffleMerge(RemoveShuffleMerge msg) {
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+ if (appShuffleInfo.attemptId != msg.appAttemptId) {
+ throw new IllegalArgumentException(
+ String.format("The attempt id %s in this RemoveShuffleMerge message
does not match "
+ + "with the current attempt id %s stored in shuffle service
for application %s",
+ msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+ }
+ appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ if (mergePartitionsInfo == null) {
+ if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+ return null;
+ } else {
+ writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ }
+ }
+ boolean deleteAllMergedShuffle =
+ msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+ msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+ int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+ msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+ AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+ new AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId,
mergePartitionsInfo.shuffleMergeId);
+ AppAttemptShuffleMergeId appAttemptShuffleMergeId = new
AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
Review Comment:
Note on the DB updates: we don't need to update the DB if the shuffle was
finalized and the merge id is not changing - `mergePartitionsInfo.isFinalized() &&
shuffleMergeId == mergePartitionsInfo.shuffleMergeId`
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean
cleanupLocalDirs) {
}
}
+ @Override
+ public void removeShuffleMerge(RemoveShuffleMerge msg) {
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+ if (appShuffleInfo.attemptId != msg.appAttemptId) {
+ throw new IllegalArgumentException(
+ String.format("The attempt id %s in this RemoveShuffleMerge message
does not match "
+ + "with the current attempt id %s stored in shuffle service
for application %s",
+ msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+ }
+ appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ boolean deleteCurrent =
+ msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+ msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+ AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+ new AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId,
mergePartitionsInfo.shuffleMergeId);
+ AppAttemptShuffleMergeId appAttemptShuffleMergeId = new
AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+ if(deleteCurrent) {
+ // request to clean up shuffle we are currently hosting
+ if (!mergePartitionsInfo.isFinalized()) {
+ submitCleanupTask(() -> {
+ closeAndDeleteOutdatedPartitions(
+ currentAppAttemptShuffleMergeId,
mergePartitionsInfo.shuffleMergePartitions);
+ writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+ });
+ } else {
+ submitCleanupTask(() -> {
+ deleteMergedFiles(currentAppAttemptShuffleMergeId,
+ mergePartitionsInfo.getReduceIds());
+ writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+ mergePartitionsInfo.setReduceIds(new int[0]);
+ });
+ }
+ } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+ throw new RuntimeException(String.format("Asked to remove old shuffle
merged data for " +
+ "application %s shuffleId %s shuffleMergeId %s, but current
shuffleMergeId %s ",
+ msg.appId, msg.shuffleId, msg.shuffleMergeId,
mergePartitionsInfo.shuffleMergeId));
+ } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+ // cleanup request for newer shuffle - remove the outdated data we
have.
+ submitCleanupTask(() -> {
+ closeAndDeleteOutdatedPartitions(
+ currentAppAttemptShuffleMergeId,
mergePartitionsInfo.shuffleMergePartitions);
+ writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
Review Comment:
Can you elaborate on what the concurrency issue is?
The expectation is to keep the DB consistent with `appShuffleInfo.shuffles`, so
that during recovery there is a consistent view for the shuffle service. The files
on disk can be lazily cleaned up - subsequent requests won't be able to get to
them since they are no longer accessible via metadata.
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean
cleanupLocalDirs) {
}
}
+ @Override
+ public void removeShuffleMerge(RemoveShuffleMerge msg) {
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+ if (appShuffleInfo.attemptId != msg.appAttemptId) {
+ throw new IllegalArgumentException(
+ String.format("The attempt id %s in this RemoveShuffleMerge message
does not match "
+ + "with the current attempt id %s stored in shuffle service
for application %s",
+ msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+ }
+ appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ boolean deleteCurrent =
+ msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+ msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+ AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+ new AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId,
mergePartitionsInfo.shuffleMergeId);
+ AppAttemptShuffleMergeId appAttemptShuffleMergeId = new
AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+ if(deleteCurrent) {
+ // request to clean up shuffle we are currently hosting
+ if (!mergePartitionsInfo.isFinalized()) {
+ submitCleanupTask(() -> {
+ closeAndDeleteOutdatedPartitions(
+ currentAppAttemptShuffleMergeId,
mergePartitionsInfo.shuffleMergePartitions);
+ writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+ });
+ } else {
+ submitCleanupTask(() -> {
+ deleteMergedFiles(currentAppAttemptShuffleMergeId,
+ mergePartitionsInfo.getReduceIds());
Review Comment:
Resolving conversation, please reopen if there is something additionally
missing.
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
});
}
+ void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId,
int[] reduceIds) {
+ removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+ AppShuffleInfo appShuffleInfo =
validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);
+ int shuffleId = appAttemptShuffleMergeId.shuffleId;
+ int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
+ for (int reduceId : reduceIds) {
+ try {
+ File dataFile =
+ appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId,
reduceId);
+ dataFile.delete();
+ } catch (Exception e) {
Review Comment:
This is a best-effort deferred task, which can conflict with concurrent
cleanup (like app termination, for example) - so a number of failures are
expected.
For busy clusters, warn-level messages will quickly overwhelm the logs.
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean
cleanupLocalDirs) {
}
}
+ @Override
+ public void removeShuffleMerge(RemoveShuffleMerge msg) {
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+ if (appShuffleInfo.attemptId != msg.appAttemptId) {
+ throw new IllegalArgumentException(
+ String.format("The attempt id %s in this RemoveShuffleMerge message
does not match "
+ + "with the current attempt id %s stored in shuffle service
for application %s",
+ msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+ }
+ appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId,
mergePartitionsInfo) -> {
+ if (mergePartitionsInfo == null) {
+ if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+ return null;
+ } else {
+ writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+ return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+ }
+ }
+ boolean deleteAllMergedShuffle =
+ msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+ msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+ int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+ msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+ AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+ new AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId,
mergePartitionsInfo.shuffleMergeId);
+ AppAttemptShuffleMergeId appAttemptShuffleMergeId = new
AppAttemptShuffleMergeId(
+ msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
+ if(deleteAllMergedShuffle) {
+ // request to clean up shuffle we are currently hosting
+ if (!mergePartitionsInfo.isFinalized()) {
+ submitCleanupTask(() -> {
+ closeAndDeleteOutdatedPartitions(
+ currentAppAttemptShuffleMergeId,
mergePartitionsInfo.shuffleMergePartitions);
+ writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+ });
+ } else {
+ submitCleanupTask(() -> {
+ deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
+ mergePartitionsInfo.getReduceIds());
+ writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+ mergePartitionsInfo.setReduceIds(new int[0]);
Review Comment:
`setReduceIds` is not required as `mergePartitionsInfo` is no longer
accessible.
Also, same as below, `writeAppAttemptShuffleMergeInfoToDB` should be outside
the cleanup task.
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -702,7 +722,8 @@ public MergeStatuses
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
"finalizing shuffle partition {}", msg.appId,
msg.appAttemptId, msg.shuffleId,
msg.shuffleMergeId, partition.reduceId);
} finally {
- partition.closeAllFilesAndDeleteIfNeeded(false);
+ Boolean deleteFile = partition.mapTracker.getCardinality() == 0;
+ partition.closeAllFilesAndDeleteIfNeeded(deleteFile);
Review Comment:
Please revert this for the time being, and revisit it as a follow-up as
required.
There are a number of edge cases which need to be handled in this important
PR, and I don't want to add to them with optional changes of minor utility.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]