zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r888299473


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -655,6 +744,156 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  private void writeAppAttemptShuffleMergeInfo(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+            new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+
+  }
+
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) 
throws IOException {
+    if (!key.startsWith(prefix + DB_KEY_DELIMITER)) {
+      throw new IllegalArgumentException("expected a string starting with " + 
prefix);
+    }
+    String json = key.substring(prefix.length() + 1);
+    return mapper.readValue(json, valueType);
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws 
IOException {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) throws IOException {
+    return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+    return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws 
IOException {
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAppShuffleInfo(DB db) throws IOException {
+    logger.info("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleMergeInfo(db);
+  }
+
+  private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        try{
+          AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), 
AppPathsInfo.class);
+          logger.info("Reloading active application {}_{} merged shuffle files 
paths",
+              appAttemptId.appId, appAttemptId.attemptId);
+          appsShuffleInfo.compute(appAttemptId.appId,
+              (appId, existingAppShuffleInfo) -> {
+                if (existingAppShuffleInfo == null ||
+                    existingAppShuffleInfo.attemptId < appAttemptId.attemptId) 
{
+                  return new AppShuffleInfo(
+                      appAttemptId.appId, appAttemptId.attemptId, 
appPathsInfo);
+                } else {
+                  return existingAppShuffleInfo;

Review Comment:
   Added.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -655,6 +744,156 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  private void writeAppAttemptShuffleMergeInfo(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+            new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+
+  }
+
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) 
throws IOException {
+    if (!key.startsWith(prefix + DB_KEY_DELIMITER)) {
+      throw new IllegalArgumentException("expected a string starting with " + 
prefix);
+    }
+    String json = key.substring(prefix.length() + 1);
+    return mapper.readValue(json, valueType);
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws 
IOException {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) throws IOException {
+    return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+    // we stick a common prefix on all the keys so we can find them in the DB
+    String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+    return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws 
IOException {
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAppShuffleInfo(DB db) throws IOException {
+    logger.info("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleMergeInfo(db);
+  }
+
+  private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        try{
+          AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), 
AppPathsInfo.class);
+          logger.info("Reloading active application {}_{} merged shuffle files 
paths",
+              appAttemptId.appId, appAttemptId.attemptId);
+          appsShuffleInfo.compute(appAttemptId.appId,
+              (appId, existingAppShuffleInfo) -> {
+                if (existingAppShuffleInfo == null ||
+                    existingAppShuffleInfo.attemptId < appAttemptId.attemptId) 
{
+                  return new AppShuffleInfo(
+                      appAttemptId.appId, appAttemptId.attemptId, 
appPathsInfo);
+                } else {
+                  return existingAppShuffleInfo;
+                }
+              });
+        } catch (Exception exception) {

Review Comment:
   Refactored the exception handling.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to