mridulm commented on a change in pull request #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r836934153



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      AppShufflePartitionId id,
+      String prefix) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+    String key = prefix + ";" + appAttemptShufflePartitionJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppShufflePartitionId 
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      String s,
+      String prefix) throws IOException {
+    if (!s.startsWith(prefix)) {

Review comment:
       nit: check for prefix followed by ';' ?
   Will also prevent a potential `StringIndexOutOfBoundsException` in `substring`
below.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      AppShufflePartitionId id,
+      String prefix) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+    String key = prefix + ";" + appAttemptShufflePartitionJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppShufflePartitionId 
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      String s,
+      String prefix) throws IOException {
+    if (!s.startsWith(prefix)) {
+      throw new IllegalArgumentException("expected a string starting with " + 
prefix);
+    }
+    String json = s.substring(prefix.length() + 1);
+    return mapper.readValue(json, AppShufflePartitionId.class);
+  }
+
+  private static byte[] dbAppAttemptPathsKey(String appId, int attemptId) 
throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptIdJson = mapper.writeValueAsString(new 
AppAttemptId(appId, attemptId));
+    String key = APP_ATTEMPT_PATH_KEY_PREFIX + ";" + appAttemptIdJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppAttemptId parseDbAppAttemptPathsKey(String s) throws 
IOException {
+    if (!s.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+      throw new IllegalArgumentException("expected a string starting with "
+        + APP_ATTEMPT_PATH_KEY_PREFIX);
+    }
+    String json = s.substring(APP_ATTEMPT_PATH_KEY_PREFIX.length() + 1);
+    return mapper.readValue(json, AppAttemptId.class);
+  }
+
+  public void reloadAppShuffleInfo(DB db) throws IOException {
+    logger.debug("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleInfo(db);
+    reloadActiveAppAttemptsShufflePartitionInfo(db);

Review comment:
       Only `reloadAppShuffleInfo` is used for testing, not the others.
   Make `reloadAppShuffleInfo` package private with `@VisibleForTesting` and
make the other methods `private` ?
   If we need to open these up in future for testing, we can do so at that time.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      AppShufflePartitionId id,
+      String prefix) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+    String key = prefix + ";" + appAttemptShufflePartitionJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppShufflePartitionId 
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      String s,
+      String prefix) throws IOException {
+    if (!s.startsWith(prefix)) {
+      throw new IllegalArgumentException("expected a string starting with " + 
prefix);
+    }
+    String json = s.substring(prefix.length() + 1);
+    return mapper.readValue(json, AppShufflePartitionId.class);
+  }
+
+  private static byte[] dbAppAttemptPathsKey(String appId, int attemptId) 
throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptIdJson = mapper.writeValueAsString(new 
AppAttemptId(appId, attemptId));
+    String key = APP_ATTEMPT_PATH_KEY_PREFIX + ";" + appAttemptIdJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppAttemptId parseDbAppAttemptPathsKey(String s) throws 
IOException {
+    if (!s.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+      throw new IllegalArgumentException("expected a string starting with "
+        + APP_ATTEMPT_PATH_KEY_PREFIX);
+    }
+    String json = s.substring(APP_ATTEMPT_PATH_KEY_PREFIX.length() + 1);
+    return mapper.readValue(json, AppAttemptId.class);
+  }
+
+  public void reloadAppShuffleInfo(DB db) throws IOException {
+    logger.debug("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleInfo(db);
+    reloadActiveAppAttemptsShufflePartitionInfo(db);
+  }
+
+  @VisibleForTesting
+  public void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        try{
+          AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), 
AppPathsInfo.class);
+          logger.debug("Reloading active application {}_{} merged shuffle 
files paths",
+            appAttemptId.appId, appAttemptId.attemptId);
+          appsShuffleInfo.computeIfAbsent(appAttemptId.appId, id ->
+            new AppShuffleInfo(appAttemptId.appId, appAttemptId.attemptId, 
appPathsInfo));
+        } catch (Exception exception) {
+          logger.error("Parsing exception is {}", exception);
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public void reloadFinalizedAppAttemptsShuffleInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
+          break;
+        }
+        AppShufflePartitionId partitionId =
+          parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+            key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+        if (partitionId.reduceId != -1) {

Review comment:
       Why would `reduceId` not be -1 for
`APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX` ?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -536,9 +645,20 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
         }
       }
       // Even when the mergePartitionsInfo is null, we mark the shuffle as 
finalized but the results
-      // sent to the driver will be empty. This cam happen when the service 
didn't receive any
+      // sent to the driver will be empty. This can happen when the service 
didn't receive any
       // blocks for the shuffle yet and the driver didn't wait for enough time 
to finalize the
       // shuffle.
+      if (db != null) {

Review comment:
       Do this only for the last `else` block above.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -342,6 +404,30 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    cleanUpAppShuffleInfoInDB(appShuffleInfo);
+  }
+
+  void cleanUpAppShuffleInfoInDB(AppShuffleInfo appShuffleInfo) {

Review comment:
       nit: limit visibility to private if we don't need to expose it for
testing/functionality.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -342,6 +404,30 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    cleanUpAppShuffleInfoInDB(appShuffleInfo);
+  }
+
+  void cleanUpAppShuffleInfoInDB(AppShuffleInfo appShuffleInfo) {
+    if (db != null) {
+      try {
+        db.delete(dbAppAttemptPathsKey(appShuffleInfo.appId, 
appShuffleInfo.attemptId));
+        appShuffleInfo.shuffles
+          .forEach((shuffleId, shuffleInfo) -> 
shuffleInfo.shuffleMergePartitions
+            .forEach((shuffleMergeId, partitionInfo) -> {
+              synchronized (partitionInfo) {
+                if (shuffleInfo.isFinalized()) {
+                  cleanUpAppShufflePartitionInfoInDB(new AppShufflePartitionId(
+                    appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId,
+                      shuffleMergeId, -1));
+                }

Review comment:
       nit: this `if isFinalized` can be moved into 
`closeAndDeletePartitionFilesIfNeeded` and we can remove 
`cleanUpAppShuffleInfoInDB` ?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      AppShufflePartitionId id,
+      String prefix) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+    String key = prefix + ";" + appAttemptShufflePartitionJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppShufflePartitionId 
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      String s,

Review comment:
       nit: `s` -> `key` ?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -261,7 +321,8 @@ public MergedBlockMeta getMergedBlockMeta(
     int size = (int) indexFile.length();
     // First entry is the zero offset
     int numChunks = (size / Long.BYTES) - 1;
-    File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, 
shuffleMergeId, reduceId);
+    File metaFile =
+      appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, 
reduceId);

Review comment:
       revert this and other whitespace changes to minimize diff ?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -617,8 +738,8 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
           appsShuffleInfo.computeIfAbsent(appId, id ->
             new AppShuffleInfo(
               appId, UNDEFINED_ATTEMPT_ID,
-              new AppPathsInfo(appId, executorInfo.localDirs,
-                mergeDir, executorInfo.subDirsPerLocalDir)
+              new AppPathsInfo(appId, attemptId, executorInfo.localDirs,
+                mergeDir, executorInfo.subDirsPerLocalDir, db)

Review comment:
       Move the db update out of the constructor, and do it explicitly ?

##########
File path: 
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
##########
@@ -120,6 +120,7 @@
 
   private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
   private static final String SECRETS_RECOVERY_FILE_NAME = 
"sparkShuffleRecovery.ldb";
+  private static final String MERGE_MANAGER_FILE_NAME = "mergeManager.ldb";

Review comment:
       nit: rename to `sparkShuffleMergeRecovery` or some such to be more clear 
about the intent of the file ?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      AppShufflePartitionId id,
+      String prefix) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+    String key = prefix + ";" + appAttemptShufflePartitionJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppShufflePartitionId 
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      String s,
+      String prefix) throws IOException {
+    if (!s.startsWith(prefix)) {
+      throw new IllegalArgumentException("expected a string starting with " + 
prefix);
+    }
+    String json = s.substring(prefix.length() + 1);
+    return mapper.readValue(json, AppShufflePartitionId.class);
+  }
+
+  private static byte[] dbAppAttemptPathsKey(String appId, int attemptId) 
throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptIdJson = mapper.writeValueAsString(new 
AppAttemptId(appId, attemptId));
+    String key = APP_ATTEMPT_PATH_KEY_PREFIX + ";" + appAttemptIdJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppAttemptId parseDbAppAttemptPathsKey(String s) throws 
IOException {
+    if (!s.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+      throw new IllegalArgumentException("expected a string starting with "
+        + APP_ATTEMPT_PATH_KEY_PREFIX);
+    }
+    String json = s.substring(APP_ATTEMPT_PATH_KEY_PREFIX.length() + 1);
+    return mapper.readValue(json, AppAttemptId.class);
+  }
+
+  public void reloadAppShuffleInfo(DB db) throws IOException {
+    logger.debug("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleInfo(db);
+    reloadActiveAppAttemptsShufflePartitionInfo(db);
+  }
+
+  @VisibleForTesting
+  public void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        try{
+          AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), 
AppPathsInfo.class);
+          logger.debug("Reloading active application {}_{} merged shuffle 
files paths",
+            appAttemptId.appId, appAttemptId.attemptId);
+          appsShuffleInfo.computeIfAbsent(appAttemptId.appId, id ->
+            new AppShuffleInfo(appAttemptId.appId, appAttemptId.attemptId, 
appPathsInfo));

Review comment:
       Instead of `computeIfAbsent`, use `compute` and ensure that the newer
attempt id gets added (in case an earlier deletion failed, etc.).
   
   Same for `reloadFinalizedAppAttemptsShuffleInfo` below as well (w.r.t 
`shuffleMergeId`: there can be a race between `closeAndDeletePartitionFiles`, 
`finalizeShuffleMerge` and NM shutdown).
   
   I am assuming `reloadActiveAppAttemptsShufflePartitionInfo`, 
`reloadPartitionInfo`, etc will get deleted if we are only tracking finalized 
shuffles.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,8 +252,23 @@ private AppShufflePartitionInfo 
getOrCreateAppShufflePartitionInfo(
       File metaFile =
         appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, 
reduceId);
       try {
-        return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId, 
shuffleMergeId,
-          reduceId, dataFile, indexFile, metaFile);
+        AppShufflePartitionInfo appShufflePartitionInfo =
+            newAppShufflePartitionInfo(appShuffleInfo.appId, 
appShuffleInfo.attemptId,
+            shuffleId, shuffleMergeId, reduceId, dataFile, indexFile, 
metaFile);
+        if (db != null) {

Review comment:
       Why not track only those which have been successfully finalized ?
   That is, we don't track in-progress pushes ... this should simplify the code,
remove all changes to `AppShufflePartitionInfo`, remove
`APP_ATTEMPT_SHUFFLE_PARTITION_KEY_PREFIX`, recovery, etc.
   
   (Some of my other comments in this PR might not be relevant once this is 
done btw).

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,8 +252,23 @@ private AppShufflePartitionInfo 
getOrCreateAppShufflePartitionInfo(
       File metaFile =
         appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, 
reduceId);
       try {
-        return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId, 
shuffleMergeId,
-          reduceId, dataFile, indexFile, metaFile);
+        AppShufflePartitionInfo appShufflePartitionInfo =
+            newAppShufflePartitionInfo(appShuffleInfo.appId, 
appShuffleInfo.attemptId,
+            shuffleId, shuffleMergeId, reduceId, dataFile, indexFile, 
metaFile);
+        if (db != null) {
+          try{
+            byte[] dbKey =
+                dbAppAttemptShufflePartitionKeyWithCustomPrefix(

Review comment:
       nit: Add `getShufflePartitionDbKey` to be more clear ? (which delegates 
to `dbAppAttemptShufflePartitionKeyWithCustomPrefix` if required). The 
read/parse logic seems to be doing it with `parseDbApp*` methods already.
   
   Same for other uses of `dbAppAttemptShufflePartitionKeyWithCustomPrefix` as 
well.
   Localize all string/serde manipulations to a small set of methods
   

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      AppShufflePartitionId id,
+      String prefix) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+    String key = prefix + ";" + appAttemptShufflePartitionJson;

Review comment:
       nit: pull the `;` as a constant

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,8 +252,23 @@ private AppShufflePartitionInfo 
getOrCreateAppShufflePartitionInfo(
       File metaFile =
         appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, 
reduceId);
       try {
-        return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId, 
shuffleMergeId,
-          reduceId, dataFile, indexFile, metaFile);
+        AppShufflePartitionInfo appShufflePartitionInfo =
+            newAppShufflePartitionInfo(appShuffleInfo.appId, 
appShuffleInfo.attemptId,
+            shuffleId, shuffleMergeId, reduceId, dataFile, indexFile, 
metaFile);
+        if (db != null) {
+          try{
+            byte[] dbKey =
+                dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+                  appShufflePartitionInfo.appShufflePartitionId,
+                  APP_ATTEMPT_SHUFFLE_PARTITION_KEY_PREFIX);
+            // We can reload a partition's state solely from the partition ID. 
Thus only the
+            // partition Ids are stored in levelDB.
+            db.put(dbKey, new byte[0]);
+          } catch (Exception e) {
+            logger.error("Error saving active app shuffle partition", e);
+          }
+        }

Review comment:
       We have a bunch of 
   ```
   if (db != null) { 
     try {
       byte[] key = generate key
       db.put(key, value)
     } catch (Exception ex) {
       log message
     }
   }
   ```
   sprinkled across this class.
   
   Refactor these and move them to util methods ? So that the main flow is 
clearer ?
   
   ```suggestion
     writeShufflePartitionToDb(appShufflePartitionInfo.appShufflePartitionId)
   ```

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -355,8 +441,31 @@ void closeAndDeletePartitionFiles(Map<Integer, 
AppShufflePartitionInfo> partitio
       .forEach((partitionId, partitionInfo) -> {
         synchronized (partitionInfo) {
           partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+          
cleanUpAppShufflePartitionInfoInDB(partitionInfo.appShufflePartitionId);
         }
       });
+
+  }
+
+  void cleanUpAppShufflePartitionInfoInDB(AppShufflePartitionId 
appShufflePartitionId) {
+    if (db != null) {
+      try {
+        if (appShufflePartitionId.reduceId == -1) {
+          db.delete(dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+            appShufflePartitionId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX));
+        } else {
+          db.delete(dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+            appShufflePartitionId, APP_ATTEMPT_SHUFFLE_PARTITION_KEY_PREFIX));
+        }
+      } catch (IOException e) {
+        logger.error("Error deleting {}_{} shuffleMergeId {} shuffle {}"
+                + "reduce {} from application shuffle merged partition info 
db",
+            appShufflePartitionId.appId, appShufflePartitionId.attemptId,
+            appShufflePartitionId.shuffleMergeId,
+            appShufflePartitionId.shuffleId,

Review comment:
       `shuffleId` before `shuffleMergeId` (here and in `updateChunkInfo` - not 
sure if elsewhere as well).
   Also, use `toString` in `AppShufflePartitionId` instead (here and elsewhere 
we are logging) ?

##########
File path: 
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
##########
@@ -296,7 +307,8 @@ static MergedShuffleFileManager 
newMergedShuffleFileManagerInstance(TransportCon
         mergeManagerImplClazz.asSubclass(MergedShuffleFileManager.class);
       // The assumption is that all the custom implementations just like the 
RemoteBlockPushResolver
       // will also need the transport configuration.
-      return 
mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
+      return mergeManagerSubClazz.getConstructor(TransportConf.class, 
File.class)
+        .newInstance(conf, mergeManagerFile);

Review comment:
       Modify `NoOpMergedShuffleFileManager` to also support this constructor - 
check other implementations as well if relevant.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -655,6 +776,238 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private static byte[] dbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      AppShufflePartitionId id,
+      String prefix) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptShufflePartitionJson = mapper.writeValueAsString(id);
+    String key = prefix + ";" + appAttemptShufflePartitionJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppShufflePartitionId 
parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+      String s,
+      String prefix) throws IOException {
+    if (!s.startsWith(prefix)) {
+      throw new IllegalArgumentException("expected a string starting with " + 
prefix);
+    }
+    String json = s.substring(prefix.length() + 1);
+    return mapper.readValue(json, AppShufflePartitionId.class);
+  }
+
+  private static byte[] dbAppAttemptPathsKey(String appId, int attemptId) 
throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String appAttemptIdJson = mapper.writeValueAsString(new 
AppAttemptId(appId, attemptId));
+    String key = APP_ATTEMPT_PATH_KEY_PREFIX + ";" + appAttemptIdJson;
+    return key.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private static AppAttemptId parseDbAppAttemptPathsKey(String s) throws 
IOException {
+    if (!s.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+      throw new IllegalArgumentException("expected a string starting with "
+        + APP_ATTEMPT_PATH_KEY_PREFIX);
+    }
+    String json = s.substring(APP_ATTEMPT_PATH_KEY_PREFIX.length() + 1);
+    return mapper.readValue(json, AppAttemptId.class);
+  }
+
+  public void reloadAppShuffleInfo(DB db) throws IOException {
+    logger.debug("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleInfo(db);
+    reloadActiveAppAttemptsShufflePartitionInfo(db);
+  }
+
+  @VisibleForTesting
+  public void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        try{
+          AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), 
AppPathsInfo.class);
+          logger.debug("Reloading active application {}_{} merged shuffle 
files paths",
+            appAttemptId.appId, appAttemptId.attemptId);
+          appsShuffleInfo.computeIfAbsent(appAttemptId.appId, id ->
+            new AppShuffleInfo(appAttemptId.appId, appAttemptId.attemptId, 
appPathsInfo));
+        } catch (Exception exception) {
+          logger.error("Parsing exception is {}", exception);
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public void reloadFinalizedAppAttemptsShuffleInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
+          break;
+        }
+        AppShufflePartitionId partitionId =
+          parseDbAppAttemptShufflePartitionKeyWithCustomPrefix(
+            key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+        if (partitionId.reduceId != -1) {
+          break;
+        }
+        AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
+        if (appShuffleInfo != null) {
+          appShuffleInfo.shuffles.putIfAbsent(partitionId.shuffleId,
+            new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, 
true));

Review comment:
       See comment in `reloadActiveAppAttemptsPathInfo` above.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1172,47 +1634,58 @@ private void finalizePartition() throws IOException {
       // Get rid of any partial block data at the end of the file. This could 
either
       // be due to failure, or a request still being processed when the shuffle
       // merge gets finalized, or any exceptions while updating index/meta 
files.
-      dataChannel.truncate(lastChunkOffset);
+      logger.trace("{}_{} shuffleId {} shuffleMergeId {} reduceId {} "
+         + "truncating files data {} index {} meta {}", 
appShufflePartitionId.appId,
+          appShufflePartitionId.attemptId, appShufflePartitionId.shuffleId,
+          appShufflePartitionId.shuffleMergeId, appShufflePartitionId.reduceId,
+          lastChunkOffset, indexFile.getPos(), metaFile.getPos());
+      dataFile.getChannel().truncate(lastChunkOffset);
       indexFile.getChannel().truncate(indexFile.getPos());
       metaFile.getChannel().truncate(metaFile.getPos());
     }
 
     void closeAllFilesAndDeleteIfNeeded(boolean delete) {
       try {
-        if (dataChannel.isOpen()) {
-          dataChannel.close();
-          if (delete) {
-            dataFile.delete();
-          }
+        dataFile.close();
+        if (delete) {
+          dataFile.delete();

Review comment:
       There is a change in behavior here - we are deleting even if the file was
not open.
   If we are simplifying to only tracking finalized merges, we remove the need
for this change as well and preserve earlier behavior.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to