mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r895282269


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -342,6 +380,33 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    if (removeFromDb){
+      removeAppShuffleInfoFromDB(appShuffleInfo);
+    }
+  }
+
+  private void removeAppAttemptPathInfoFromDB(AppAttemptId appAttemptId) {
+    if (db != null) {
+      try {
+        db.delete(getDbAppAttemptPathsKey(appAttemptId));
+      } catch (Exception e) {
+        logger.error("Error deleting {} from application paths info in DB", 
appAttemptId, e);
+      }
+    }
+  }
+
+  private void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) {
+    if (db != null) {
+      appShuffleInfo.shuffles
+        .forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions
+          .forEach((shuffleMergeId, partitionInfo) -> {
+            synchronized (partitionInfo) {
+              removeAppShufflePartitionInfoFromDB(
+                new AppAttemptShuffleMergeId(
+                  appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, 
shuffleMergeId));
+            }

Review Comment:
   QQ: Why does this have to be within the `synchronized` block?
   
   Currently, deletes to `appAttemptShufflePartition` happen within the sync block,
   while additions do not.
   Is there any particular reason for the delete to be within the sync lock?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -655,6 +742,206 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+            new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+
+  }
+
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) {
+    try {
+      String json = key.substring(prefix.length() + 1);
+      return mapper.readValue(json, valueType);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB key {}", key);
+      return null;
+    }
+  }
+
+  private AppPathsInfo parseDBAppAttemptPathsValue(byte[] value, AppAttemptId 
appAttemptId) {
+    try {
+      return mapper.readValue(value, AppPathsInfo.class);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB value for {}", 
appAttemptId);
+      return null;
+    }
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) {
+    return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) {
+    // We add a common prefix on all the keys so we can find them in the DB
+    try {
+      String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+      return keyJsonString.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      logger.error("Exception while generating the DB key {}", key);
+      return null;
+    }
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId){
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> dbKeysToBeRemoved = reloadActiveAppAttemptsPathInfo(db);
+    dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    // Clean up invalid data stored in DB
+    submitCleanupTask(() ->
+        dbKeysToBeRemoved.forEach(
+            (key) -> {
+              try {
+                db.delete(key);
+              } catch (Exception e) {
+                logger.error("Error deleting data in DB", e);
+              }
+            }
+        )
+    );
+  }
+
+  private List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        if (appAttemptId == null) {
+          dbKeysToBeRemoved.add(e.getKey());
+          break;
+        }
+        AppPathsInfo appPathsInfo = parseDBAppAttemptPathsValue(e.getValue(), 
appAttemptId);
+        if (appPathsInfo == null) {

Review Comment:
   Same here: how would this happen?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -655,6 +742,206 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+            new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+
+  }
+
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) {
+    try {
+      String json = key.substring(prefix.length() + 1);
+      return mapper.readValue(json, valueType);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB key {}", key);
+      return null;
+    }
+  }
+
+  private AppPathsInfo parseDBAppAttemptPathsValue(byte[] value, AppAttemptId 
appAttemptId) {
+    try {
+      return mapper.readValue(value, AppPathsInfo.class);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB value for {}", 
appAttemptId);
+      return null;
+    }
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) {
+    return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) {
+    // We add a common prefix on all the keys so we can find them in the DB
+    try {
+      String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+      return keyJsonString.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      logger.error("Exception while generating the DB key {}", key);
+      return null;
+    }
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId){
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> dbKeysToBeRemoved = reloadActiveAppAttemptsPathInfo(db);
+    dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    // Clean up invalid data stored in DB
+    submitCleanupTask(() ->
+        dbKeysToBeRemoved.forEach(
+            (key) -> {
+              try {
+                db.delete(key);
+              } catch (Exception e) {
+                logger.error("Error deleting data in DB", e);
+              }
+            }
+        )
+    );
+  }
+
+  private List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        if (appAttemptId == null) {
+          dbKeysToBeRemoved.add(e.getKey());
+          break;
+        }
+        AppPathsInfo appPathsInfo = parseDBAppAttemptPathsValue(e.getValue(), 
appAttemptId);
+        if (appPathsInfo == null) {
+          dbKeysToBeRemoved.add(e.getKey());
+          break;
+        }
+        appsShuffleInfo.compute(appAttemptId.appId,
+            (appId, existingAppShuffleInfo) -> {
+              if (existingAppShuffleInfo == null ||
+                  existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {
+                if (existingAppShuffleInfo != null) {
+                  dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(
+                      new AppAttemptId(
+                          existingAppShuffleInfo.appId, 
existingAppShuffleInfo.attemptId)));
+                }
+                return new AppShuffleInfo(
+                    appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
+              } else {
+                dbKeysToBeRemoved.add(e.getKey());
+                return existingAppShuffleInfo;
+              }
+            });
+      }
+    }
+    return dbKeysToBeRemoved;

Review Comment:
   Assuming my comments above are valid, `dbKeysToBeRemoved` would always be
   empty, and nothing would need to be removed.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -992,6 +1281,42 @@ AppShufflePartitionInfo getPartitionInfo() {
     }
   }
 
+  /**
+   * Simply encodes an application attempt ID.
+   */
+  public static class AppAttemptId {
+    public final String appId;
+    public final int attemptId;
+
+    @JsonCreator
+    public AppAttemptId(
+        @JsonProperty("appId") String appId,
+        @JsonProperty("attemptId") int attemptId) {
+      this.appId = appId;
+      this.attemptId = attemptId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppAttemptId appAttemptId = (AppAttemptId) o;
+      return Objects.equals(attemptId, appAttemptId.attemptId) &&
+          Objects.equals(appId, appAttemptId.appId);

Review Comment:
   ```suggestion
         return this.attemptId == appAttemptId.attemptId &&
             Objects.equals(appId, appAttemptId.appId);
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -317,22 +353,24 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, 
cleanupLocalDirs);
     AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
     if (null != appShuffleInfo) {
-      mergedShuffleCleaner.execute(
-        () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs));
+      submitCleanupTask(
+        () -> closeAndDeletePartitions(appShuffleInfo, cleanupLocalDirs, 
true));
     }
+    removeAppAttemptPathInfoFromDB(
+        new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId));

Review Comment:
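   We are relying on the DB being consistent with the `appsShuffleInfo` map.
   Given this, we should make the removal of the DB entry atomic w.r.t.
   `appsShuffleInfo.remove`. (As written, `appShuffleInfo` is also dereferenced
   outside the `null != appShuffleInfo` check, so removing an unknown `appId` would
   throw a `NullPointerException`; doing the DB removal inside `compute` avoids that too.)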
   
   Something like:
   ```
   AtomicReference<AppShuffleInfo> ref = new AtomicReference<>(null);
   appsShuffleInfo.compute( appId, (id, info) -> {
       if (null != info) {
         removeAppAttemptPathInfoFromDB(new AppAttemptId(info.appId, 
info.attemptId));
         ref.set(info);
       }
       // always remove
       return null;
     } );
   AppShuffleInfo appShuffleInfo = ref.get
   ```
   



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -655,6 +742,206 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+            new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+
+  }
+
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) {
+    try {
+      String json = key.substring(prefix.length() + 1);
+      return mapper.readValue(json, valueType);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB key {}", key);
+      return null;
+    }
+  }
+
+  private AppPathsInfo parseDBAppAttemptPathsValue(byte[] value, AppAttemptId 
appAttemptId) {
+    try {
+      return mapper.readValue(value, AppPathsInfo.class);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB value for {}", 
appAttemptId);
+      return null;
+    }
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) {
+    return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) {
+    // We add a common prefix on all the keys so we can find them in the DB
+    try {
+      String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+      return keyJsonString.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      logger.error("Exception while generating the DB key {}", key);
+      return null;
+    }
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId){
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> dbKeysToBeRemoved = reloadActiveAppAttemptsPathInfo(db);
+    dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    // Clean up invalid data stored in DB
+    submitCleanupTask(() ->
+        dbKeysToBeRemoved.forEach(
+            (key) -> {
+              try {
+                db.delete(key);
+              } catch (Exception e) {
+                logger.error("Error deleting data in DB", e);
+              }
+            }
+        )
+    );
+  }
+
+  private List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        if (appAttemptId == null) {
+          dbKeysToBeRemoved.add(e.getKey());
+          break;
+        }
+        AppPathsInfo appPathsInfo = parseDBAppAttemptPathsValue(e.getValue(), 
appAttemptId);
+        if (appPathsInfo == null) {
+          dbKeysToBeRemoved.add(e.getKey());
+          break;
+        }
+        appsShuffleInfo.compute(appAttemptId.appId,
+            (appId, existingAppShuffleInfo) -> {
+              if (existingAppShuffleInfo == null ||
+                  existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {

Review Comment:
   With the changes proposed in this review, the DB will be kept consistent
   w.r.t. `appsShuffleInfo`, so we won't hit this case.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -350,15 +415,27 @@ void closeAndDeletePartitionFilesIfNeeded(
    * up older shuffleMergeId partitions. The cleanup will be executed in a 
separate thread.
    */
   @VisibleForTesting
-  void closeAndDeletePartitionFiles(Map<Integer, AppShufflePartitionInfo> 
partitions) {
+  void closeAndDeleteOutdatedPartitions(Map<Integer, AppShufflePartitionInfo> 
partitions) {
     partitions
       .forEach((partitionId, partitionInfo) -> {
         synchronized (partitionInfo) {
           partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+          
removeAppShufflePartitionInfoFromDB(partitionInfo.appAttemptShuffleMergeId);

Review Comment:
   Same as my comment in `removeAppShuffleInfoFromDB`: does this need to be
   within the sync block?
   
   Also, there can be a large number of `partitionInfo`s for a given shuffle
   merge id, so we are repeatedly trying to delete the same shuffle merge id here.
   
   Assuming we don't need the delete to be in the sync block, we can collect all
   the shuffle merge ids in `partitions` and delete each of them once.
   
   For example:
   
   ```
   Set<AppAttemptShuffleMergeId> mergeIdsToRemove = new HashSet<>();
   
   partitions.forEach ... {
     mergeIdsToRemove.add(partitionInfo.appAttemptShuffleMergeId);
     synchronized (partitionInfo) {
       partitionInfo.closeAllFilesAndDeleteIfNeeded(true)
     }
   }
   
   mergeIdsToRemove.forEach(this::removeAppShufflePartitionInfoFromDB);
   ```
   
   
   



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -655,6 +742,206 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+            new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+
+  }
+
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) {
+    try {
+      String json = key.substring(prefix.length() + 1);
+      return mapper.readValue(json, valueType);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB key {}", key);
+      return null;
+    }
+  }
+
+  private AppPathsInfo parseDBAppAttemptPathsValue(byte[] value, AppAttemptId 
appAttemptId) {
+    try {
+      return mapper.readValue(value, AppPathsInfo.class);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB value for {}", 
appAttemptId);
+      return null;
+    }
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) {
+    return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) {
+    // We add a common prefix on all the keys so we can find them in the DB
+    try {
+      String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+      return keyJsonString.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      logger.error("Exception while generating the DB key {}", key);
+      return null;
+    }
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId){
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> dbKeysToBeRemoved = reloadActiveAppAttemptsPathInfo(db);
+    dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    // Clean up invalid data stored in DB
+    submitCleanupTask(() ->
+        dbKeysToBeRemoved.forEach(
+            (key) -> {
+              try {
+                db.delete(key);
+              } catch (Exception e) {
+                logger.error("Error deleting data in DB", e);
+              }
+            }
+        )
+    );
+  }
+
+  private List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        if (appAttemptId == null) {

Review Comment:
   How would this happen?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -643,8 +725,13 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
             AppShuffleInfo appShuffleInfo = originalAppShuffleInfo.get();
             logger.warn("Cleanup shuffle info and merged shuffle files for 
{}_{} as new " +
                 "application attempt registered", appId, 
appShuffleInfo.attemptId);
-            mergedShuffleCleaner.execute(
-              () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
true));
+            // Clean up all the merge shuffle related information in the DB 
for the former attempt
+            submitCleanupTask(
+              () -> {
+                removeAppAttemptPathInfoFromDB(new AppAttemptId(appId, 
appShuffleInfo.attemptId));

Review Comment:
   Move this `removeAppAttemptPathInfoFromDB` call into `appsShuffleInfo.compute`,
   before `writeAppPathsInfoToDb` (when `appShuffleInfo != null`); this will keep
   the DB consistent w.r.t. `appsShuffleInfo`.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1020,12 +1345,52 @@ public boolean isFinalized() {
     }
   }
 
+  public static class AppAttemptShuffleMergeId {
+    public final String appId;
+    public final int attemptId;
+    public final int shuffleId;
+    public final int shuffleMergeId;
+
+    @JsonCreator
+    public AppAttemptShuffleMergeId(
+        @JsonProperty("appId") String appId,
+        @JsonProperty("attemptId") int attemptId,
+        @JsonProperty("shuffleId") int shuffleId,
+        @JsonProperty("shuffleMergeId") int shuffleMergeId) {
+      Preconditions.checkArgument(appId != null, "app id is null");
+      this.appId = appId;
+      this.attemptId = attemptId;
+      this.shuffleId = shuffleId;
+      this.shuffleMergeId = shuffleMergeId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = 
(AppAttemptShuffleMergeId) o;
+      return Objects.equals(attemptId, appAttemptShuffleMergeId.attemptId) &&
+          Objects.equals(shuffleId, appAttemptShuffleMergeId.shuffleId) &&
+          Objects.equals(shuffleMergeId, 
appAttemptShuffleMergeId.shuffleMergeId) &&
+          Objects.equals(appId, appAttemptShuffleMergeId.appId);

Review Comment:
   ```suggestion
         return this.attemptId == appAttemptShuffleMergeId.attemptId &&
             this.shuffleId == appAttemptShuffleMergeId.shuffleId &&
             this.shuffleMergeId == appAttemptShuffleMergeId.shuffleMergeId &&
             Objects.equals(appId, appAttemptShuffleMergeId.appId);
   ```



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -655,6 +742,206 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+            new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+
+  }
+
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) {
+    try {
+      String json = key.substring(prefix.length() + 1);
+      return mapper.readValue(json, valueType);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB key {}", key);
+      return null;
+    }
+  }
+
+  private AppPathsInfo parseDBAppAttemptPathsValue(byte[] value, AppAttemptId 
appAttemptId) {
+    try {
+      return mapper.readValue(value, AppPathsInfo.class);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB value for {}", 
appAttemptId);
+      return null;
+    }
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) {
+    return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) {
+    // We add a common prefix on all the keys so we can find them in the DB
+    try {
+      String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+      return keyJsonString.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      logger.error("Exception while generating the DB key {}", key);
+      return null;
+    }
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId){
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> dbKeysToBeRemoved = reloadActiveAppAttemptsPathInfo(db);
+    dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    // Clean up invalid data stored in DB
+    submitCleanupTask(() ->

Review Comment:
   I am trying to understand whether there are any side effects to doing this
   cleanup asynchronously (e.g., subsequent updates that would get removed by this cleanup).
   Note that `mergedShuffleCleaner` is a single-threaded executor, so if we
   have a large number of entries to remove, or removal takes a while, other
   cleanup tasks will start to accumulate behind it.
   
   Thoughts?
   +CC @otterc 



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -317,22 +353,24 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, 
cleanupLocalDirs);
     AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
     if (null != appShuffleInfo) {
-      mergedShuffleCleaner.execute(
-        () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs));
+      submitCleanupTask(
+        () -> closeAndDeletePartitions(appShuffleInfo, cleanupLocalDirs, 
true));
     }
+    removeAppAttemptPathInfoFromDB(
+        new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId));
   }
 
-
   /**
    * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
    * If cleanupLocalDirs is true, the merged shuffle files will also be 
deleted.
    * The cleanup will be executed in a separate thread.
    */
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @VisibleForTesting
-  void closeAndDeletePartitionFilesIfNeeded(
+  void closeAndDeletePartitions(
       AppShuffleInfo appShuffleInfo,
-      boolean cleanupLocalDirs) {
+      boolean cleanupLocalDirs,
+      boolean removeFromDb) {

Review Comment:
   Remove `removeFromDb`? We always remove from the DB.



##########
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:
##########
@@ -230,11 +241,14 @@ protected void serviceInit(Configuration externalConf) 
throws Exception {
       // when it comes back
       if (_recoveryPath != null) {
         registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+        mergeManagerFile = 
initRecoveryDb(SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME);
       }
 
-      TransportConf transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(_conf));
-      MergedShuffleFileManager shuffleMergeManager = 
newMergedShuffleFileManagerInstance(
-        transportConf);
+      TransportConf transportConf = new TransportConf("shuffle",new 
HadoopConfigProvider(_conf));
+      if (shuffleMergeManager == null) {

Review Comment:
   If this is specifically for testing, can you add a comment to that effect?
   



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1269,13 +1656,30 @@ private AppPathsInfo(
           Arrays.toString(activeLocalDirs),subDirsPerLocalDir, appId);
       }
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppPathsInfo appPathsInfo = (AppPathsInfo) o;
+      return Objects.equals(subDirsPerLocalDir, 
appPathsInfo.subDirsPerLocalDir) &&
+          Arrays.equals(activeLocalDirs, appPathsInfo.activeLocalDirs);

Review Comment:
   ```suggestion
         return this.subDirsPerLocalDir == appPathsInfo.subDirsPerLocalDir &&
             Arrays.equals(activeLocalDirs, appPathsInfo.activeLocalDirs);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to