otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r633726943



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -744,10 +771,12 @@ AppShufflePartitionInfo getPartitionInfo() {
    */
   public static class AppShuffleId {
     public final String appId;
+    public final int attemptId;
     public final int shuffleId;
 
-    AppShuffleId(String appId, int shuffleId) {
+    AppShuffleId(String appId, int attemptId, int shuffleId) {

Review comment:
       I don't think we need to change this. Instead, we can maintain another map 
from appId to the latest attempt information. It would be best to keep only the 
latest partition infos in the `partitions` map and remove any entries from 
older attempts.
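
A minimal sketch of the side map suggested above. `LatestAttemptSketch`, 
`latestAttempts`, `registerAttempt` and `isLatest` are hypothetical names used 
only for illustration; none of them exist in the PR. The idea is that 
`AppShuffleId` keeps its two fields and the attempt is tracked separately:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the actual RemoteBlockPushResolver code.
final class LatestAttemptSketch {
  // Assumed side map: appId -> highest attempt id seen so far.
  // AppShuffleId(appId, shuffleId) would stay unchanged under this approach.
  private final ConcurrentHashMap<String, Integer> latestAttempts =
      new ConcurrentHashMap<>();

  /** Records attemptId; returns true if it is the latest attempt for the app. */
  boolean registerAttempt(String appId, int attemptId) {
    // merge() keeps the larger of the stored and the incoming attempt id.
    int latest = latestAttempts.merge(appId, attemptId, Integer::max);
    return latest == attemptId;
  }

  /** Returns true only if attemptId matches the latest known attempt. */
  boolean isLatest(String appId, int attemptId) {
    Integer latest = latestAttempts.get(appId);
    return latest != null && latest == attemptId;
  }

  public static void main(String[] args) {
    LatestAttemptSketch sketch = new LatestAttemptSketch();
    sketch.registerAttempt("app-1", 1);
    sketch.registerAttempt("app-1", 2);
    System.out.println(sketch.isLatest("app-1", 1));
    System.out.println(sketch.isLatest("app-1", 2));
  }
}
```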

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
##########
@@ -30,12 +30,15 @@
  */
 public class FinalizeShuffleMerge extends BlockTransferMessage {
   public final String appId;
+  public final int attemptId;

Review comment:
       `FinalizeShuffleMerge` is sent from the driver to the shuffle services. 
Is it possible for the driver of a previous attempt to still be alive while the 
driver of a new attempt is running? 
   I think that whenever the server receives `FinalizeShuffleMerge`, it should 
always assume it is from the latest attempt.

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -117,7 +116,7 @@ public ShuffleIndexInformation load(File file) throws IOException {
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
       AppShuffleId appShuffleId,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
+    File dataFile = getMergedShuffleDataFile(appShuffleId.appId, appShuffleId.shuffleId, reduceId);

Review comment:
       When the server receives a pushBlock message, there are two cases:
   1. It belongs to a previous attempt: the server needs to ignore the message, 
otherwise it will corrupt the file here.
   2. It belongs to the latest attempt: the server should continue merging.
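
The two cases above could be sketched roughly as follows. 
`PushBlockAttemptCheck`, `Action`, `onAttemptRegistered` and `classify` are 
hypothetical names, and treating an unknown app or a not-yet-registered newer 
attempt as ignorable is an assumption of this sketch, not something the PR 
specifies:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the stale-attempt check for pushed blocks.
final class PushBlockAttemptCheck {
  enum Action { IGNORE_STALE, MERGE }

  // Assumed side map: appId -> latest registered attempt id.
  private final ConcurrentHashMap<String, Integer> latestAttempts =
      new ConcurrentHashMap<>();

  void onAttemptRegistered(String appId, int attemptId) {
    latestAttempts.merge(appId, attemptId, Integer::max);
  }

  /** Decides what to do with a pushed block tagged with (appId, attemptId). */
  Action classify(String appId, int attemptId) {
    Integer latest = latestAttempts.get(appId);
    // Case 2: the block belongs to the latest attempt, so keep merging.
    if (latest != null && latest == attemptId) {
      return Action.MERGE;
    }
    // Case 1 (and, by assumption, any other mismatch): drop the block so it
    // cannot be appended to a merged file owned by a different attempt.
    return Action.IGNORE_STALE;
  }

  public static void main(String[] args) {
    PushBlockAttemptCheck check = new PushBlockAttemptCheck();
    check.onAttemptRegistered("app-1", 2);
    System.out.println(check.classify("app-1", 1));
    System.out.println(check.classify("app-1", 2));
  }
}
```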

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -414,23 +411,53 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc
         Longs.toArray(sizes));
     }
     partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.", msg.shuffleId, msg.appId, msg.attemptId);
     return mergeStatuses;
   }
 
   @Override
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+          executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(":")) {
+      String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        MergeDirectoryMeta mergeDirectoryMeta = mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class);
+        if (mergeDirectoryMeta.attemptId == 0) {
+          // When attemptId is 0, there is no attemptId stored in the ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge dirs
+          appsPathInfo.computeIfAbsent(appId, id ->
+            new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId, executorInfo.localDirs,
+              mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir));
+        } else {
+          // If attemptId is not 0, there is attemptId stored in the ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application attempt will
+          // register the merge dirs in Shuffle Service. Any later ExecutorRegister message
+          // won't override the merge dirs. But it can be overridden by ExecutorRegister
+          // message from new app attempts.
+          appsPathInfo.compute(appId, (id, appAttemptPathsInfo) -> {
+            if (appAttemptPathsInfo != null && mergeDirectoryMeta.attemptId > appAttemptPathsInfo.attemptId) {
+              appAttemptPathsInfo = new AppAttemptPathsInfo(appId, mergeDirectoryMeta.attemptId,
+                executorInfo.localDirs, mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir);
+            }
+            return appAttemptPathsInfo;

Review comment:
       Are we not removing entries from `partitions` here? 
   If the `partitions` map does not hold only the active partition infos, 
saving it to LevelDB becomes more complex. Also, these entries are not removed 
during finalization, so they will stay in memory until the app is removed even 
though they no longer serve any purpose.
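
One way to keep only active entries in `partitions` is to prune them at the 
point where a newer attempt registers. The sketch below assumes a hypothetical 
`PartitionPruningSketch` class with a placeholder `String` value standing in 
for the real partition info. A real implementation would presumably also need 
to close and delete the files behind the removed entries, which is not shown:

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: drop stale partition infos when a new attempt registers.
final class PartitionPruningSketch {
  // Two-field key, as in the code before the change under review.
  static final class AppShuffleId {
    final String appId;
    final int shuffleId;

    AppShuffleId(String appId, int shuffleId) {
      this.appId = appId;
      this.shuffleId = shuffleId;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof AppShuffleId)) return false;
      AppShuffleId other = (AppShuffleId) o;
      return shuffleId == other.shuffleId && appId.equals(other.appId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, shuffleId);
    }
  }

  // Placeholder value type; the real map holds partition info objects.
  final ConcurrentHashMap<AppShuffleId, String> partitions = new ConcurrentHashMap<>();
  // Assumed side map: appId -> latest attempt id.
  private final ConcurrentHashMap<String, Integer> latestAttempts =
      new ConcurrentHashMap<>();

  void registerAttempt(String appId, int attemptId) {
    latestAttempts.compute(appId, (id, latest) -> {
      if (latest != null && attemptId <= latest) {
        return latest; // same or older attempt: nothing to prune
      }
      // Newer attempt: entries left over from earlier attempts are now stale.
      partitions.keySet().removeIf(key -> key.appId.equals(appId));
      return attemptId;
    });
  }
}
```

With this shape, whatever gets persisted only ever reflects the latest attempt 
of each app.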

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -991,22 +1023,26 @@ int getNumIOExceptions() {
   /**
    * Wraps all the information related to the merge directory of an application.
    */
-  private static class AppPathsInfo {
+  private static class AppAttemptPathsInfo {
 
+    private final int attemptId;
     private final String[] activeLocalDirs;
     private final int subDirsPerLocalDir;
 
-    private AppPathsInfo(
+    private AppAttemptPathsInfo(
         String appId,
+        int attemptId,

Review comment:
       We can avoid this by maintaining another map that tracks the latest 
attempt of each app. Or, if you want to include `attemptId` in this class, then 
maybe rename the class to `AppMergedMeta` or similar and update the 
description.



