venkata91 commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r663595592



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -2225,4 +2225,14 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault(Nil)
+
+  private[spark] val APP_ATTEMPT_ID =
+    ConfigBuilder("spark.app.attempt.id")

Review comment:
       nit: This feels like it should be `spark.app.attemptNumber` rather than
`spark.app.attempt.id`. Also, this is currently specific to push-based shuffle,
right? Should we add that to the documentation?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,76 +236,54 @@ public ManagedBuffer getMergedBlockData(String appId, int 
shuffleId, int reduceI
     }
   }
 
-  /**
-   * The logic here is consistent with
-   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
-   *      org.apache.spark.storage.BlockId, scala.Option)]]
-   */
-  private File getFile(String appId, String filename) {
-    // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
-    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
-      appPathsInfo.subDirsPerLocalDir, filename);
-    logger.debug("Get merged file {}", targetFile.getAbsolutePath());
-    return targetFile;
-  }
-
-  private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int 
reduceId) {
-    String fileName = String.format("%s.data", generateFileName(appShuffleId, 
reduceId));
-    return getFile(appShuffleId.appId, fileName);
-  }
-
-  private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int 
reduceId) {
-    String indexName = String.format("%s.index", 
generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, indexName);
-  }
-
-  private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int 
reduceId) {
-    String metaName = String.format("%s.meta", generateFileName(appShuffleId, 
reduceId));
-    return getFile(appShuffleId.appId, metaName);
-  }
-
   @Override
   public String[] getMergedBlockDirs(String appId) {
-    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    String[] activeLocalDirs = 
Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
-      "application " + appId
-      + " active local dirs list has not been updated by any executor 
registration");
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    String[] activeLocalDirs =
+      Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs,
+        "application " + appId + " active local dirs list has not been updated 
" +
+        "by any executor registration");
     return activeLocalDirs;
   }
 
   @Override
   public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, 
cleanupLocalDirs);
-    // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
-    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.remove(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> 
iterator =
-      partitions.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = 
iterator.next();
-      AppShuffleId appShuffleId = entry.getKey();
-      if (appId.equals(appShuffleId.appId)) {
-        iterator.remove();
-        for (AppShufflePartitionInfo partitionInfo : 
entry.getValue().values()) {
+    AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
+    if (null != appShuffleInfo) {
+      mergedShuffleCleanerExecutor.execute(
+        () -> cleanupMergedShuffle(appShuffleInfo, cleanupLocalDirs));
+    }
+  }
+
+
+  /**
+   * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
+   * If cleanupLocalDirs is true, the merged shuffle files will also be 
deleted.
+   * The cleanup will be executed in a separate thread.
+   */
+  private void cleanupMergedShuffle(

Review comment:
       nit: rename to `closeAndDeletePartitionFilesIfNeeded`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to