zhouyejoe commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r663581955



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -403,38 +394,78 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId,
-              msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
+              msg.attemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
             partition.closeAllFiles();
-            // The partition should be removed after the files are written so that any new stream
-            // for the same reduce partition will see that the data file exists.
-            partitionsIter.remove();
           }
         }
       }
       mergeStatuses = new MergeStatuses(msg.shuffleId,
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta =
+          mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) {
+          // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              appId, mergeDirectoryMeta.attemptId,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application attempt will register
+          // the merge dirs in External Shuffle Service. Any later ExecutorRegister message
+          // from the same application attempt will not override the merge dirs. But it can
+          // be overridden by ExecutorRegister message from newer application attempt,
+          // and former attempts' shuffle partitions information will also be cleaned up.
+          ConcurrentMap<Integer, AppShuffleInfo> appShuffleInfoToBeCleanedUp =
+            Maps.newConcurrentMap();
+          appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
+            if (appShuffleInfo == null || (appShuffleInfo != null
+              && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)) {
+              appShuffleInfoToBeCleanedUp.putIfAbsent(appShuffleInfo.attemptId, appShuffleInfo);
+              appShuffleInfo =
+                new AppShuffleInfo(
+                  appId, mergeDirectoryMeta.attemptId,
+                  new AppPathsInfo(appId, executorInfo.localDirs,
+                    mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir));
+            }
+            return appShuffleInfo;
+          });
+          for (AppShuffleInfo appShuffleInfo: appShuffleInfoToBeCleanedUp.values()) {

Review comment:
       It is related to the description of the [compute](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentMap.html#compute-K-java.util.function.BiFunction-) method for ConcurrentMap. ConcurrentMap is an interface, and if we look at how [compute](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentSkipListMap.html#compute-K-java.util.function.BiFunction-) is implemented in ConcurrentSkipListMap, we find that it runs inside a for(;;) loop and finally calls UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val). When replacing the original value (origin_value) with the new value produced by the remapping function, it needs to check whether the current value for the key is still the same as the origin_value recorded when the compute operation started. If not, the loop starts the next iteration and tries to update the value again.

In the case mentioned by Mridul above (existing value 0, with registrations of attempts 1, 2 and 3 interleaving), the compute in the thread processing attempt 3 can be triggered three times, and with the current code it will clean up all of the former attempts 0, 1 and 2. But this also means that attempts 1 and 2 were each successfully registered in their own compute loop.

I think we can still use a single AppShuffleInfo here to store the one to be cleaned up. If we use a single AppShuffleInfo to record it, then when attempt 1 gets registered, it cleans up attempt 0, and when attempt 2 gets registered, it cleans up attempt 1. There won't be any case where the cleanup of attempt 0 or attempt 1 is missed. If attempt 1 fails to get registered because attempt 2 replaces attempt 0's value first, there is no need to clean up attempt 1.
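A minimal Java sketch of the single-slot idea, under stated assumptions: `AttemptRegistry`, `AttemptInfo` and `register` are hypothetical names, and `AttemptInfo` stands in for `AppShuffleInfo` with only the attempt id modeled. The slot is reset at the start of every invocation of the remapping function, so after `compute` returns it holds only what the invocation whose result was actually stored replaced. The `beforeFirstApply` hook is a hypothetical, test-only device that writes to the map from inside the remapping function to simulate another registration landing mid-compute; it depends on the `ConcurrentSkipListMap` retry loop described above and should not be used with `ConcurrentHashMap`, which runs the function while holding a lock and does not re-run it.

```java
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class AttemptRegistry {

  // Hypothetical stand-in for AppShuffleInfo: only the attempt id is modeled.
  static final class AttemptInfo {
    final int attemptId;

    AttemptInfo(int attemptId) {
      this.attemptId = attemptId;
    }

    @Override
    public String toString() {
      return "attempt " + attemptId;
    }
  }

  // Registers attemptId for appId and returns the attempt it replaced (for the
  // caller to clean up), or null if nothing was replaced. beforeFirstApply is a
  // hypothetical test hook run once inside the remapping function; pass null normally.
  static AttemptInfo register(ConcurrentMap<String, AttemptInfo> map, String appId,
      int attemptId, Runnable beforeFirstApply) {
    // One slot is enough: each invocation of the remapping function resets it,
    // so it ends up holding only what the stored invocation replaced.
    AtomicReference<AttemptInfo> toCleanUp = new AtomicReference<>();
    AtomicBoolean hookPending = new AtomicBoolean(beforeFirstApply != null);
    map.compute(appId, (id, existing) -> {
      if (hookPending.compareAndSet(true, false)) {
        beforeFirstApply.run(); // simulates a concurrent registration landing now
      }
      toCleanUp.set(null); // a retried invocation must not keep a stale candidate
      if (existing == null || attemptId > existing.attemptId) {
        toCleanUp.set(existing);
        return new AttemptInfo(attemptId);
      }
      return existing; // same or older attempt: keep the registered one
    });
    return toCleanUp.get();
  }

  public static void main(String[] args) {
    ConcurrentMap<String, AttemptInfo> map = new ConcurrentSkipListMap<>();
    System.out.println(register(map, "app", 0, null)); // null
    System.out.println(register(map, "app", 1, null)); // attempt 0
    System.out.println(register(map, "app", 1, null)); // null

    // Attempt 3 first reads attempt 1, but attempt 2 is stored before attempt 3's
    // compare-and-set, so compute re-runs the function, which now sees attempt 2.
    System.out.println(register(map, "app", 3,
        () -> map.put("app", new AttemptInfo(2)))); // attempt 2
    System.out.println(map.get("app")); // attempt 3
  }
}
```

In the simulated interleaving, attempt 3 reports only attempt 2 for cleanup; in the real scenario attempt 2's own registration would already have cleaned up attempt 1, which is the argument made in the comment.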




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


