mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r918226879


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -343,15 +397,44 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    removeAppShuffleInfoFromDB(appShuffleInfo);
+  }
+
+  /**
+   * Remove the application attempt local paths information from the DB.
+   */
+  private void removeAppAttemptPathInfoFromDB(String appId, int attemptId) throws Exception{

Review Comment:
   Can you add a comment to both `removeAppAttemptPathInfoFromDB` and `writeNewAppAttemptPathInfoToDBAndRemoveOutdated` noting that they are expected to be invoked with the `appsShuffleInfo` lock held for `appId`?
   (Also, group them together in the source file.)
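   For context on why the lock matters: assuming `appsShuffleInfo` is a `ConcurrentHashMap`, `compute` performs the whole remapping-function invocation atomically for a key, so concurrent `compute` calls for the same `appId` block until the running one finishes. The sketch below is hypothetical (the class, method, and counters are illustrative stand-ins for the two DB helpers); it checks that no two remapping functions for the same key ever overlap and that no update is lost.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ComputeLockDemo {
  /**
   * Runs `threads` workers that each call compute() `iterations` times on the same key.
   * Returns {max remapping functions observed running at once, final value for the key}.
   */
  static int[] run(int threads, int iterations) {
    ConcurrentHashMap<String, Integer> perAppState = new ConcurrentHashMap<>();
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger maxInFlight = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.execute(() -> {
        for (int i = 0; i < iterations; i++) {
          perAppState.compute("app_1", (id, existing) -> {
            // Stand-in for the DB helpers: this body runs while compute()
            // holds the lock for "app_1".
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max);
            inFlight.decrementAndGet();
            return existing == null ? 1 : existing + 1;
          });
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {maxInFlight.get(), perAppState.get("app_1")};
  }
}
```

   A maximum of 1 means the remapping functions for one key were strictly serialized, which is the property the requested comment would document.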



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -656,6 +768,232 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     }
   }
 
+  /**
+   * Remove the former application attempt local paths information from the DB and insert the
+   * local paths information from the newer application attempt. If the deletion fails, the
+   * insertion will also be skipped. This ensures that there will always be a single application
+   * attempt local path information in the DB.
+   */
+  private void writeNewAppAttemptPathInfoToDBAndRemoveOutdated(
+      String appId,
+      int newAttemptId,
+      AppShuffleInfo appShuffleInfo,
+      AppPathsInfo appPathsInfo) {
+    try{
+      if (appShuffleInfo != null) {
+        removeAppAttemptPathInfoFromDB(appId, appShuffleInfo.attemptId);
+      }
+      writeAppPathsInfoToDb(appId, newAttemptId, appPathsInfo);

Review Comment:
   Thoughts on making the exception handling local to each DB invocation (since we don't need to handle failures spanning multiple DB invocations for now)? We are already doing this for `removeAppShufflePartitionInfoFromDB`, `writeAppAttemptShuffleMergeInfoToDB`, etc.
   
   Given this, let us make the exception handling local to `removeAppAttemptPathInfoFromDB` (so it does not throw an `Exception`).
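   A minimal sketch of the suggested shape, assuming the delete goes through a DB handle that may throw: the helper catches and logs the failure itself, so callers need no `try`/`catch`. The `Store` interface is a hypothetical stand-in for the LevelDB handle, the key layout `AppAttemptPathInfo;<appId>;<attemptId>` is assumed, and `java.util.logging` replaces the resolver's own logger only to keep the example self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;

class PathInfoCleaner {
  /** Hypothetical stand-in for the LevelDB handle; delete may fail. */
  interface Store {
    void delete(byte[] key) throws Exception;
  }

  private static final Logger LOG = Logger.getLogger(PathInfoCleaner.class.getName());
  private final Store db; // null when recovery is disabled

  PathInfoCleaner(Store db) {
    this.db = db;
  }

  /**
   * Removes one application attempt's local path info from the DB.
   * Failures are logged and swallowed, so this method does not throw.
   */
  void removeAppAttemptPathInfoFromDB(String appId, int attemptId) {
    if (db == null) {
      return;
    }
    // Assumed key layout; the PR currently encodes AppAttemptId as JSON.
    String key = "AppAttemptPathInfo;" + appId + ";" + attemptId;
    try {
      db.delete(key.getBytes(StandardCharsets.UTF_8));
    } catch (Exception e) {
      LOG.log(Level.SEVERE,
          "Failed to remove path info for " + appId + " attempt " + attemptId, e);
    }
  }
}
```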



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -632,6 +736,12 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
           appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
             if (appShuffleInfo == null || attemptId > appShuffleInfo.attemptId) {
               originalAppShuffleInfo.set(appShuffleInfo);
+              AppPathsInfo appPathsInfo = new AppPathsInfo(appId, executorInfo.localDirs,
+                  mergeDir, executorInfo.subDirsPerLocalDir);
+              // Clean up the outdated App Attempt local path info in the DB and
+              // put the newly registered local path info from newer attempt into the DB.
+              writeNewAppAttemptPathInfoToDBAndRemoveOutdated(

Review Comment:
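   nit: Do we need this method? (The comment is helpful; it is only the extra method that seems unnecessary.)
   We can simply do:
   ```
   if (null != appShuffleInfo) {
     removeAppAttemptPathInfoFromDB(appId, appShuffleInfo.attemptId);
   }
   writeAppPathsInfoToDb(appId, attemptId, appPathsInfo);
   ```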
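   Assembled in context, the inlined version might look like the sketch below. `AppShuffleInfo` is reduced to an id pair and the DB is an in-memory `TreeMap`; both are hypothetical simplifications (the map is not thread-safe across different apps), and here a same or older attempt simply keeps the existing entry, whereas the real registration path may handle those cases differently.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

class InlineRegistrationSketch {
  /** Hypothetical, simplified stand-in for AppShuffleInfo. */
  static final class AppShuffleInfo {
    final String appId;
    final int attemptId;

    AppShuffleInfo(String appId, int attemptId) {
      this.appId = appId;
      this.attemptId = attemptId;
    }
  }

  final ConcurrentHashMap<String, AppShuffleInfo> appsShuffleInfo = new ConcurrentHashMap<>();
  // In-memory stand-in for the DB: "<appId>;<attemptId>" -> local paths. Not thread-safe.
  final Map<String, String> db = new TreeMap<>();

  void registerExecutor(String appId, int attemptId, String localPaths) {
    appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
      if (appShuffleInfo == null || attemptId > appShuffleInfo.attemptId) {
        // Clean up the outdated attempt's path info and store the newer attempt's,
        // inline, while compute() holds the lock for appId.
        if (null != appShuffleInfo) {
          removeAppAttemptPathInfoFromDB(appId, appShuffleInfo.attemptId);
        }
        writeAppPathsInfoToDb(appId, attemptId, localPaths);
        return new AppShuffleInfo(appId, attemptId);
      }
      // Simplification: a same or older attempt keeps the existing entry.
      return appShuffleInfo;
    });
  }

  // Expected to be invoked with the appsShuffleInfo lock held for appId.
  private void removeAppAttemptPathInfoFromDB(String appId, int attemptId) {
    db.remove(appId + ";" + attemptId);
  }

  // Expected to be invoked with the appsShuffleInfo lock held for appId.
  private void writeAppPathsInfoToDb(String appId, int attemptId, String localPaths) {
    db.put(appId + ";" + attemptId, localPaths);
  }
}
```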



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -343,15 +397,44 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    removeAppShuffleInfoFromDB(appShuffleInfo);
+  }
+
+  /**
+   * Remove the application attempt local paths information from the DB.
+   */
+  private void removeAppAttemptPathInfoFromDB(String appId, int attemptId) throws Exception{
+    AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+    if (db != null) {
+      db.delete(getDbAppAttemptPathsKey(appAttemptId));

Review Comment:
   Given the possibility of stale entries from previous failed deletes hanging around, can we scan for all entries for an application id and delete them here?
   
   Note that this will require changes to how we encode the DB key for `AppAttemptId`: we cannot use JSON for it, since we want to do a prefix scan.
   Something like:
   ```
     private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException {
       return (APP_ATTEMPT_PATH_KEY_PREFIX + DB_KEY_DELIMITER + appAttemptId.appId
           + DB_KEY_DELIMITER + appAttemptId.attemptId).getBytes(StandardCharsets.UTF_8);
     }
   
     // Prefix for every attempt of appId; the trailing delimiter keeps "app_1" from matching "app_10".
     private byte[] getDbAppAttemptPathsKeyPrefix(String appId) throws IOException {
       return (APP_ATTEMPT_PATH_KEY_PREFIX + DB_KEY_DELIMITER + appId
           + DB_KEY_DELIMITER).getBytes(StandardCharsets.UTF_8);
     }
   
     private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException {
       // split() takes a regex; fine as long as DB_KEY_DELIMITER has no regex metacharacters.
       String[] parts = key.split(DB_KEY_DELIMITER);
       if (parts.length != 3 || !APP_ATTEMPT_PATH_KEY_PREFIX.equals(parts[0])) {
         throw new IllegalArgumentException("Unable to parse '" + key + "' as an AppAttemptId");
       }
       String appId = parts[1];
       try {
         int attemptId = Integer.parseInt(parts[2]);
         return new AppAttemptId(appId, attemptId);
       } catch (NumberFormatException nfEx) {
         throw new IllegalArgumentException(
             "Unable to parse '" + key + "' as an AppAttemptId", nfEx);
       }
     }
   ```
   
   Thoughts?
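   The scan itself relies on the DB returning keys in sorted order, so all keys sharing a prefix are contiguous: seek to the prefix, then stop at the first key that no longer starts with it. The sketch below models that with a `TreeMap` (for ASCII keys, `String` ordering matches LevelDB's default bytewise ordering of the UTF-8 bytes); the class name, method names, and constant values are assumptions. The trailing delimiter in the prefix matters: without it, a scan for `app_1` would also match `app_10`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

class PrefixScanSketch {
  // Assumed values for the constants used in the PR.
  static final String APP_ATTEMPT_PATH_KEY_PREFIX = "AppAttemptPathInfo";
  static final String DB_KEY_DELIMITER = ";";

  static String attemptKey(String appId, int attemptId) {
    return APP_ATTEMPT_PATH_KEY_PREFIX + DB_KEY_DELIMITER + appId + DB_KEY_DELIMITER + attemptId;
  }

  static String appPrefix(String appId) {
    // The trailing delimiter stops "app_1" from also matching "app_10".
    return APP_ATTEMPT_PATH_KEY_PREFIX + DB_KEY_DELIMITER + appId + DB_KEY_DELIMITER;
  }

  /** Deletes every attempt's path entry for appId and returns the deleted keys. */
  static List<String> removeAllAttemptsForApp(NavigableMap<String, byte[]> db, String appId) {
    String prefix = appPrefix(appId);
    List<String> toDelete = new ArrayList<>();
    // Like DBIterator.seek(prefix): start at the first key >= prefix and stop
    // at the first key that no longer has the prefix.
    for (String key : db.tailMap(prefix, true).keySet()) {
      if (!key.startsWith(prefix)) {
        break;
      }
      toDelete.add(key);
    }
    // Collect first, then delete, to avoid modifying the map while iterating its view.
    // With LevelDB these deletes could be grouped into a single WriteBatch.
    for (String key : toDelete) {
      db.remove(key);
    }
    return toDelete;
  }
}
```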



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -656,6 +768,232 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     }
   }
 
+  /**
+   * Remove the former application attempt local paths information from the DB and insert the
+   * local paths information from the newer application attempt. If the deletion fails, the
+   * insertion will also be skipped. This ensures that there will always be a single application
+   * attempt local path information in the DB.
+   */
+  private void writeNewAppAttemptPathInfoToDBAndRemoveOutdated(
+      String appId,
+      int newAttemptId,
+      AppShuffleInfo appShuffleInfo,
+      AppPathsInfo appPathsInfo) {
+    try{
+      if (appShuffleInfo != null) {
+        removeAppAttemptPathInfoFromDB(appId, appShuffleInfo.attemptId);
+      }
+      writeAppPathsInfoToDb(appId, newAttemptId, appPathsInfo);
+    } catch (Exception e) {
+      logger.error("Error deleting {} from application paths info in DB", appId, e);
+    }
+  }
+
+  /**
+   * Close the DB during shutdown
+   */
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  /**
+   * Write the application attempt's local path information to the DB
+   */
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) {
+    if (db != null) {
+      AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+      try {
+        byte[] key = getDbAppAttemptPathsKey(appAttemptId);
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info for {}", appAttemptId, e);
+      }
+    }
+  }
+
+  /**
+   * Write the finalized shuffle merge partition information into the DB
+   */
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition {}", appAttemptShuffleMergeId, e);
+      }
+    }
+
+  }
+
+  /**
+   * Parse the DB key with the prefix and the expected return value type
+   */
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) throws IOException {
+    String json = key.substring(prefix.length() + 1);
+    return mapper.readValue(json, valueType);
+  }
+
+  /**
+   * Generate AppAttemptId from the DB key
+   */
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  /**
+   * Generate AppAttemptShuffleMergeId from the DB key
+   */
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key) throws IOException {
+    return parseDbKey(
+        key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, AppAttemptShuffleMergeId.class);
+  }
+
+  /**
+   * Generate the DB key with the key object and the specified string prefix
+   */
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+    // We add a common prefix on all the keys so we can find them in the DB
+    String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key);
+    return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptShuffleMergeId object
+   */
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+    return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptId object
+   */
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException {
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  /**
+   * Reload the DB to recover the meta data stored in the hashmap for merged shuffles.
+   * The application attempts local paths information will be firstly reloaded, and then
+   * the finalized shuffle merges will be updated.
+   */
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
+    logger.info("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleMergeInfo(db);
+  }
+
+  /**
+   * Reload application attempts local paths information.
+   */
+  private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> entry = itr.next();
+        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), AppPathsInfo.class);
+        appsShuffleInfo.put(appAttemptId.appId, new AppShuffleInfo(
+            appAttemptId.appId, appAttemptId.attemptId, appPathsInfo));
+      }

Review Comment:
   Given the possibility that `delete` could have missed removing an earlier attempt id, let us do:
   
   ```
           appsShuffleInfo.compute(appAttemptId.appId, (id, existing) -> {
             if (null == existing || existing.attemptId < 
appAttemptId.attemptId) {
               // No entry present, or existing entry is older
               return new AppShuffleInfo(
                   appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
             } else {
               // existing entry is for the same or a newer attempt
               return existing;
             }
           });
   ```
   
   Thoughts?
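   Why `compute` rather than `put`: iteration follows key order, not attempt order. With string-encoded keys, attempt `10` sorts before attempt `9`, so a stale entry can be visited after the newer one, and a plain `put` would let it win. The sketch below applies the suggested merge rule to entries arriving in that order; `AppAttemptId` and `AppShuffleInfo` are simplified, hypothetical versions without the path info.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class ReloadSketch {
  static final class AppAttemptId {
    final String appId;
    final int attemptId;

    AppAttemptId(String appId, int attemptId) {
      this.appId = appId;
      this.attemptId = attemptId;
    }
  }

  /** Simplified stand-in: the real AppShuffleInfo also carries the app's path info. */
  static final class AppShuffleInfo {
    final String appId;
    final int attemptId;

    AppShuffleInfo(String appId, int attemptId) {
      this.appId = appId;
      this.attemptId = attemptId;
    }
  }

  /** Rebuilds the in-memory map from ids in DB iteration order, keeping the newest attempt per app. */
  static ConcurrentHashMap<String, AppShuffleInfo> reload(List<AppAttemptId> inDbOrder) {
    ConcurrentHashMap<String, AppShuffleInfo> appsShuffleInfo = new ConcurrentHashMap<>();
    for (AppAttemptId appAttemptId : inDbOrder) {
      appsShuffleInfo.compute(appAttemptId.appId, (id, existing) -> {
        if (null == existing || existing.attemptId < appAttemptId.attemptId) {
          // No entry present, or the existing entry is older.
          return new AppShuffleInfo(appAttemptId.appId, appAttemptId.attemptId);
        }
        // The existing entry is for the same or a newer attempt.
        return existing;
      });
    }
    return appsShuffleInfo;
  }
}
```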
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
