mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990753031
##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
}
}
+ @Override
+ public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
+ AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+ AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.shuffles.remove(shuffleId);
Review Comment:
We should not remove it directly - the value within the map could be for a
different `shuffleMergeId` (newer for example).
Take a look at `finalizeShuffleMerge` for how to handle the corner cases.
A rough sketch:
```
public void removeShuffleMerge(FinalizeShuffleMerge msg) {
  AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
  if (appShuffleInfo.attemptId != msg.appAttemptId) {
    // incoming request for different app attempt - exception
    throw new IllegalArgumentException("appropriate msg for invalid app attempt");
  }
  appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
    if (null != mergePartitionsInfo) {
      // where DELETE_CURRENT == -1
      // merge id will be set to -1 when we are cleaning up shuffle, and there is
      // no chance of its reuse - else it will be set to an explicit value.
      boolean deleteAny = msg.shuffleMergeId == DELETE_CURRENT;
      // Looks like there is a bug in finalizeShuffleMerge, let us fix it here anyway
      // and handle it for finalizeShuffleMerge in a different PR.
      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
          new AppAttemptShuffleMergeId(
              msg.appId, msg.appAttemptId,
              msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
      if (!deleteAny && msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
        // throw exception - request for an older shuffle merge id
        throw new RuntimeException("appropriate msg for delete of old merge id");
      } else if (!deleteAny && msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
        // cleanup request for newer shuffle - remove the outdated data we have.
        submitCleanupTask(() ->
            closeAndDeleteOutdatedPartitions(
                currentAppAttemptShuffleMergeId,
                mergePartitionsInfo.shuffleMergePartitions));
      } else {
        // request to cleanup shuffle we are currently hosting
        if (!mergePartitionsInfo.isFinalized()) {
          // Not yet finalized - use the existing cleanup mechanism
          submitCleanupTask(() ->
              closeAndDeleteOutdatedPartitions(
                  currentAppAttemptShuffleMergeId,
                  mergePartitionsInfo.shuffleMergePartitions));
        } else {
          if (!mergePartitionsInfo.getReduceIds().isEmpty()) {
            // Introduce new method which deletes the files for shuffleMergeId
            submitCleanupTask(() ->
                deleteMergedFiles(msg.appId, msg.appAttemptId,
                    msg.shuffleId, msg.shuffleMergeId,
                    // To be introduced - see below
                    mergePartitionsInfo.getReduceIds()));
          }
          // simply return existing entry immediately - db does not need updating -
          // we can actually drop reduce-ids here as an optimization
          return mergePartitionsInfo;
        }
      }
    }
    // keep track of the latest merge id - and mark it as finalized and immutable,
    // as it is already marked for deletion/deleted.
    AppAttemptShuffleMergeId appAttemptShuffleMergeId =
        new AppAttemptShuffleMergeId(
            msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
    writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
    // no reduce ids
    return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
  });
}
```
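To make the concern with a plain `remove` concrete, here is a minimal, self-contained illustration of the guard idea. It is not the Spark code: `MergeIdGuardSketch`, `MergeInfo`, `removeUnconditionally` and `removeGuarded` are hypothetical stand-ins, the `DELETE_CURRENT` case is left out, and all file cleanup and DB writes are omitted. It only shows that `compute` lets us compare the requested merge id against the stored one atomically, and that a rejected request leaves the entry untouched.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model; names are illustrative, not Spark's.
final class MergeIdGuardSketch {

  // Stand-in for the per-shuffle state kept in the map.
  static final class MergeInfo {
    final int shuffleMergeId;
    final boolean finalized;

    MergeInfo(int shuffleMergeId, boolean finalized) {
      this.shuffleMergeId = shuffleMergeId;
      this.finalized = finalized;
    }
  }

  final ConcurrentMap<Integer, MergeInfo> shuffles = new ConcurrentHashMap<>();

  // Problematic variant: drops whatever is stored, even a newer merge id.
  void removeUnconditionally(int shuffleId) {
    shuffles.remove(shuffleId);
  }

  // Guarded variant: the check and the update happen atomically for the key.
  void removeGuarded(int shuffleId, int requestedMergeId) {
    shuffles.compute(shuffleId, (id, current) -> {
      if (current != null && requestedMergeId < current.shuffleMergeId) {
        // Request refers to an older merge id than the one we host.
        // Throwing from the remapping function leaves the mapping unchanged.
        throw new IllegalStateException(
            "stale removal request for shuffleMergeId " + requestedMergeId);
      }
      // Keep a finalized marker so later pushes for this merge id can be rejected.
      return new MergeInfo(requestedMergeId, true);
    });
  }

  public static void main(String[] args) {
    MergeIdGuardSketch sketch = new MergeIdGuardSketch();
    sketch.shuffles.put(7, new MergeInfo(2, false));
    try {
      sketch.removeGuarded(7, 1);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
    System.out.println("stored merge id: " + sketch.shuffles.get(7).shuffleMergeId);
  }
}
```

With `removeUnconditionally`, the same stale request would have silently discarded the entry for merge id 2.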
To write `deleteMergedFiles`, the only thing missing is the set of valid
reduce ids (`getReduceIds` above).
We can keep track of that by modifying `finalizeShuffleMerge` as follows:
a) Keep a reference to the value returned by `appShuffleInfo.shuffles.compute()`.
b) Before returning `mergeStatuses`, update that value with the `reduceIds`.
c) For efficiency, we can convert it to a bitmap to save space - but that
is an implementation detail.
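As a rough illustration of (b) and (c), the finalized reduce ids could be held in a bitmap on the partitions-info object. The sketch below is an assumption, not the proposed implementation: `ReduceIdTrackerSketch` and its methods are hypothetical, and `java.util.BitSet` is used only to keep the example dependency-free; which bitmap type the real change would use is left open.

```java
import java.util.BitSet;

// Hypothetical holder for the reduce ids that were finalized for a shuffle merge.
final class ReduceIdTrackerSketch {
  private final BitSet reduceIds = new BitSet();

  // Called once finalization knows which reduce ids produced merged files.
  void recordFinalized(int[] finalizedReduceIds) {
    for (int reduceId : finalizedReduceIds) {
      reduceIds.set(reduceId);
    }
  }

  // True when there are no merged files to delete.
  boolean isEmpty() {
    return reduceIds.isEmpty();
  }

  // Reduce ids in ascending order, for use by a later delete step.
  int[] toArray() {
    return reduceIds.stream().toArray();
  }

  public static void main(String[] args) {
    ReduceIdTrackerSketch tracker = new ReduceIdTrackerSketch();
    tracker.recordFinalized(new int[] {5, 0, 3});
    for (int reduceId : tracker.toArray()) {
      System.out.println("would delete merged files for reduceId " + reduceId);
    }
  }
}
```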
Thoughts ?
+CC @otterc, @zhouyejoe - please take a look at the bug that I believe exists
in `finalizeShuffleMerge`, for which I have sketched a fix above.
Unless I am missing something, we should fix `finalizeShuffleMerge` in a
separate PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]