mridulm commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r658525245



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -73,27 +76,26 @@
 public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
   private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class);
-  @VisibleForTesting
-  static final String MERGE_MANAGER_DIR = "merge_manager";
+
   public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";
 
-  private final ConcurrentMap<String, AppPathsInfo> appsPathInfo;
-  private final ConcurrentMap<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> partitions;
+  // A three level map to store the merged shuffle information with ApplicationID as the key

Review comment:
       While it boils down to being a three-level map from the point of view of getting to `AppShufflePartitionInfo` from here, `appsShuffleInfo` itself is just a one-level map. Replace the comment with a description of what `appsShuffleInfo` holds instead? Something along the lines of "All the ongoing shuffles for each application id"?
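A minimal sketch of the distinction, assuming simplified stand-in types (`PartitionInfoStub`, `AppShuffleInfoSketch` and `ResolverStateSketch` are hypothetical names, not classes from this PR): `appsShuffleInfo` is a single map from application id to a per-application object, and only the full path down to one partition involves three lookups.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for AppShufflePartitionInfo; only the reduce id is kept.
final class PartitionInfoStub {
  final int reduceId;

  PartitionInfoStub(int reduceId) {
    this.reduceId = reduceId;
  }
}

// Hypothetical per-application state: shuffleId -> (reduceId -> partition info).
final class AppShuffleInfoSketch {
  final String appId;
  final ConcurrentMap<Integer, Map<Integer, PartitionInfoStub>> partitions =
      new ConcurrentHashMap<>();

  AppShuffleInfoSketch(String appId) {
    this.appId = appId;
  }
}

final class ResolverStateSketch {
  // All the ongoing shuffles for each application id: a one-level map.
  final ConcurrentMap<String, AppShuffleInfoSketch> appsShuffleInfo =
      new ConcurrentHashMap<>();

  // Reaching a single partition still takes three lookups:
  // application id, then shuffle id, then reduce id.
  PartitionInfoStub lookup(String appId, int shuffleId, int reduceId) {
    AppShuffleInfoSketch appShuffleInfo = appsShuffleInfo.get(appId);
    if (appShuffleInfo == null) {
      return null;
    }
    Map<Integer, PartitionInfoStub> shufflePartitions = appShuffleInfo.partitions.get(shuffleId);
    return shufflePartitions == null ? null : shufflePartitions.get(reduceId);
  }
}
```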

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -73,27 +76,26 @@
 public class RemoteBlockPushResolver implements MergedShuffleFileManager {

Review comment:
       High-level comments about the changes in this class:
   
   * Query `AppShuffleInfo` only once in each public method, using `validateAndGetAppShuffleInfo` (except in `applicationRemoved`, which I have commented on separately); everything else should depend only on that returned instance.
     * None of the private methods should call `validateAndGetAppShuffleInfo`.
   
   * Move `getFile`, `getMergedShuffleDataFile`, `getMergedShuffleIndexFile`, `getMergedShuffleMetaFile` and `generateFileName` into `AppShuffleInfo`, and use the `appShuffleInfo` obtained in the first point to fetch these values.
   
   * Add `appId` as a `final` field of `AppShuffleInfo`, and do not pass `appId` into any of the methods; simply pass `appShuffleInfo`.
   
   * Also, see if `appId` can be removed from `AppShufflePartitionInfo`, given that it will always be available in the containing `AppShuffleInfo`.
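One way the suggested refactoring could look, as a sketch only: `AppShuffleInfoWithFiles` and `AppPathsInfoStub` are hypothetical names, the `prefix_appId_shuffleId_reduceId` file-name format is an assumption, and `getFile` is a simplified stand-in for `ExecutorDiskUtils.getFile` rather than its actual logic. Because `appId` is a `final` field, the file helpers only need shuffle and reduce ids.

```java
import java.io.File;

// Hypothetical, simplified stand-in for AppPathsInfo.
final class AppPathsInfoStub {
  final String[] activeLocalDirs;
  final int subDirsPerLocalDir;

  AppPathsInfoStub(String[] activeLocalDirs, int subDirsPerLocalDir) {
    this.activeLocalDirs = activeLocalDirs;
    this.subDirsPerLocalDir = subDirsPerLocalDir;
  }
}

// Sketch of AppShuffleInfo owning appId and the file helpers, so callers
// pass this object around instead of appId.
final class AppShuffleInfoWithFiles {
  static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";

  final String appId;
  final AppPathsInfoStub appPathsInfo;

  AppShuffleInfoWithFiles(String appId, AppPathsInfoStub appPathsInfo) {
    this.appId = appId;
    this.appPathsInfo = appPathsInfo;
  }

  // Assumed naming scheme: <prefix>_<appId>_<shuffleId>_<reduceId>.
  String generateFileName(int shuffleId, int reduceId) {
    return String.format("%s_%s_%d_%d",
        MERGED_SHUFFLE_FILE_NAME_PREFIX, appId, shuffleId, reduceId);
  }

  // Simplified stand-in for ExecutorDiskUtils.getFile: hash the file name to
  // pick a local dir and a sub-dir. The real hashing details may differ.
  File getFile(String filename) {
    int hash = filename.hashCode() & Integer.MAX_VALUE;
    String[] dirs = appPathsInfo.activeLocalDirs;
    String localDir = dirs[hash % dirs.length];
    int subDirId = (hash / dirs.length) % appPathsInfo.subDirsPerLocalDir;
    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
  }

  File getMergedShuffleDataFile(int shuffleId, int reduceId) {
    return getFile(generateFileName(shuffleId, reduceId) + ".data");
  }

  File getMergedShuffleIndexFile(int shuffleId, int reduceId) {
    return getFile(generateFileName(shuffleId, reduceId) + ".index");
  }

  File getMergedShuffleMetaFile(int shuffleId, int reduceId) {
    return getFile(generateFileName(shuffleId, reduceId) + ".meta");
  }
}
```

With this shape, a private method such as `getOrCreateAppShufflePartitionInfo` would take the `AppShuffleInfo` instance fetched once by its public caller, instead of an `appId` it has to look up again.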
   

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -403,8 +462,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId,
-              msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
+              msg.attemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
             partition.closeAllFiles();
             // The partition should be removed after the files are written so that any new stream

Review comment:
       You can remove the `partitionsIter.remove()` below - since 
`shufflePartitions` has been removed from `appShuffleInfo.partitions` already.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -217,61 +237,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
    */
   private File getFile(String appId, String filename) {
     // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
-      appPathsInfo.subDirsPerLocalDir, filename);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    File targetFile = ExecutorDiskUtils.getFile(appShuffleInfo.appPathsInfo.activeLocalDirs,
+      appShuffleInfo.appPathsInfo.subDirsPerLocalDir, filename);
     logger.debug("Get merged file {}", targetFile.getAbsolutePath());
     return targetFile;
   }
 
-  private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
-    String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, fileName);
+  private File getMergedShuffleDataFile(
+      String appId,
+      int shuffleId,
+      int reduceId) {
+    String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, fileName);
   }
 
-  private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
-    String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, indexName);
+  private File getMergedShuffleIndexFile(
+      String appId,
+      int shuffleId,
+      int reduceId) {
+    String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, indexName);
   }
 
-  private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
-    String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, metaName);
+  private File getMergedShuffleMetaFile(
+      String appId,
+      int shuffleId,
+      int reduceId) {
+    String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId));
+    return getFile(appId, metaName);
   }
 
   @Override
   public String[] getMergedBlockDirs(String appId) {
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
-      "application " + appId
-      + " active local dirs list has not been updated by any executor registration");
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    String[] activeLocalDirs =
+      Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs,
+        "application " + appId + " active local dirs list has not been updated " +
+        "by any executor registration");
     return activeLocalDirs;
   }
 
   @Override
   public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
-    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator =
-      partitions.entrySet().iterator();
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    cleanupShufflePartitionInfo(appShuffleInfo);
+    appsShuffleInfo.remove(appId);
+    if (cleanupLocalDirs) {
+      Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs)
+        .map(dir -> Paths.get(dir)).toArray(Path[]::new);
+      directoryCleaner.execute(() -> deleteExecutorDirs(dirs));
+    }

Review comment:
       ```suggestion
       AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
       if (null != appShuffleInfo) {
         cleanupShufflePartitionInfo(appShuffleInfo);
         if (cleanupLocalDirs) {
           Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs)
             .map(dir -> Paths.get(dir)).toArray(Path[]::new);
           directoryCleaner.execute(() -> deleteExecutorDirs(dirs));
         }
       }
   ```
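To illustrate why this suggestion tolerates unknown applications, here is a sketch with hypothetical names (`ApplicationRemovalSketch`, `AppInfo`, `scheduledForDeletion`) and an inline `Executor` in place of the real `directoryCleaner`: `ConcurrentMap.remove` returns the detached value or `null`, so a single call both looks up and unregisters the application, and a repeated or unknown removal simply does nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

final class ApplicationRemovalSketch {
  // Hypothetical per-application state with just the fields this sketch needs.
  static final class AppInfo {
    final String[] activeLocalDirs;
    volatile boolean partitionsClosed;

    AppInfo(String[] activeLocalDirs) {
      this.activeLocalDirs = activeLocalDirs;
    }
  }

  final ConcurrentMap<String, AppInfo> appsShuffleInfo = new ConcurrentHashMap<>();
  // Records the directories handed to the cleaner instead of deleting anything.
  final List<String> scheduledForDeletion = new ArrayList<>();
  // Runs tasks on the calling thread so the sketch stays deterministic.
  final Executor directoryCleaner = Runnable::run;

  void applicationRemoved(String appId, boolean cleanupLocalDirs) {
    // remove() looks up and detaches the entry in one step. A null result means the
    // application is unknown or was already removed: nothing to clean up, no failure.
    AppInfo appShuffleInfo = appsShuffleInfo.remove(appId);
    if (appShuffleInfo == null) {
      return;
    }
    appShuffleInfo.partitionsClosed = true; // stands in for cleanupShufflePartitionInfo(...)
    if (cleanupLocalDirs) {
      String[] dirs = appShuffleInfo.activeLocalDirs.clone();
      directoryCleaner.execute(() -> {
        for (String dir : dirs) {
          scheduledForDeletion.add(dir);
        }
      });
    }
  }
}
```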

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -338,51 +418,30 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null
       && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null
         : partitionInfoBeforeCheck;
-    final String streamId = String.format("%s_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex,
-      msg.reduceId);
     if (partitionInfo != null) {
       return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex);
     } else {
       // For a duplicate block or a block which is late, respond back with a callback that handles
       // them differently.
-      return new StreamCallbackWithID() {
-        @Override
-        public String getID() {
-          return streamId;
-        }
-
-        @Override
-        public void onData(String streamId, ByteBuffer buf) {
-          // Ignore the requests. It reaches here either when a request is received after the
-          // shuffle file is finalized or when a request is for a duplicate block.
-        }
-
-        @Override
-        public void onComplete(String streamId) {
-          if (isTooLate) {
-            // Throw an exception here so the block data is drained from channel and server
-            // responds RpcFailure to the client.
-            throw new RuntimeException(String.format("Block %s %s", streamId,
-              ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
-          }
-          // For duplicate block that is received before the shuffle merge finalizes, the
-          // server should respond success to the client.
-        }
-
-        @Override
-        public void onFailure(String streamId, Throwable cause) {
-        }
-      };
+      if (isTooLate) {
+        // Throw RuntimeException in client as of block arrives too late
+        return createCallbackForInvalidPushBlocks(
+          streamId, ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX);
+      } else {
+        // Duplicate block received, no error message and exception will be thrown in client.
+        return createCallbackForInvalidPushBlocks(streamId, null);
+      }
     }
   }
 
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Override
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
-    logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, msg.appId);
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions = partitions.get(appShuffleId);
+    logger.info("Finalizing shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
+      appShuffleInfo.partitions.get(msg.shuffleId);

Review comment:
       ```suggestion
       Map<Integer, AppShufflePartitionInfo> shufflePartitions = appShuffleInfo.partitions.remove(msg.shuffleId);
   ```
   
   The subsequent `remove` below is not needed.
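A minimal sketch of the `remove`-based flow, assuming a simplified `partitions` map whose values are sizes rather than `AppShufflePartitionInfo` objects (`FinalizeMergeSketch` is a hypothetical name): because `remove` returns the detached map, nothing has to be removed again after the partitions are finalized, and a plain for-each loop is enough to walk them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class FinalizeMergeSketch {
  // Hypothetical simplification: shuffleId -> (reduceId -> merged size in bytes),
  // standing in for the map of AppShufflePartitionInfo objects.
  final ConcurrentMap<Integer, Map<Integer, Long>> partitions = new ConcurrentHashMap<>();

  // Returns the finalized reduce ids in ascending order.
  List<Integer> finalizeShuffleMerge(int shuffleId) {
    // remove() returns the detached map, so no second remove is needed after the loop.
    Map<Integer, Long> shufflePartitions = partitions.remove(shuffleId);
    List<Integer> reduceIds = new ArrayList<>();
    if (shufflePartitions == null || shufflePartitions.isEmpty()) {
      return reduceIds;
    }
    // A plain for-each over the detached entries; no explicit Iterator is needed.
    for (Map.Entry<Integer, Long> partition : shufflePartitions.entrySet()) {
      reduceIds.add(partition.getKey());
    }
    Collections.sort(reduceIds);
    return reduceIds;
  }
}
```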

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -217,61 +237,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
+  /**
+   * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
+   */
+  private void cleanupShufflePartitionInfo(AppShuffleInfo appShuffleInfo) {
+    Iterator<Map.Entry<Integer, Map<Integer, AppShufflePartitionInfo>>> iterator =
+      appShuffleInfo.partitions.entrySet().iterator();
     while (iterator.hasNext()) {
-      Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next();
-      AppShuffleId appShuffleId = entry.getKey();
-      if (appId.equals(appShuffleId.appId)) {
-        iterator.remove();
-        for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) {
+      Map.Entry<Integer, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next();
+      for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) {
+      for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) {
+        synchronized (partitionInfo) {
           partitionInfo.closeAllFiles();
         }
       }
-    }
-    if (cleanupLocalDirs) {
-      Path[] dirs = Arrays.stream(appPathsInfo.activeLocalDirs)
-        .map(dir -> Paths.get(dir)).toArray(Path[]::new);
-      directoryCleaner.execute(() -> deleteExecutorDirs(dirs));
+      iterator.remove();

Review comment:
       You don't need the `iterator.remove()` call here; the entire map has been discarded.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -217,61 +237,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
   @Override
   public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
-    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator =
-      partitions.entrySet().iterator();
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    cleanupShufflePartitionInfo(appShuffleInfo);
+    appsShuffleInfo.remove(appId);
+    if (cleanupLocalDirs) {
+      Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs)
+        .map(dir -> Paths.get(dir)).toArray(Path[]::new);
+      directoryCleaner.execute(() -> deleteExecutorDirs(dirs));

Review comment:
       Move this into `AppPathsInfo` as a `cleanupLocalDirs()` method?
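A sketch of what such a `cleanupLocalDirs()` method could look like, assuming a trimmed-down class (`AppPathsInfoWithCleanup` is a hypothetical name) that takes the cleaner `Executor` as a parameter, and a simple best-effort recursive delete standing in for the PR's `deleteExecutorDirs`, whose actual behaviour may differ. In this sketch the caller would reduce to `appShuffleInfo.appPathsInfo.cleanupLocalDirs(directoryCleaner)`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.concurrent.Executor;
import java.util.stream.Stream;

// Hypothetical sketch of AppPathsInfo owning the cleanup of its own directories.
final class AppPathsInfoWithCleanup {
  final String[] activeLocalDirs;

  AppPathsInfoWithCleanup(String[] activeLocalDirs) {
    this.activeLocalDirs = activeLocalDirs;
  }

  // Schedules deletion of this application's local dirs on the given executor.
  void cleanupLocalDirs(Executor directoryCleaner) {
    Path[] dirs = Arrays.stream(activeLocalDirs)
        .map(dir -> Paths.get(dir)).toArray(Path[]::new);
    directoryCleaner.execute(() -> deleteDirs(dirs));
  }

  // Best-effort recursive delete: reverse path order visits children before parents.
  private static void deleteDirs(Path[] dirs) {
    for (Path dir : dirs) {
      if (!Files.exists(dir)) {
        continue;
      }
      try (Stream<Path> walk = Files.walk(dir)) {
        walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
      } catch (IOException e) {
        // Skip directories that cannot be traversed; this sketch does not retry.
      }
    }
  }
}
```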

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -338,51 +418,30 @@ public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Override
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
-    logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, msg.appId);
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions = partitions.get(appShuffleId);
+    logger.info("Finalizing shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
+      appShuffleInfo.partitions.get(msg.shuffleId);
     MergeStatuses mergeStatuses;
     if (shufflePartitions == null || shufflePartitions.isEmpty()) {
       mergeStatuses =

Review comment:
       Replace the use of `partitionsIter` below with a plain loop over `partitionsToFinalize`: `for (AppShufflePartitionInfo partition : partitionsToFinalize)`.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -112,34 +114,49 @@ public ShuffleIndexInformation load(File file) throws IOException {
     this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
   }
 
+  private AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
+    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
+    AppShuffleInfo appShuffleInfo =
+      Preconditions.checkNotNull(appsShuffleInfo.get(appId),
+        "application " + appId + " is not registered or NM was restarted.");
+    return appShuffleInfo;
+  }
+
   /**
    * Given the appShuffleId and reduceId that uniquely identifies a given shuffle partition of an
    * application, retrieves the associated metadata. If not present and the corresponding merged
    * shuffle does not exist, initializes the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      String appId,
+      int shuffleId,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
-    if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);

Review comment:
       This should not call `validateAndGetAppShuffleInfo`; pass `appShuffleInfo` in as a parameter instead.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -417,24 +476,75 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+    appShuffleInfo.partitions.remove(msg.shuffleId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta =
+          mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) {
+          // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              mergeDirectoryMeta.attemptId,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application attempt wil register
+          // the merge dirs in External Shuffle Service. Any later ExecutorRegister message
+          // from the same application attempt will not override the merge dirs. But it can
+          // be overridden by ExecutorRegister message from newer application attempt,
+          // and former attempts' shuffle partitions information will also be cleaned up.
+          AtomicReference<AppShuffleInfo> originalAppShuffleInfo = new AtomicReference<>();

Review comment:
       Make this a `ConcurrentHashMap<Integer, AppShuffleInfo>` keyed by attemptId. In theory, `compute` can invoke the closure multiple times whenever computations overlap (in practice, it should simply be zero or one attempt id getting replaced).
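A sketch of the suggested bookkeeping, with hypothetical names (`AttemptRegistrationSketch`, `AttemptInfo`, `registerAttempt`) and a simplified per-attempt state. The `ConcurrentMap` default implementation of `compute` documents that it may call the remapping function more than once, so recording replaced attempts in a map keyed by attemptId, rather than in a single `AtomicReference`, keeps every replaced attempt instead of only the last one seen.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class AttemptRegistrationSketch {
  // Hypothetical per-attempt state: just the attempt id and its merge dir.
  static final class AttemptInfo {
    final int attemptId;
    final String mergeDir;

    AttemptInfo(int attemptId, String mergeDir) {
      this.attemptId = attemptId;
      this.mergeDir = mergeDir;
    }
  }

  final ConcurrentMap<String, AttemptInfo> appsShuffleInfo = new ConcurrentHashMap<>();

  // Registers the merge dir for (appId, attemptId) and returns the older attempts this
  // call replaced, keyed by attemptId, so the caller can clean them up afterwards.
  Map<Integer, AttemptInfo> registerAttempt(String appId, int attemptId, String mergeDir) {
    // If the remapping function runs more than once, each attempt it replaces is
    // recorded under its own key instead of overwriting a single reference.
    Map<Integer, AttemptInfo> replaced = new ConcurrentHashMap<>();
    appsShuffleInfo.compute(appId, (id, existing) -> {
      if (existing == null) {
        return new AttemptInfo(attemptId, mergeDir);
      }
      if (existing.attemptId < attemptId) {
        replaced.put(existing.attemptId, existing);
        return new AttemptInfo(attemptId, mergeDir);
      }
      // Same or older attempt: the first registration wins.
      return existing;
    });
    return replaced;
  }
}
```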

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -217,61 +237,74 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
+  /**
+   * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
+   */
+  private void cleanupShufflePartitionInfo(AppShuffleInfo appShuffleInfo) {
+    Iterator<Map.Entry<Integer, Map<Integer, AppShufflePartitionInfo>>> iterator =
+      appShuffleInfo.partitions.entrySet().iterator();
     while (iterator.hasNext()) {

Review comment:
       ```suggestion
       for (Map<Integer, AppShufflePartitionInfo> partitionMap : appShuffleInfo.partitions.values()) {
   ```
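A sketch of the cleanup loop along the lines of this suggestion, using a hypothetical `PartitionStub` in place of `AppShufflePartitionInfo`: it iterates over `values()` directly and leaves the maps untouched, since the whole per-application structure is discarded afterwards.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class PartitionCleanupSketch {
  // Hypothetical stand-in for AppShufflePartitionInfo that only records closing.
  static final class PartitionStub {
    volatile boolean closed;

    void closeAllFiles() {
      closed = true;
    }
  }

  // Closes the files of every partition of every shuffle and returns how many were closed.
  // The maps are left untouched: the owning per-application object is being discarded,
  // so removing entries one by one through an Iterator would be wasted work.
  static int cleanupShufflePartitionInfo(
      ConcurrentMap<Integer, Map<Integer, PartitionStub>> partitions) {
    int closedCount = 0;
    for (Map<Integer, PartitionStub> partitionMap : partitions.values()) {
      for (PartitionStub partitionInfo : partitionMap.values()) {
        synchronized (partitionInfo) {
          partitionInfo.closeAllFiles();
        }
        closedCount++;
      }
    }
    return closedCount;
  }
}
```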




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


