venkata91 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r679534314



##########
File path: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void 
testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
 
   @Test(expected = ClosedChannelException.class)
   public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
       void closeAndDeletePartitionFilesIfNeeded(
-        AppShuffleInfo appShuffleInfo,
-        boolean cleanupLocalDirs) {
+          AppShuffleInfo appShuffleInfo,
+          boolean cleanupLocalDirs) {
         super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs);
         closed.release();
       }
     };
     String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
     Path[] attempt1LocalDirs = createLocalDirs(1);
     registerExecutor(testApp,
-      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
-      MERGE_DIRECTORY_META_1);
+        prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
+        MERGE_DIRECTORY_META_1);
     ByteBuffer[] blocks = new ByteBuffer[]{
-      ByteBuffer.wrap(new byte[4]),
-      ByteBuffer.wrap(new byte[5]),
-      ByteBuffer.wrap(new byte[6]),
-      ByteBuffer.wrap(new byte[7])
+        ByteBuffer.wrap(new byte[4]),
+        ByteBuffer.wrap(new byte[5]),
+        ByteBuffer.wrap(new byte[6]),
+        ByteBuffer.wrap(new byte[7])
     };
     StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+        new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0));
     // The onData callback should be called 4 times here before the onComplete 
callback. But a
     // register executor message arrives in shuffle service after the 2nd 
onData callback. The 3rd
     // onData callback should all throw ClosedChannelException as their 
channels are closed.
     stream1.onData(stream1.getID(), blocks[0]);
     stream1.onData(stream1.getID(), blocks[1]);
     Path[] attempt2LocalDirs = createLocalDirs(2);
     registerExecutor(testApp,
-      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_2),
-      MERGE_DIRECTORY_META_2);
+        prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_2),
+        MERGE_DIRECTORY_META_2);
     closed.acquire();
     // Should throw ClosedChannelException here.
     stream1.onData(stream1.getID(), blocks[3]);
   }
 
+  @Test
+  public void testBlockPushWithOlderShuffleMergeId() throws IOException {
+    StreamCallbackWithID stream1 =
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    StreamCallbackWithID stream2 =
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    try {
+      // stream 1 push should be rejected as it is from an older shuffleMergeId
+      stream1.onComplete(stream1.getID());
+    } catch(RuntimeException re) {
+      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle 
is finalized or"
+        + " stale block push as shuffle blocks of a higher shuffleMergeId for 
the shuffle is being"
+          + " pushed", re.getMessage());
+    }
+    // stream 2 now completes
+    stream2.onComplete(stream2.getID());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 2));
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
2, 0);
+    validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new 
int[][]{{0}});
+  }
+
+  @Test
+  public void testFinalizeWithOlderShuffleMergeId() throws IOException {
+    StreamCallbackWithID stream1 =
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    StreamCallbackWithID stream2 =
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    try {
+      // stream 1 push should be rejected as it is from an older shuffleMergeId
+      stream1.onComplete(stream1.getID());
+    } catch(RuntimeException re) {
+      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle 
is finalized or"
+        + " stale block push as shuffle blocks of a higher shuffleMergeId for 
the shuffle is being"
+          + " pushed", re.getMessage());
+    }
+    // stream 2 now completes
+    stream2.onComplete(stream2.getID());
+    try {
+      pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 1));
+    } catch(RuntimeException re) {
+      assertEquals("Shuffle merge finalize request for shuffle 0 with 
shuffleMergeId 1 is stale"
+        + " shuffle finalize request as shuffle blocks of a higher 
shuffleMergeId for the shuffle"
+          + " is already being pushed", re.getMessage());
+    }
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0, 2));
+
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
2, 0);
+    validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new 
int[][]{{0}});
+  }
+
+  @Test
+  public void testFinalizeOfDeterminateShuffle() throws IOException {
+    PushBlock[] pushBlocks = new PushBlock[] {
+      new PushBlock(0, 0, 0, 0, ByteBuffer.wrap(new byte[4])),
+      new PushBlock(0,  0, 1, 0, ByteBuffer.wrap(new byte[5]))
+    };
+    pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
+
+    RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+      pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
+    assertTrue("Metadata of determinate shuffle should be removed after 
finalize shuffle"
+      + " merge", appShuffleInfo.getPartitions().get(0) == null);
+    validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
+    validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 5}, new 
int[][]{{0}, {1}});

Review comment:
       Yes right, I have updated the same test as well 
`RemoteBlockPushResolverSuite#testBlockReceivedAfterMergeFinalize` checking for 
the new exception message. Do you think we should add a separate test instead 
of making changes there?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to