mridulm commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r661786876



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -403,38 +394,78 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {} {} 
{}", msg.appId,
-              msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("Exception while finalizing shuffle partition {}_{} {} 
{}", msg.appId,
+              msg.attemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
             partition.closeAllFiles();
-            // The partition should be removed after the files are written so 
that any new stream
-            // for the same reduce partition will see that the data file 
exists.
-            partitionsIter.remove();
           }
         }
       }
       mergeStatuses = new MergeStatuses(msg.shuffleId,
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), 
Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, 
msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) 
{
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} 
local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, 
Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = 
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta =
+          mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) {
+          // When attemptId is -1, there is no attemptId stored in the 
ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge 
dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              appId, mergeDirectoryMeta.attemptId,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the 
ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application 
attempt wil register
+          // the merge dirs in External Shuffle Service. Any later 
ExecutorRegister message
+          // from the same application attempt will not override the merge 
dirs. But it can
+          // be overridden by ExecutorRegister message from newer application 
attempt,
+          // and former attempts' shuffle partitions information will also be 
cleaned up.
+          ConcurrentMap<Integer, AppShuffleInfo> appShuffleInfoToBeCleanedUp =
+            Maps.newConcurrentMap();
+          appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
+            if (appShuffleInfo == null || (appShuffleInfo != null
+              && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)) {
+              
appShuffleInfoToBeCleanedUp.putIfAbsent(appShuffleInfo.attemptId, 
appShuffleInfo);
+              appShuffleInfo =
+                new AppShuffleInfo(
+                  appId, mergeDirectoryMeta.attemptId,
+                  new AppPathsInfo(appId, executorInfo.localDirs,
+                    mergeDirectoryMeta.mergeDir, 
executorInfo.subDirsPerLocalDir));
+            }
+            return appShuffleInfo;
+          });
+          for (AppShuffleInfo appShuffleInfo: 
appShuffleInfoToBeCleanedUp.values()) {

Review comment:
       The change that @zhouyejoe will make to `ConcurrentHashMap` will simplify this analysis.
   For `ConcurrentMap`, the `remappingFunction` passed to `compute` can be invoked multiple times, while `ConcurrentHashMap` is more restrictive and invokes it at most once per `compute` call.
   
   Given the current state (`appsShuffleInfo` is a `ConcurrentMap`, not a `ConcurrentHashMap`), I will try to elaborate the cases we have:
   
   * (a) There is no value for `appId` in `appsShuffleInfo` - and compute ends 
up introducing a new value.
     * This is the simplest case - essentially ESS is finding out about a new 
application/attempt. In this case `appShuffleInfoToBeCleanedUp` will be empty.
     
   * (b) There is an existing entry in `appsShuffleInfo` for the `appId` for 
the same `attemptId`.
     * In this case, `appShuffleInfoToBeCleanedUp` will be empty - and existing 
`appShuffleInfo` remains in `appsShuffleInfo`.
     
   * (c) Variant of above - where executor registration is for an older attempt 
id (due to some race in registrations/allocations).
     * In this case also, `appShuffleInfoToBeCleanedUp` will be empty - and 
existing `appShuffleInfo` remains in `appsShuffleInfo` (since it is for newer 
attempt).
     
   * (d) Variant of above - where executor registration is for a newer attempt 
id.
     * In this case, `appShuffleInfoToBeCleanedUp` will have an entry for the 
older attempt id - and new entry will be added to `appsShuffleInfo` (since it 
is for newer attempt).
   
   
   With `ConcurrentHashMap`, any ordering or interleaving of the cases above, at any level of concurrency, needs only a single `AtomicReference` to hold the entry to clean up, as was done in the initial version of the PR [here](https://github.com/apache/spark/blob/65fca881bc8f2184e2e01b836ab2eaeee222e44f/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java#L515)
   
   This is because `remappingFunction` won't be called multiple times for an update, but only once.
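
A minimal sketch of the single-`AtomicReference` shape, assuming the map is a `ConcurrentHashMap`. `AttemptInfo`, `SingleSlotCleanupSketch` and `register` are hypothetical names made up for illustration; they are not the PR's classes, and the real `AppShuffleInfo` carries more state than an attempt id.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for AppShuffleInfo; only the attempt id matters here.
final class AttemptInfo {
  final int attemptId;

  AttemptInfo(int attemptId) {
    this.attemptId = attemptId;
  }
}

final class SingleSlotCleanupSketch {
  // Assumption: a ConcurrentHashMap, so the lambda below runs at most once per call.
  private final ConcurrentHashMap<String, AttemptInfo> apps = new ConcurrentHashMap<>();

  /** Registers an attempt and returns the older attempt it replaced, or null. */
  AttemptInfo register(String appId, int attemptId) {
    AtomicReference<AttemptInfo> toCleanUp = new AtomicReference<>();
    apps.compute(appId, (id, existing) -> {
      if (existing == null) {
        return new AttemptInfo(attemptId);   // case (a): first registration
      }
      if (attemptId > existing.attemptId) {
        toCleanUp.set(existing);             // case (d): newer attempt replaces older
        return new AttemptInfo(attemptId);
      }
      return existing;                       // cases (b) and (c): keep what is there
    });
    return toCleanUp.get();
  }

  int currentAttempt(String appId) {
    return apps.get(appId).attemptId;
  }

  public static void main(String[] args) {
    SingleSlotCleanupSketch sketch = new SingleSlotCleanupSketch();
    sketch.register("app", 1);
    AttemptInfo replaced = sketch.register("app", 2);
    System.out.println("replaced attempt: " + replaced.attemptId);
  }
}
```

Since the lambda runs once per `register` call here, at most one older entry can be recorded, so one slot is enough. The null check comes first so that `existing.attemptId` is never read for a missing entry.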
   
   The issue with `ConcurrentMap` is that, because `remappingFunction` can be invoked multiple times (when the map detects concurrent interleaving updates), multiple (d)'s can interleave with (c).
   For example:
   With an existing value of 0, registration attempts 1, 2, 3 interleaving (and applied in that order) will result in `appShuffleInfoToBeCleanedUp` having entries for 0, 1, 2.
   On the other hand, if the same registrations are applied in the order 3, 2, 1, we will only have an entry for 0 in `appShuffleInfoToBeCleanedUp`.
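
To make the retry behaviour concrete, here is a small sketch of how one `compute` call can record more than one older attempt. It makes two assumptions. It uses `ConcurrentSkipListMap`, a JDK `ConcurrentMap` whose `compute` is documented as not guaranteed to apply the function once atomically. It also simulates the racing writer deterministically by writing to the map from inside the function on its first invocation. The class and method names are made up for illustration, and plain integers stand in for `AppShuffleInfo`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class ComputeRetrySketch {
  /**
   * Registers attempt 3 over an existing attempt 0 while a simulated racing
   * writer installs attempt 1, and returns the attempts recorded for cleanup.
   */
  static List<Integer> recordedForCleanup() {
    ConcurrentMap<String, Integer> attempts = new ConcurrentSkipListMap<>();
    attempts.put("app", 0);

    List<Integer> toCleanUp = new ArrayList<>();
    boolean[] raced = {false};

    attempts.compute("app", (id, existing) -> {
      if (existing != null && 3 > existing) {
        toCleanUp.add(existing);   // case (d): remember the older attempt
      }
      if (!raced[0]) {
        raced[0] = true;
        // Simulated concurrent registration: the value changes underneath
        // this compute call, so its compare-and-set fails and it retries.
        attempts.put("app", 1);
      }
      return 3;
    });
    return toCleanUp;
  }

  public static void main(String[] args) {
    System.out.println(recordedForCleanup());
  }
}
```

The function runs twice within a single `compute` call, first seeing attempt 0 and then attempt 1, so both end up recorded. The same trick cannot be used against `ConcurrentHashMap`, which does not permit updating the map from inside the remapping function.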
   
   Not sure if this makes sense - this became a much longer explanation than I originally intended!
   Hopefully, with the pending `ConcurrentHashMap` change, the analysis will be simpler :-) Bottom line: be very careful when using `ConcurrentMap` instead of `ConcurrentHashMap`, as the semantics of the compute* APIs are not as strict.






