venkata91 commented on a change in pull request #33034: URL: https://github.com/apache/spark/pull/33034#discussion_r679534314
########## File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java ########## @@ -1022,55 +1019,240 @@ public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted() @Test(expected = ClosedChannelException.class) public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted() - throws IOException, InterruptedException { + throws IOException, InterruptedException { Semaphore closed = new Semaphore(0); pushResolver = new RemoteBlockPushResolver(conf) { @Override void closeAndDeletePartitionFilesIfNeeded( - AppShuffleInfo appShuffleInfo, - boolean cleanupLocalDirs) { + AppShuffleInfo appShuffleInfo, + boolean cleanupLocalDirs) { super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs); closed.release(); } }; String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted"; Path[] attempt1LocalDirs = createLocalDirs(1); registerExecutor(testApp, - prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), - MERGE_DIRECTORY_META_1); + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); ByteBuffer[] blocks = new ByteBuffer[]{ - ByteBuffer.wrap(new byte[4]), - ByteBuffer.wrap(new byte[5]), - ByteBuffer.wrap(new byte[6]), - ByteBuffer.wrap(new byte[7]) + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]), + ByteBuffer.wrap(new byte[6]), + ByteBuffer.wrap(new byte[7]) }; StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0)); // The onData callback should be called 4 times here before the onComplete callback. But a // register executor message arrives in shuffle service after the 2nd onData callback. The 3rd // onData callback should all throw ClosedChannelException as their channels are closed. 
stream1.onData(stream1.getID(), blocks[0]); stream1.onData(stream1.getID(), blocks[1]); Path[] attempt2LocalDirs = createLocalDirs(2); registerExecutor(testApp, - prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), - MERGE_DIRECTORY_META_2); + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); closed.acquire(); // Should throw ClosedChannelException here. stream1.onData(stream1.getID(), blocks[3]); } + @Test + public void testBlockPushWithOlderShuffleMergeId() throws IOException { + StreamCallbackWithID stream1 = + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0, 0)); + stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); + StreamCallbackWithID stream2 = + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0)); + stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); + stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); + stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); + try { + // stream 1 push should be rejected as it is from an older shuffleMergeId + stream1.onComplete(stream1.getID()); + } catch(RuntimeException re) { + assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or" + + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being" + + " pushed", re.getMessage()); + } + // stream 2 now completes + stream2.onComplete(stream2.getID()); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 2)); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 2, 0); + validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new int[][]{{0}}); + } + + @Test + public void testFinalizeWithOlderShuffleMergeId() throws IOException { + StreamCallbackWithID stream1 = + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, 
NO_ATTEMPT_ID, 0, 1, 0, 0, 0)); + stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); + StreamCallbackWithID stream2 = + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0)); + stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); + stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2])); + stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2])); + try { + // stream 1 push should be rejected as it is from an older shuffleMergeId + stream1.onComplete(stream1.getID()); + } catch(RuntimeException re) { + assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or" + + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being" + + " pushed", re.getMessage()); + } + // stream 2 now completes + stream2.onComplete(stream2.getID()); + try { + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 1)); + } catch(RuntimeException re) { + assertEquals("Shuffle merge finalize request for shuffle 0 with shuffleMergeId 1 is stale" + + " shuffle finalize request as shuffle blocks of a higher shuffleMergeId for the shuffle" + + " is already being pushed", re.getMessage()); + } + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 2)); + + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 2, 0); + validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new int[][]{{0}}); + } + + @Test + public void testFinalizeOfDeterminateShuffle() throws IOException { + PushBlock[] pushBlocks = new PushBlock[] { + new PushBlock(0, 0, 0, 0, ByteBuffer.wrap(new byte[4])), + new PushBlock(0, 0, 1, 0, ByteBuffer.wrap(new byte[5])) + }; + pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks); + MergeStatuses statuses = pushResolver.finalizeShuffleMerge( + new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0)); + + RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo = + 
pushResolver.validateAndGetAppShuffleInfo(TEST_APP); + assertTrue("Metadata of determinate shuffle should be removed after finalize shuffle" + + " merge", appShuffleInfo.getPartitions().get(0) == null); + validateMergeStatuses(statuses, new int[] {0}, new long[] {9}); + MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); + validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 5}, new int[][]{{0}, {1}}); Review comment: Yes, right. I have also updated the existing test `RemoteBlockPushResolverSuite#testBlockReceivedAfterMergeFinalize` so that it checks for the new exception message. Do you think we should add a separate test instead of modifying that one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org