Victsm commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r638422896



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -415,24 +429,65 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), 
Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, 
msg.appId);
+    partitions.remove(appAttemptShuffleId);
+    logger.info("Finalized shuffle {} from Application {}_{}.", msg.shuffleId, 
msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) 
{
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} 
local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, 
Arrays.toString(executorInfo.localDirs),
+          executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = 
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta = mapper.readValue(mergeDirInfo, 
MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == -1) {
+          // When attemptId is 0, there is no attemptId stored in the 
ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge 
dirs
+          appsPathsInfo.computeIfAbsent(appId, id ->
+            new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId, 
executorInfo.localDirs,
+              mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the 
ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application 
attempt will
+          // register the merge dirs in Shuffle Service. Any later 
ExecutorRegister message
+          // from the same application attempt will not override the merge 
dirs. But it can
+          // be overridden by ExecutorRegister message from newer application 
attempt.
+          // Former attempt's shuffle partitions information will also be 
cleaned up.
+          boolean newAttemptRegistered = false;
+          if (appsPathsInfo.containsKey(appId)
+              && mergeDirectoryMeta.attemptId > 
appsPathsInfo.get(appId).attemptId) {
+            newAttemptRegistered = true;
+          }
+          appsPathsInfo.compute(appId, (id, appAttemptPathsInfo) -> {
+            if (appAttemptPathsInfo == null
+                || (appAttemptPathsInfo != null && 
mergeDirectoryMeta.attemptId > appAttemptPathsInfo.attemptId)) {
+              appAttemptPathsInfo = new AppAttemptPathsInfo(appId, 
mergeDirectoryMeta.attemptId,
+                executorInfo.localDirs, mergeDirectoryMeta.mergeDir, 
executorInfo.subDirsPerLocalDir);
+            }
+            return appAttemptPathsInfo;
+          });
+          // It is safe to clean up the AppShufflePartitionInfo
+          if (newAttemptRegistered) {
+            cleanupShufflePartitionInfo(appId, mergeDirectoryMeta.attemptId);
+          }
+        }
+      } catch (JsonProcessingException e ) {
+        logger.warn("Failed to get the merge directory information from 
ExecutorShuffleInfo: ", e);
+      }
+    } else {
+      logger.warn("ExecutorShuffleInfo does not have the expected merge 
directory information");
     }
-    appsPathInfo.computeIfAbsent(appId, id -> new AppPathsInfo(appId, 
executorInfo.localDirs,
-      executorInfo.subDirsPerLocalDir));
   }
-  private static String generateFileName(AppShuffleId appShuffleId, int 
reduceId) {
-    return String.format("mergedShuffle_%s_%d_%d", appShuffleId.appId, 
appShuffleId.shuffleId,
-      reduceId);
+
+  private static String generateFileName(String appId, int shuffleId, int 
reduceId) {
+    return String.format("shuffleMerged__%s_%d_%d", appId, shuffleId, 
reduceId);

Review comment:
       Should be "shuffleMerged_%s_%d_%d". 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to