zhouyejoe commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r668468557
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -219,101 +238,145 @@ public void
testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
stream1.onComplete(stream1.getID());
// stream 2 now completes completes
stream2.onComplete(stream2.getID());
- pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
}
@Test
public void testFailureAfterData() throws IOException {
StreamCallbackWithID stream =
- pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0,
0, 0, 0));
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
- pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
}
@Test
public void testFailureAfterMultipleDataBlocks() throws IOException {
StreamCallbackWithID stream =
- pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0,
0, 0, 0));
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
- pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
}
@Test
public void testFailureAfterComplete() throws IOException {
StreamCallbackWithID stream =
- pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0,
0, 0, 0));
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onComplete(stream.getID());
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
- pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
}
@Test (expected = RuntimeException.class)
- public void testTooLateArrival() throws IOException {
+ public void testBlockReceivedAfterMergeFinalize() throws IOException {
ByteBuffer[] blocks = new ByteBuffer[]{
ByteBuffer.wrap(new byte[4]),
ByteBuffer.wrap(new byte[5])
};
StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream(
- new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
for (ByteBuffer block : blocks) {
stream.onData(stream.getID(), block);
}
stream.onComplete(stream.getID());
- pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0));
StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
- new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
try {
stream1.onComplete(stream1.getID());
} catch (RuntimeException re) {
assertEquals(
- "Block shufflePush_0_1_0 received after merged shuffle is finalized",
- re.getMessage());
+ "Block shufflePush_0_1_0 received after merged shuffle is finalized",
re.getMessage());
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0);
validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new
int[][]{{0}});
throw re;
}
}
+ @Test (expected = RuntimeException.class)
+ public void testBlockReceivedAfterNewAttemptRegistered() throws IOException {
+ String testApp = "updateLocalDirsTwiceWithTwoAttempts";
+ Path[] attempt1LocalDirs = createLocalDirs(1);
+ registerExecutor(testApp,
+ prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
+ MERGE_DIRECTORY_META_1);
+ ByteBuffer[] blocks = new ByteBuffer[]{
+ ByteBuffer.wrap(new byte[4]),
+ ByteBuffer.wrap(new byte[5])
+ };
+ StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+ for (ByteBuffer block : blocks) {
+ stream1.onData(stream1.getID(), block);
+ }
+ stream1.onComplete(stream1.getID());
+ Path[] attempt2LocalDirs = createLocalDirs(2);
+ registerExecutor(testApp,
+ prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_2),
+ MERGE_DIRECTORY_META_2);
+ StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(testApp, 2, 0, 1, 0, 0));
+ for (ByteBuffer block : blocks) {
+ stream2.onData(stream2.getID(), block);
+ }
+ stream2.onComplete(stream2.getID());
+ StreamCallbackWithID stream3 = pushResolver.receiveBlockDataAsStream(
Review comment:
Updated the unit test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]