mridulm commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r659421677



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -112,34 +116,48 @@ public ShuffleIndexInformation load(File file) throws IOException {
     this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
   }
 
+  private AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
+    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
+    AppShuffleInfo appShuffleInfo =
+      Preconditions.checkNotNull(appsShuffleInfo.get(appId),
+        "application " + appId + " is not registered or NM was restarted.");
+    return appShuffleInfo;
+  }
+
   /**
    * Given the appShuffleId and reduceId that uniquely identifies a given shuffle partition of an
    * application, retrieves the associated metadata. If not present and the corresponding merged
    * shuffle does not exist, initializes the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      AppShuffleInfo appShuffleInfo,
+      int shuffleId,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
-    if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
+    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId);
+    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
+      appShuffleInfo.partitions;
+    if (!partitions.containsKey(shuffleId) && dataFile.exists()) {
       // If this partition is already finalized then the partitions map will not contain
       // the appShuffleId but the data file would exist. In that case the block is considered late.
       return null;
     }

Review comment:
       nit: Do we want to combine the check and the creation in a thread-safe way?
   
   ```
       Map<Integer, AppShufflePartitionInfo> shufflePartitions =
               partitions.compute(shuffleId, (id, map) -> {
                 if (null == map) {
                   // If this partition is already finalized then the partitions map
                   // will not contain the shuffleId but the data file would exist.
                   // In that case the block is considered late.
                   if (dataFile.exists()) {
                     return null;
                   }
                   return Maps.newConcurrentMap();
                 } else {
                   return map;
                 }
               });
   
       if (null == shufflePartitions) {
         return null;
       }
   ```
   
   In my opinion, this does not buy us much, but I wanted to suggest it anyway, since it is a stricter check.
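
   To illustrate the suggestion above, here is a minimal, self-contained sketch of the atomic check-and-create using `ConcurrentHashMap.compute`. It is an illustration under stated assumptions, not the resolver's code: `PartitionLookupSketch`, `getOrCreate`, and the `BooleanSupplier` standing in for `dataFile.exists()` are hypothetical, `String` replaces `AppShufflePartitionInfo`, and the JDK `ConcurrentHashMap` is used in place of Guava's `Maps.newConcurrentMap()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the resolver; not part of the PR.
class PartitionLookupSketch {

  // Returns the per-shuffle partition map, creating it atomically if needed.
  // Returns null when the shuffle looks finalized: no entry in the map, but the
  // merged data file (simulated by dataFileExists) is already on disk.
  static Map<Integer, String> getOrCreate(
      ConcurrentMap<Integer, Map<Integer, String>> partitions,
      int shuffleId,
      BooleanSupplier dataFileExists) {
    return partitions.compute(shuffleId, (id, map) -> {
      if (map != null) {
        return map; // shuffle is still being merged; reuse the existing map
      }
      if (dataFileExists.getAsBoolean()) {
        return null; // returning null from compute leaves the key absent
      }
      return new ConcurrentHashMap<Integer, String>();
    });
  }

  public static void main(String[] args) {
    ConcurrentMap<Integer, Map<Integer, String>> partitions = new ConcurrentHashMap<>();
    // Shuffle 1: data file already exists, so the block is treated as late.
    System.out.println(getOrCreate(partitions, 1, () -> true));
    // Shuffle 2: no data file yet, so an empty partition map is created.
    System.out.println(getOrCreate(partitions, 2, () -> false));
  }
}
```

   Because `compute` applies the function atomically for the key, two concurrent pushes for the same shuffle cannot both see a missing entry and create separate inner maps. The trade-off is that the file existence check runs inside the atomic update, which is one reason the gain may be small.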

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -403,38 +394,78 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId,
-              msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
+              msg.attemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
             partition.closeAllFiles();
-            // The partition should be removed after the files are written so that any new stream
-            // for the same reduce partition will see that the data file exists.
-            partitionsIter.remove();
           }
         }
       }
       mergeStatuses = new MergeStatuses(msg.shuffleId,
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta =
+          mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) {
+          // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              appId, mergeDirectoryMeta.attemptId,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application attempt will register
+          // the merge dirs in External Shuffle Service. Any later ExecutorRegister message
+          // from the same application attempt will not override the merge dirs. But it can
+          // be overridden by ExecutorRegister message from newer application attempt,
+          // and former attempts' shuffle partitions information will also be cleaned up.
+          ConcurrentMap<Integer, AppShuffleInfo> appShuffleInfoToBeCleanedUp =
+            Maps.newConcurrentMap();
+          appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
+            if (appShuffleInfo == null || (appShuffleInfo != null
+              && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)) {
+              appShuffleInfoToBeCleanedUp.putIfAbsent(appShuffleInfo.attemptId, appShuffleInfo);

Review comment:
       Call `putIfAbsent` only if `appShuffleInfo != null`? When `appShuffleInfo` is null, `appShuffleInfo.attemptId` is dereferenced, so this should have caused an NPE with multiple attempts.
   We need a test for multiple attempts.
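
   One possible shape for the null guard and a multiple-attempt check is sketched below. Everything here is a simplified assumption for illustration, not the PR's code: `AttemptRegistrySketch` and `register` are hypothetical, and the nested `AppShuffleInfo` models only the attempt id (no merge dirs or partitions).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified model of attempt registration; not part of the PR.
class AttemptRegistrySketch {

  // Stand-in for the real AppShuffleInfo: only the attempt id is modeled.
  static final class AppShuffleInfo {
    final int attemptId;

    AppShuffleInfo(int attemptId) {
      this.attemptId = attemptId;
    }
  }

  final ConcurrentMap<String, AppShuffleInfo> appsShuffleInfo = new ConcurrentHashMap<>();

  // Registers an attempt and returns the superseded attempt (if any) for cleanup.
  List<AppShuffleInfo> register(String appId, int attemptId) {
    List<AppShuffleInfo> toBeCleanedUp = new ArrayList<>();
    appsShuffleInfo.compute(appId, (id, current) -> {
      if (current == null) {
        // First registration: nothing to clean up, and nothing to dereference.
        return new AppShuffleInfo(attemptId);
      }
      if (attemptId > current.attemptId) {
        // A newer attempt replaces the older one, which is queued for cleanup.
        toBeCleanedUp.add(current);
        return new AppShuffleInfo(attemptId);
      }
      // Same or older attempt: keep the existing registration.
      return current;
    });
    return toBeCleanedUp;
  }

  public static void main(String[] args) {
    AttemptRegistrySketch registry = new AttemptRegistrySketch();
    System.out.println(registry.register("app-1", 1).size());
    System.out.println(registry.register("app-1", 2).size());
  }
}
```

   A multiple-attempt test along these lines would register attempt 1, then attempt 2, then a stale attempt 1 again, and check both the surviving attempt id and what was queued for cleanup at each step.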
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


