mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990753031


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   We should not remove the entry directly - the value in the map could be for a different `shuffleMergeId` (a newer one, for example).
   Take a look at `finalizeShuffleMerge` for how to handle the corner cases.
   
   Rough sketch is:
   
   ```
     public void removeShuffleMerge(FinalizeShuffleMerge msg) {
       AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
       if (appShuffleInfo.attemptId != msg.appAttemptId) {
         // incoming request for different app attempt - exception
         throw new IllegalArgumentException("appropriate msg for invalid app attempt");
       }
   
       appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
   
         if (null != mergePartitionsInfo) {
           // where DELETE_CURRENT == -1
           // merge id will be set to -1 when we are cleaning up shuffle, and
           // there is no chance of its reuse - else it will be set to an explicit value.
           boolean deleteAny = msg.shuffleMergeId == DELETE_CURRENT;
   
           // Looks like there is a bug in finalizeShuffleMerge, let us fix it here anyway
           // and handle it for finalizeShuffleMerge in a different PR.
           AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
                   msg.appId, msg.appAttemptId,
                   msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
           if (!deleteAny && msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
   
             // throw exception - request for an older shuffle merge id
             throw new RuntimeException("appropriate msg for delete of old merge id");
           } else if (!deleteAny && msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
   
             // cleanup request for newer shuffle - remove the outdated data we have.
             submitCleanupTask(() ->
                     closeAndDeleteOutdatedPartitions(
                             currentAppAttemptShuffleMergeId,
                             mergePartitionsInfo.shuffleMergePartitions));
           } else {
   
             // request to cleanup shuffle we are currently hosting
   
             // Not yet finalized - use the existing cleanup mechanism
             if (!mergePartitionsInfo.isFinalized()) {
               submitCleanupTask(() ->
                       closeAndDeleteOutdatedPartitions(
                               currentAppAttemptShuffleMergeId,
                               mergePartitionsInfo.shuffleMergePartitions));
             } else {
   
               if (! mergePartitionsInfo.getReduceIds().isEmpty()) {
                 // Introduce new method which deletes the files for shuffleMergeId
                 submitCleanupTask(() ->
                       deleteMergedFiles(msg.appId, msg.appAttemptId, 
                                     msg.shuffleId, msg.shuffleMergeId, 
                                     // To be introduced - see below
                                     mergePartitionsInfo.getReduceIds()));
   
               }
               // simply return existing entry immediately - db does not need updating -
               // we can actually drop reduce-ids here as an optimization
               return mergePartitionsInfo;
             }
           }
         }
         // keep track of the latest merge id - and mark it as finalized and
         // immutable as already marked for deletion/deleted.
         AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
           msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
         writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
         // no reduce ids
         return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
       });
     }
   ```
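
To make the merge id comparison easier to reason about in isolation, here is a minimal, self-contained sketch of the same idea. `MergeIdRegistry`, `Entry`, `host`, `removeBlindly` and `removeGuarded` are hypothetical names for illustration only, not part of `RemoteBlockPushResolver`. It also simplifies the sketch above: it ignores the finalized/not-finalized distinction, the DB write and the file cleanup, and always leaves a "deleted" marker for the requested merge id.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified stand-in for the per-application shuffle map.
class MergeIdRegistry {
  static final int DELETE_CURRENT = -1;

  // Immutable value: the tracked merge id and whether it was already cleaned up.
  static final class Entry {
    final int mergeId;
    final boolean deleted;

    Entry(int mergeId, boolean deleted) {
      this.mergeId = mergeId;
      this.deleted = deleted;
    }
  }

  private final ConcurrentMap<Integer, Entry> shuffles = new ConcurrentHashMap<>();

  void host(int shuffleId, int mergeId) {
    shuffles.put(shuffleId, new Entry(mergeId, false));
  }

  Entry get(int shuffleId) {
    return shuffles.get(shuffleId);
  }

  // Naive approach: drops whatever is stored, even if it is a newer merge id.
  Entry removeBlindly(int shuffleId) {
    return shuffles.remove(shuffleId);
  }

  // Guarded approach: compare ids inside compute(), which is atomic per key.
  void removeGuarded(int shuffleId, int requestedMergeId) {
    shuffles.compute(shuffleId, (id, current) -> {
      boolean deleteAny = requestedMergeId == DELETE_CURRENT;
      if (current != null && !deleteAny && requestedMergeId < current.mergeId) {
        // Stale request: throwing here leaves the existing mapping unchanged.
        throw new IllegalArgumentException("stale merge id " + requestedMergeId);
      }
      // Same, newer, or "delete current": record the requested id as deleted.
      return new Entry(requestedMergeId, true);
    });
  }
}
```

With this shape, a request for merge id 2 against a hosted merge id 3 is rejected and the entry stays as it was, whereas `removeBlindly` would have dropped merge id 3.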
   
   To write `deleteMergedFiles`, the only thing missing is the set of valid reduce ids (`getReduceIds` above).
   We can keep track of that by modifying `finalizeShuffleMerge` as follows:
   
   a) keep a reference to the result of `appShuffleInfo.shuffles.compute()`
   b) before returning `mergeStatuses`, update this variable with `reduceIds`
   c) for efficiency, we can convert it to a bitmap and save space - but that is an impl detail.
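
For (c), a rough illustration of what the bitmap could look like, assuming the reduce ids are small non-negative ints. `ReduceIdSet` is a hypothetical name, and `java.util.BitSet` is used here only as a standard-library stand-in; the actual change may well prefer a different bitmap type.

```java
import java.util.BitSet;

// Hypothetical holder for the reduce ids of a finalized shuffle merge.
class ReduceIdSet {
  private final BitSet bits = new BitSet();

  // Builds the set from the reduce ids collected during finalization.
  static ReduceIdSet of(int... reduceIds) {
    ReduceIdSet set = new ReduceIdSet();
    for (int reduceId : reduceIds) {
      set.bits.set(reduceId); // duplicates collapse into a single bit
    }
    return set;
  }

  boolean isEmpty() {
    return bits.isEmpty();
  }

  // Returns the stored reduce ids in ascending order.
  int[] toArray() {
    return bits.stream().toArray();
  }
}
```

A `deleteMergedFiles` implementation could then iterate over `toArray()` to locate the per-reduce files, and skip the cleanup task entirely when `isEmpty()` is true.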
   
   Thoughts ?
   
   +CC @otterc, @zhouyejoe - please take a look at the bug that appears to be in `finalizeShuffleMerge`, for which I have sketched a fix above.
   Unless I am missing something, we should fix `finalizeShuffleMerge` in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

