otterc commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r914055427


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -621,9 +626,10 @@ public void 
testRecoverMetaFileAfterIOExceptionsInFinalize() throws IOException
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
-    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback1.getPartitionInfo();
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo 
=callback1.getPartitionInfo();

Review Comment:
   Nit: missing a space after `=` — should be `= callback1.getPartitionInfo()`.



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -585,9 +588,11 @@ public void testRecoverMetaFileAfterIOExceptions() throws 
IOException {
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
-    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback1.getPartitionInfo();
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo =

Review Comment:
   What changed here? Just formatting?



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -685,7 +691,8 @@ public void 
testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOE
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback.getPartitionInfo();
     callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
     callback.onComplete(callback.getID());
-    TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) 
partitionInfo.getIndexFile();
+    TestMergeShuffleFile testIndexFile =
+      (TestMergeShuffleFile) partitionInfo.getIndexFile();

Review Comment:
   Same here. Seems just like a format change



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -886,12 +896,14 @@ public void 
testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
   public void testPushBlockFromPreviousAttemptIsRejected()
       throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
-    pushResolver = new RemoteBlockPushResolver(conf) {
+    pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
-      void closeAndDeletePartitionFilesIfNeeded(
+      void closeAndDeletePartitions(
         AppShuffleInfo appShuffleInfo,
-        boolean cleanupLocalDirs) {
-        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs);
+        boolean cleanupLocalDirs,
+        boolean removeFromDb) {

Review Comment:
   Nit: indentation should be 4 spaces



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -316,22 +355,37 @@ public String[] getMergedBlockDirs(String appId) {
   @Override
   public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, 
cleanupLocalDirs);
-    AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
+    // Cleanup the DB within critical section to gain the consistency between
+    // DB and in-memory hashmap.
+    AtomicReference<AppShuffleInfo> ref = new AtomicReference<>(null);
+    appsShuffleInfo.compute(appId, (id, info) -> {
+      if (null != info) {
+        try{

Review Comment:
   Nit: missing a space after `try` — should be `try {`.



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -758,7 +766,8 @@ public void 
testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IO
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0, 0));
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback.getPartitionInfo();
-    TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) 
partitionInfo.getIndexFile();
+    TestMergeShuffleFile testIndexFile =
+      (TestMergeShuffleFile) partitionInfo.getIndexFile();

Review Comment:
   Same here. Can the format changes be avoided?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -343,15 +399,52 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    removeAppShuffleInfoFromDB(appShuffleInfo);
+  }
+
+  /**
+   * Remove the application attempt local paths information from the DB. This 
method is being
+   * invoked within the lock from the ConcurrentHashmap appsShuffleInfo on the 
specific
+   * applicationId.
+   */
+  @VisibleForTesting
+  void removeAppAttemptPathInfoFromDB(String appId, int attemptId) {
+    AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+    if (db != null) {
+      try{
+        db.delete(getDbAppAttemptPathsKey(appAttemptId));
+      } catch (Exception e) {
+        logger.error("Failed to remove the application attempt {} local path 
in DB",
+            appAttemptId, e);
+      }
+    }
+  }
+
+  /**
+   * Remove the finalized shuffle partitions information for an application 
attempt from the DB
+   */
+  @VisibleForTesting
+  void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) {
+    if (db != null) {
+      appShuffleInfo.shuffles.forEach((shuffleId, shuffleInfo) ->
+          removeAppShufflePartitionInfoFromDB(
+              new AppAttemptShuffleMergeId(
+                  appShuffleInfo.appId, appShuffleInfo.attemptId,
+                  shuffleId, shuffleInfo.shuffleMergeId)));
+    }
   }
 
   /**
-   * Clean up all the AppShufflePartitionInfo for a specific shuffleMergeId. 
This is done
-   * since there is a higher shuffleMergeId request made for a shuffleId, 
therefore clean
-   * up older shuffleMergeId partitions. The cleanup will be executed in a 
separate thread.
+   * Clean up all the AppShufflePartitionInfo and the finalized shuffle 
partitions in DB for
+   * a specific shuffleMergeId. This is done since there is a higher 
shuffleMergeId request made
+   * for a shuffleId, therefore clean up older shuffleMergeId partitions. The 
cleanup will be
+   * executed in a separate thread.

Review Comment:
   Nit: the cleanup will be executed by the `mergedShuffleCleaner` thread



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -655,6 +742,206 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+            new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, 
shuffleMergeId));
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+
+  }
+
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) {
+    try {
+      String json = key.substring(prefix.length() + 1);
+      return mapper.readValue(json, valueType);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB key {}", key);
+      return null;
+    }
+  }
+
+  private AppPathsInfo parseDBAppAttemptPathsValue(byte[] value, AppAttemptId 
appAttemptId) {
+    try {
+      return mapper.readValue(value, AppPathsInfo.class);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB value for {}", 
appAttemptId);
+      return null;
+    }
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) {
+    return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) {
+    // We add a common prefix on all the keys so we can find them in the DB
+    try {
+      String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+      return keyJsonString.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      logger.error("Exception while generating the DB key {}", key);
+      return null;
+    }
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId){
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> dbKeysToBeRemoved = reloadActiveAppAttemptsPathInfo(db);
+    dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    // Clean up invalid data stored in DB
+    submitCleanupTask(() ->

Review Comment:
   That's a good point. If the async deletion takes a long time and a newer 
app attempt arrives in the meantime, we would end up trying to delete the 
outdated entries multiple times, polluting the logs with unnecessary errors. 
Maybe at startup we should delete these entries synchronously instead.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -316,22 +355,39 @@ public String[] getMergedBlockDirs(String appId) {
   @Override
   public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, 
cleanupLocalDirs);
-    AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
+    // Cleanup the DB within critical section to gain the consistency between
+    // DB and in-memory hashmap.
+    AtomicReference<AppShuffleInfo> ref = new AtomicReference<>(null);
+    appsShuffleInfo.compute(appId, (id, info) -> {
+      if (null != info) {
+        // Try cleaning up this application attempt local paths information
+        // and also the local paths information from former attempts in DB.
+        removeAppAttemptPathInfoFromDB(info.appId, info.attemptId);
+        if (info.attemptId != UNDEFINED_ATTEMPT_ID) {
+          for (int formerAttemptId = info.attemptId - 1; formerAttemptId >= 0; 
formerAttemptId--) {

Review Comment:
   Is it possible that the former attempt entries don't exist in the DB? If 
so, would it be better to first check whether the entry exists in the DB and 
only then delete it? When the entry doesn't exist, logging the failure at 
`Error` level seems misleading.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -180,12 +215,15 @@ private AppShufflePartitionInfo 
getOrCreateAppShufflePartitionInfo(
             // Higher shuffleMergeId seen for the shuffle ID meaning new stage 
attempt is being
             // run for the shuffle ID. Close and clean up old shuffleMergeId 
files,
             // happens in the indeterminate stage retries
-            logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a 
new shuffle " +
-                "merge metadata since received shuffleMergeId is higher than 
latest " +
-                "shuffleMergeId {}", appShuffleInfo.appId, 
appShuffleInfo.attemptId, shuffleId,
-                shuffleMergeId, latestShuffleMergeId);
-            mergedShuffleCleaner.execute(() ->
-                
closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
+            AppAttemptShuffleMergeId appAttemptShuffleMergeId =
+              new AppAttemptShuffleMergeId(
+                appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, 
shuffleMergeId);

Review Comment:
   Nit: indentation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to