otterc commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r660312202



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -294,10 +274,19 @@ void deleteExecutorDirs(Path[] dirs) {
 
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    final String streamId = String.format("%s_%d_%d_%d",
+      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, 
msg.mapIndex,
+      msg.reduceId);
+    if (appShuffleInfo.attemptId != msg.attemptId) {
+      // If this Block belongs to a former application attempt, it is 
considered late,
+      // as only the blocks from the current application attempt will be merged
+      // TODO: [SPARK-35548] Client should be updated to handle this error.
+      throw new IllegalArgumentException("AttemptId does not match");

Review comment:
       Nit: can we say something like "App attemptId is not current 
{msg.attemptId}"?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -380,21 +366,26 @@ public void onFailure(String streamId, Throwable cause) {
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Override
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws 
IOException {
-    logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, 
msg.appId);
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions = 
partitions.get(appShuffleId);
+    logger.info("Finalizing shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.attemptId) {
+      // If this Block belongs to a former application attempt, it is 
considered late,
+      // as only the blocks from the current application attempt will be merged
+      // TODO: [SPARK-35548] Client should be updated to handle this error.
+      throw new IllegalArgumentException("AttemptId does not match");

Review comment:
       Nit: can we say something like "App attemptId is not current 
{msg.attemptId}"?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1020,6 +1019,81 @@ private AppPathsInfo(
     }
   }
 
+  /** Merged Shuffle related information tracked for a specific application 
attempt */
+  public static class AppShuffleInfo {
+
+    private final String appId;
+    private final int attemptId;
+    private final AppPathsInfo appPathsInfo;
+    private final ConcurrentMap<Integer, Map<Integer, 
AppShufflePartitionInfo>> partitions;
+
+    AppShuffleInfo(
+      String appId,
+      int attemptId,
+      AppPathsInfo appPathsInfo
+    ) {
+      this.appId = appId;
+      this.attemptId = attemptId;
+      this.appPathsInfo = appPathsInfo;
+      partitions = Maps.newConcurrentMap();
+    }
+
+    /**
+     * The logic here is consistent with
+     * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
+     *      org.apache.spark.storage.BlockId, scala.Option)]]
+     */
+    private File getFile(String filename) {
+      // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
+      File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
+        appPathsInfo.subDirsPerLocalDir, filename);
+      logger.debug("Get merged file {}", targetFile.getAbsolutePath());
+      return targetFile;
+    }
+
+    private String generateFileName(
+      String appId,

Review comment:
       Nit: indentation

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1020,6 +1019,81 @@ private AppPathsInfo(
     }
   }
 
+  /** Merged Shuffle related information tracked for a specific application 
attempt */
+  public static class AppShuffleInfo {
+
+    private final String appId;
+    private final int attemptId;
+    private final AppPathsInfo appPathsInfo;
+    private final ConcurrentMap<Integer, Map<Integer, 
AppShufflePartitionInfo>> partitions;
+
+    AppShuffleInfo(
+      String appId,
+      int attemptId,
+      AppPathsInfo appPathsInfo
+    ) {
+      this.appId = appId;
+      this.attemptId = attemptId;
+      this.appPathsInfo = appPathsInfo;
+      partitions = Maps.newConcurrentMap();
+    }
+
+    /**
+     * The logic here is consistent with
+     * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
+     *      org.apache.spark.storage.BlockId, scala.Option)]]
+     */
+    private File getFile(String filename) {
+      // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
+      File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
+        appPathsInfo.subDirsPerLocalDir, filename);
+      logger.debug("Get merged file {}", targetFile.getAbsolutePath());
+      return targetFile;
+    }
+
+    private String generateFileName(
+      String appId,
+      int shuffleId,
+      int reduceId) {
+      return String.format(
+        "%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appId, shuffleId, 
reduceId);
+    }
+
+    public File getMergedShuffleDataFile(
+        int shuffleId,
+        int reduceId) {
+      String fileName = String.format("%s.data", generateFileName(appId, 
shuffleId, reduceId));
+      return getFile(fileName);
+    }
+
+    public File getMergedShuffleIndexFile(
+        int shuffleId,
+        int reduceId) {
+      String indexName = String.format("%s.index", generateFileName(appId, 
shuffleId, reduceId));
+      return getFile(indexName);
+    }
+
+    public File getMergedShuffleMetaFile(
+      int shuffleId,
+      int reduceId) {
+      String metaName = String.format("%s.meta", generateFileName(appId, 
shuffleId, reduceId));
+      return getFile(metaName);
+    }
+
+    /**
+     * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
+     */
+    public void cleanupShufflePartitionInfo() {

Review comment:
       Nit: This closes all the shuffle files of all the partitions 
belonging to the application, so can we call it `closeAllShufflePartitions`?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1067,4 +1141,29 @@ long getPos() {
       return pos;
     }
   }
+
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private static class MergeDirectoryMeta {
+
+    public final String mergeDir;
+    public final int attemptId;
+
+    @JsonCreator
+    MergeDirectoryMeta(
+      @JsonProperty(value = "mergeDir", required = true) String mergeDir,

Review comment:
       Nit: indentation

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -403,38 +394,78 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {} {} 
{}", msg.appId,
-              msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("Exception while finalizing shuffle partition {}_{} {} 
{}", msg.appId,
+              msg.attemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
             partition.closeAllFiles();
-            // The partition should be removed after the files are written so 
that any new stream
-            // for the same reduce partition will see that the data file 
exists.
-            partitionsIter.remove();
           }
         }
       }
       mergeStatuses = new MergeStatuses(msg.shuffleId,
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), 
Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, 
msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) 
{
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} 
local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, 
Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = 
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta =
+          mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) {
+          // When attemptId is -1, there is no attemptId stored in the 
ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge 
dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              appId, mergeDirectoryMeta.attemptId,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the 
ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application 
attempt wil register
+          // the merge dirs in External Shuffle Service. Any later 
ExecutorRegister message
+          // from the same application attempt will not override the merge 
dirs. But it can
+          // be overridden by ExecutorRegister message from newer application 
attempt,
+          // and former attempts' shuffle partitions information will also be 
cleaned up.
+          ConcurrentMap<Integer, AppShuffleInfo> appShuffleInfoToBeCleanedUp =
+            Maps.newConcurrentMap();
+          appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
+            if (appShuffleInfo == null || (appShuffleInfo != null
+              && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)) {
+              
appShuffleInfoToBeCleanedUp.putIfAbsent(appShuffleInfo.attemptId, 
appShuffleInfo);
+              appShuffleInfo =
+                new AppShuffleInfo(
+                  appId, mergeDirectoryMeta.attemptId,
+                  new AppPathsInfo(appId, executorInfo.localDirs,
+                    mergeDirectoryMeta.mergeDir, 
executorInfo.subDirsPerLocalDir));
+            }
+            return appShuffleInfo;
+          });
+          for (AppShuffleInfo appShuffleInfo: 
appShuffleInfoToBeCleanedUp.values()) {
+            logger.info("Remove shuffle info for {}_{} as new application 
attempt registered",
+              appId, appShuffleInfo.attemptId);
+            appShuffleInfo.cleanupShufflePartitionInfo();

Review comment:
       Is there a UT that validates that the old attempt's shuffle partitions 
are closed when an executor from a new attempt registers?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -567,7 +598,8 @@ public void onData(String streamId, ByteBuffer buf) throws 
IOException {
       // memory, while still providing the necessary guarantee.
       synchronized (partitionInfo) {
         Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-          mergeManager.partitions.get(partitionInfo.appShuffleId);
+          mergeManager.appsShuffleInfo.get(partitionInfo.appId).partitions

Review comment:
       What if an executor registers with a new attempt between multiple 
`onData()` invocations on the same streamCallback, or between `onData()` and 
`onComplete()` on the streamCallback?
   
   Looks like `AppShufflePartitionInfo` should now also hold the reference to 
`shufflePartitions` instead of referencing `mergeManager.appsShuffleInfo`.
   This needs a UT as well. We just need to trigger registerExecutor with a new 
attempt between callbacks on the same stream.
    

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -644,10 +676,11 @@ public void onData(String streamId, ByteBuffer buf) 
throws IOException {
     public void onComplete(String streamId) throws IOException {
       synchronized (partitionInfo) {
         logger.trace("{} shuffleId {} reduceId {} onComplete invoked",
-          partitionInfo.appShuffleId.appId, 
partitionInfo.appShuffleId.shuffleId,
+          partitionInfo.appId, partitionInfo.shuffleId,
           partitionInfo.reduceId);
         Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-          mergeManager.partitions.get(partitionInfo.appShuffleId);
+          mergeManager.appsShuffleInfo.get(partitionInfo.appId).partitions

Review comment:
       Same as above

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -724,10 +757,11 @@ public void onFailure(String streamId, Throwable 
throwable) throws IOException {
       if (isWriting) {
         synchronized (partitionInfo) {
           Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-            mergeManager.partitions.get(partitionInfo.appShuffleId);
+            mergeManager.appsShuffleInfo.get(partitionInfo.appId).partitions

Review comment:
       Same as above

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -530,10 +530,17 @@ private[spark] class BlockManager(
 
   private def registerWithExternalShuffleServer(): Unit = {
     logInfo("Registering executor with local external shuffle service.")
+    val shuffleManagerMeta =
+      if (Utils.isPushBasedShuffleEnabled(conf)) {
+        s"${shuffleManager.getClass.getName}:" +
+          s"${diskBlockManager.getMergeDirectoryAndAttemptIDJsonString()}}}"
+      } else {
+        shuffleManager.getClass.getName
+      }

Review comment:
       Is there a UT that verifies the message when push-based shuffle is 
enabled?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1020,6 +1019,81 @@ private AppPathsInfo(
     }
   }
 
+  /** Merged Shuffle related information tracked for a specific application 
attempt */
+  public static class AppShuffleInfo {
+
+    private final String appId;
+    private final int attemptId;
+    private final AppPathsInfo appPathsInfo;
+    private final ConcurrentMap<Integer, Map<Integer, 
AppShufflePartitionInfo>> partitions;
+
+    AppShuffleInfo(
+      String appId,
+      int attemptId,
+      AppPathsInfo appPathsInfo
+    ) {

Review comment:
       Nit: style

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -403,38 +394,78 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {} {} 
{}", msg.appId,
-              msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("Exception while finalizing shuffle partition {}_{} {} 
{}", msg.appId,
+              msg.attemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
             partition.closeAllFiles();
-            // The partition should be removed after the files are written so 
that any new stream
-            // for the same reduce partition will see that the data file 
exists.
-            partitionsIter.remove();
           }
         }
       }
       mergeStatuses = new MergeStatuses(msg.shuffleId,
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), 
Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, 
msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) 
{
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} 
local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, 
Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = 
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta =
+          mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) {
+          // When attemptId is -1, there is no attemptId stored in the 
ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge 
dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              appId, mergeDirectoryMeta.attemptId,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the 
ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application 
attempt wil register
+          // the merge dirs in External Shuffle Service. Any later 
ExecutorRegister message
+          // from the same application attempt will not override the merge 
dirs. But it can
+          // be overridden by ExecutorRegister message from newer application 
attempt,
+          // and former attempts' shuffle partitions information will also be 
cleaned up.
+          ConcurrentMap<Integer, AppShuffleInfo> appShuffleInfoToBeCleanedUp =

Review comment:
       Why is this a concurrent map?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1020,6 +1019,81 @@ private AppPathsInfo(
     }
   }
 
+  /** Merged Shuffle related information tracked for a specific application 
attempt */
+  public static class AppShuffleInfo {
+
+    private final String appId;
+    private final int attemptId;
+    private final AppPathsInfo appPathsInfo;
+    private final ConcurrentMap<Integer, Map<Integer, 
AppShufflePartitionInfo>> partitions;
+
+    AppShuffleInfo(
+      String appId,

Review comment:
       Nit: Indentation

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -403,38 +394,78 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {} {} 
{}", msg.appId,
-              msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("Exception while finalizing shuffle partition {}_{} {} 
{}", msg.appId,
+              msg.attemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
             partition.closeAllFiles();
-            // The partition should be removed after the files are written so 
that any new stream
-            // for the same reduce partition will see that the data file 
exists.
-            partitionsIter.remove();
           }
         }
       }
       mergeStatuses = new MergeStatuses(msg.shuffleId,
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), 
Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, 
msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) 
{
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} 
local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, 
Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = 
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta =
+          mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) {
+          // When attemptId is -1, there is no attemptId stored in the 
ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge 
dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              appId, mergeDirectoryMeta.attemptId,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the 
ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application 
attempt wil register
+          // the merge dirs in External Shuffle Service. Any later 
ExecutorRegister message
+          // from the same application attempt will not override the merge 
dirs. But it can
+          // be overridden by ExecutorRegister message from newer application 
attempt,
+          // and former attempts' shuffle partitions information will also be 
cleaned up.
+          ConcurrentMap<Integer, AppShuffleInfo> appShuffleInfoToBeCleanedUp =
+            Maps.newConcurrentMap();
+          appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
+            if (appShuffleInfo == null || (appShuffleInfo != null

Review comment:
       `appShuffleInfo != null` is redundant in the second part:
   `(appShuffleInfo != null
                 && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)`
    When this part is evaluated, `appShuffleInfo` is always non-null.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1003,14 +1001,15 @@ int getNumIOExceptions() {
     private AppPathsInfo(
         String appId,
         String[] localDirs,
+        String mergeDirectory,
         int subDirsPerLocalDir) {
       activeLocalDirs = Arrays.stream(localDirs)
         .map(localDir ->
           // Merge directory is created at the same level as block-manager 
directory. The list of
           // local directories that we get from executorShuffleInfo are paths 
of each
           // block-manager directory. To find out the merge directory 
location, we first find the
           // parent dir and then append the "merger_manager" directory to it.
-          
Paths.get(localDir).getParent().resolve(MERGE_MANAGER_DIR).toFile().getPath())
+          
Paths.get(localDir).getParent().resolve(mergeDirectory).toFile().getPath())

Review comment:
       Please update the comment above it. It is stale now that the directory 
name comes from `mergeDirectory` instead of `MERGE_MANAGER_DIR`.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1020,6 +1019,81 @@ private AppPathsInfo(
     }
   }
 
+  /** Merged Shuffle related information tracked for a specific application 
attempt */
+  public static class AppShuffleInfo {
+
+    private final String appId;
+    private final int attemptId;
+    private final AppPathsInfo appPathsInfo;
+    private final ConcurrentMap<Integer, Map<Integer, 
AppShufflePartitionInfo>> partitions;
+
+    AppShuffleInfo(
+      String appId,
+      int attemptId,
+      AppPathsInfo appPathsInfo
+    ) {
+      this.appId = appId;
+      this.attemptId = attemptId;
+      this.appPathsInfo = appPathsInfo;
+      partitions = Maps.newConcurrentMap();
+    }
+
+    /**
+     * The logic here is consistent with
+     * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
+     *      org.apache.spark.storage.BlockId, scala.Option)]]
+     */
+    private File getFile(String filename) {
+      // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
+      File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
+        appPathsInfo.subDirsPerLocalDir, filename);
+      logger.debug("Get merged file {}", targetFile.getAbsolutePath());
+      return targetFile;
+    }
+
+    private String generateFileName(
+      String appId,
+      int shuffleId,
+      int reduceId) {
+      return String.format(
+        "%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appId, shuffleId, 
reduceId);
+    }
+
+    public File getMergedShuffleDataFile(
+        int shuffleId,
+        int reduceId) {
+      String fileName = String.format("%s.data", generateFileName(appId, 
shuffleId, reduceId));
+      return getFile(fileName);
+    }
+
+    public File getMergedShuffleIndexFile(
+        int shuffleId,
+        int reduceId) {
+      String indexName = String.format("%s.index", generateFileName(appId, 
shuffleId, reduceId));
+      return getFile(indexName);
+    }
+
+    public File getMergedShuffleMetaFile(
+      int shuffleId,

Review comment:
       nit: indentation




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to