mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1063954257


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
       });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, 
int[] reduceIds) {
+    removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+    AppShuffleInfo appShuffleInfo = 
validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);
+    int shuffleId = appAttemptShuffleMergeId.shuffleId;
+    int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
+    for (int reduceId : reduceIds) {
+      try {
+        File dataFile =
+            appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, 
reduceId);
+        dataFile.delete();
+      } catch (Exception e) {

Review Comment:
   Can we fix this, @wankunde?
   Essentially, the changes are:
   
   a) There is no exception thrown in this block, so we don't need the try/catch.
   b) When the delete fails, we don't need to log a `warn` (here and below): 
this can happen if application exit is racing against shuffle removal, and the 
`info` at the end of the method will suffice.



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -1316,6 +1318,72 @@ public void 
testJsonSerializationOfPushShufflePartitionInfo() throws IOException
       RemoteBlockPushResolver.AppAttemptShuffleMergeId.class));
   }
 
+  @Test
+  public void testRemoveShuffleMerge() throws IOException, 
InterruptedException {
+    Semaphore closed = new Semaphore(0);
+    String testApp = "testRemoveShuffleMerge";
+    RemoteBlockPushResolver pushResolver = new RemoteBlockPushResolver(conf, 
null) {
+      @Override
+      void closeAndDeleteOutdatedPartitions(
+          AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+          Map<Integer, AppShufflePartitionInfo> partitions) {
+        super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, 
partitions);
+        closed.release();
+      }
+
+      @Override
+      void deleteMergedFiles(
+          AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+          AppShuffleInfo appShuffleInfo,
+          int[] reduceIds,
+          boolean deleteFromDB) {
+        super.deleteMergedFiles(appAttemptShuffleMergeId, appShuffleInfo, 
reduceIds, deleteFromDB);
+        closed.release();
+      }
+    };
+    pushResolver.registerExecutor(testApp, new ExecutorShuffleInfo(
+        prepareLocalDirs(localDirs, MERGE_DIRECTORY), 1, 
MERGE_DIRECTORY_META));
+
+    // 1. Check whether the data is cleaned up when merged shuffle is finalized
+    RemoteBlockPushResolver.AppShuffleInfo shuffleInfo =
+        pushResolver.validateAndGetAppShuffleInfo(testApp);
+    StreamCallbackWithID streamCallback1 = 
pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(testApp, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+    streamCallback1.onData(streamCallback1.getID(), ByteBuffer.wrap(new 
byte[2]));
+    streamCallback1.onComplete(streamCallback1.getID());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 
NO_ATTEMPT_ID, 0, 1));
+    assertTrue(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists());
+    assertTrue(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 
0)).exists());
+    assertTrue(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
+    pushResolver.removeShuffleMerge(
+        new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 0, 1));
+    closed.tryAcquire(10, TimeUnit.SECONDS);
+    assertFalse(shuffleInfo.getMergedShuffleMetaFile(0, 1, 0).exists());
+    assertFalse(new File(shuffleInfo.getMergedShuffleIndexFilePath(0, 1, 
0)).exists());
+    assertFalse(shuffleInfo.getMergedShuffleDataFile(0, 1, 0).exists());
+
+    // 2. Check whether the data is cleaned up when merged shuffle is not 
finalized.
+    StreamCallbackWithID streamCallback2 = 
pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(testApp, NO_ATTEMPT_ID, 2, 1, 0, 0, 0));
+    streamCallback2.onData(streamCallback2.getID(), ByteBuffer.wrap(new 
byte[2]));
+    streamCallback2.onComplete(streamCallback2.getID());
+    assertTrue(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists());
+    pushResolver.removeShuffleMerge(
+        new RemoveShuffleMerge(testApp, NO_ATTEMPT_ID, 2, 1));
+    closed.tryAcquire(10, TimeUnit.SECONDS);
+    assertFalse(shuffleInfo.getMergedShuffleMetaFile(2, 1, 0).exists());
+
+    // 3. Check whether the data is cleaned up when higher shuffleMergeId 
finalize request comes
+    StreamCallbackWithID streamCallback3 = 
pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(testApp, NO_ATTEMPT_ID, 3, 1, 0, 0, 0));
+    streamCallback3.onData(streamCallback3.getID(), ByteBuffer.wrap(new 
byte[2]));
+    streamCallback3.onComplete(streamCallback3.getID());
+    assertTrue(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 
NO_ATTEMPT_ID, 3, 2));
+    closed.tryAcquire(10, TimeUnit.SECONDS);
+    assertFalse(shuffleInfo.getMergedShuffleMetaFile(3, 1, 0).exists());

Review Comment:
   Can we also add tests for the additional cases we have?
   
   * `removeShuffleMerge` with `DELETE_ALL_MERGED_SHUFFLE`: we can do what 
we do for `streamCallback1` (Case 1) above, but use 
`DELETE_ALL_MERGED_SHUFFLE` as the merge id.
   * Additional cases here to cover the negative paths as well, namely:
     * `msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId`
     * deleting a merge id that does not exist.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to