Ngone51 commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r663719472
########## File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java ########## @@ -219,101 +238,145 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE stream1.onComplete(stream1.getID()); // stream 2 now completes completes stream2.onComplete(stream2.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}}); } @Test public void testFailureAfterData() throws IOException { StreamCallbackWithID stream = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); assertEquals("num-chunks", 0, blockMeta.getNumChunks()); } @Test public void testFailureAfterMultipleDataBlocks() throws IOException { StreamCallbackWithID stream = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); - pushResolver.finalizeShuffleMerge(new 
FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); assertEquals("num-chunks", 0, blockMeta.getNumChunks()); } @Test public void testFailureAfterComplete() throws IOException { StreamCallbackWithID stream = - pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + pushResolver.receiveBlockDataAsStream( + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3])); stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onComplete(stream.getID()); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}}); } @Test (expected = RuntimeException.class) - public void testTooLateArrival() throws IOException { + public void testBlockReceivedAfterMergeFinalize() throws IOException { ByteBuffer[] blocks = new ByteBuffer[]{ ByteBuffer.wrap(new byte[4]), ByteBuffer.wrap(new byte[5]) }; StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 0, 0, 0)); + new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0)); for (ByteBuffer block : blocks) { stream.onData(stream.getID(), block); } stream.onComplete(stream.getID()); - pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0)); + pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0)); StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( - new PushBlockStream(TEST_APP, 0, 1, 0, 0)); 
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0)); stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4])); try { stream1.onComplete(stream1.getID()); } catch (RuntimeException re) { assertEquals( - "Block shufflePush_0_1_0 received after merged shuffle is finalized", - re.getMessage()); + "Block shufflePush_0_1_0 received after merged shuffle is finalized", re.getMessage()); MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0); validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}}); throw re; } } + @Test (expected = RuntimeException.class) + public void testBlockReceivedAfterNewAttemptRegistered() throws IOException { + String testApp = "updateLocalDirsTwiceWithTwoAttempts"; + Path[] attempt1LocalDirs = createLocalDirs(1); + registerExecutor(testApp, + prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1), + MERGE_DIRECTORY_META_1); + ByteBuffer[] blocks = new ByteBuffer[]{ + ByteBuffer.wrap(new byte[4]), + ByteBuffer.wrap(new byte[5]) + }; + StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 1, 0, 0, 0, 0)); + for (ByteBuffer block : blocks) { + stream1.onData(stream1.getID(), block); + } + stream1.onComplete(stream1.getID()); + Path[] attempt2LocalDirs = createLocalDirs(2); + registerExecutor(testApp, + prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2), + MERGE_DIRECTORY_META_2); + StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream( + new PushBlockStream(testApp, 2, 0, 1, 0, 0)); + for (ByteBuffer block : blocks) { + stream2.onData(stream2.getID(), block); + } + stream2.onComplete(stream2.getID()); + StreamCallbackWithID stream3 = pushResolver.receiveBlockDataAsStream( Review comment: I have run this test. 
It turns out that the `RuntimeException` is actually thrown here, at the `receiveBlockDataAsStream` call that creates `stream3`, due to the following error: "The attempt id 1 in this PushBlockStream message does not match with the current attempt id 2 stored in shuffle service for application updateLocalDirsTwiceWithTwoAttempts" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org