zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r640349087



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -211,63 +214,74 @@ public ManagedBuffer getMergedBlockData(String appId, int 
shuffleId, int reduceI
 
   /**
    * The logic here is consistent with
-   * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+   * @see 
[[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(org.apache.spark.storage.BlockId)]]
    */
   private File getFile(String appId, String filename) {
     // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
-    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.get(appId),
+    AppAttemptPathsInfo appAttemptPathsInfo = 
Preconditions.checkNotNull(appsPathsInfo.get(appId),
       "application " + appId + " is not registered or NM was restarted.");
-    File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
-      appPathsInfo.subDirsPerLocalDir, filename);
+    File targetFile = 
ExecutorDiskUtils.getFile(appAttemptPathsInfo.activeLocalDirs,
+      appAttemptPathsInfo.subDirsPerLocalDir, filename);
     logger.debug("Get merged file {}", targetFile.getAbsolutePath());
     return targetFile;
   }
 
-  private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int 
reduceId) {
-    String fileName = String.format("%s.data", generateFileName(appShuffleId, 
reduceId));
-    return getFile(appShuffleId.appId, fileName);
+  private File getMergedShuffleDataFile(String appId, int shuffleId, int 
reduceId) {
+    String fileName = String.format("%s.data", generateFileName(appId, 
shuffleId, reduceId));
+    return getFile(appId, fileName);
   }
 
-  private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int 
reduceId) {
-    String indexName = String.format("%s.index", 
generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, indexName);
+  private File getMergedShuffleIndexFile(String appId, int shuffleId, int 
reduceId) {
+    String indexName = String.format("%s.index", generateFileName(appId, 
shuffleId, reduceId));
+    return getFile(appId, indexName);
   }
 
-  private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int 
reduceId) {
-    String metaName = String.format("%s.meta", generateFileName(appShuffleId, 
reduceId));
-    return getFile(appShuffleId.appId, metaName);
+  private File getMergedShuffleMetaFile(String appId, int shuffleId, int 
reduceId) {
+    String metaName = String.format("%s.meta", generateFileName(appId, 
shuffleId, reduceId));
+    return getFile(appId, metaName);
   }
 
   @Override
   public String[] getMergedBlockDirs(String appId) {
-    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.get(appId),
+    AppAttemptPathsInfo appAttemptPathsInfo = 
Preconditions.checkNotNull(appsPathsInfo.get(appId),
       "application " + appId + " is not registered or NM was restarted.");
-    String[] activeLocalDirs = 
Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
+    String[] activeLocalDirs = 
Preconditions.checkNotNull(appAttemptPathsInfo.activeLocalDirs,
       "application " + appId
       + " active local dirs list has not been updated by any executor 
registration");
     return activeLocalDirs;
   }
 
-  @Override
-  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
-    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, 
cleanupLocalDirs);
-    // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
-    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.remove(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> 
iterator =
+  /**
+   * Clean up the AppShufflePartitionInfo for a specific application attempt.
+   * If attemptId is -1, it means to clean up all the AppShufflePartitionInfo 
from
+   * all the attempts. Otherwise, only the AppShufflePartitionInfo from the 
specific
+   * application attempt will be cleaned up.
+   */
+  private void cleanupShufflePartitionInfo(String appId, int attemptId) {
+    Iterator<Map.Entry<AppAttemptShuffleId, Map<Integer, 
AppAttemptShufflePartitionInfo>>> iterator =
       partitions.entrySet().iterator();
     while (iterator.hasNext()) {
-      Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = 
iterator.next();
-      AppShuffleId appShuffleId = entry.getKey();
-      if (appId.equals(appShuffleId.appId)) {
+      Map.Entry<AppAttemptShuffleId, Map<Integer, 
AppAttemptShufflePartitionInfo>> entry = iterator.next();
+      AppAttemptShuffleId appAttemptShuffleId = entry.getKey();
+      if (appId.equals(appAttemptShuffleId.appId)
+          && (attemptId == -1 || attemptId == appAttemptShuffleId.attemptId)) {
         iterator.remove();
-        for (AppShufflePartitionInfo partitionInfo : 
entry.getValue().values()) {
+        for (AppAttemptShufflePartitionInfo partitionInfo : 
entry.getValue().values()) {
           partitionInfo.closeAllFiles();
         }
       }
     }
+  }
+
+  @Override
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, 
cleanupLocalDirs);
+    // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
+    AppAttemptPathsInfo appAttemptPathsInfo = 
Preconditions.checkNotNull(appsPathsInfo.remove(appId),
+      "application " + appId + " is not registered or NM was restarted.");
+    cleanupShufflePartitionInfo(appId, -1);

Review comment:
       @mridulm FYI, I have not yet added the unit test for this memory leak in 
this PR update. I am posting the majority of the required changes first and 
will add the test tomorrow.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to