mridulm commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r679523972
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void
testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
@Test(expected = ClosedChannelException.class)
public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
pushResolver = new RemoteBlockPushResolver(conf) {
@Override
void closeAndDeletePartitionFilesIfNeeded(
- AppShuffleInfo appShuffleInfo,
- boolean cleanupLocalDirs) {
+ AppShuffleInfo appShuffleInfo,
+ boolean cleanupLocalDirs) {
super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo,
cleanupLocalDirs);
closed.release();
}
};
String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
Path[] attempt1LocalDirs = createLocalDirs(1);
registerExecutor(testApp,
- prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
- MERGE_DIRECTORY_META_1);
+ prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
+ MERGE_DIRECTORY_META_1);
ByteBuffer[] blocks = new ByteBuffer[]{
- ByteBuffer.wrap(new byte[4]),
- ByteBuffer.wrap(new byte[5]),
- ByteBuffer.wrap(new byte[6]),
- ByteBuffer.wrap(new byte[7])
+ ByteBuffer.wrap(new byte[4]),
+ ByteBuffer.wrap(new byte[5]),
+ ByteBuffer.wrap(new byte[6]),
+ ByteBuffer.wrap(new byte[7])
};
StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
- new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+ new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0));
// The onData callback should be called 4 times here before the onComplete
callback. But a
// register executor message arrives in shuffle service after the 2nd
onData callback. The 3rd
// onData callback should all throw ClosedChannelException as their
channels are closed.
stream1.onData(stream1.getID(), blocks[0]);
stream1.onData(stream1.getID(), blocks[1]);
Path[] attempt2LocalDirs = createLocalDirs(2);
registerExecutor(testApp,
- prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_2),
- MERGE_DIRECTORY_META_2);
+ prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_2),
+ MERGE_DIRECTORY_META_2);
closed.acquire();
// Should throw ClosedChannelException here.
stream1.onData(stream1.getID(), blocks[3]);
}
+ @Test
+ public void testBlockPushWithOlderShuffleMergeId() throws IOException {
+ StreamCallbackWithID stream1 =
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+ StreamCallbackWithID stream2 =
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0));
+ stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+ stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+ try {
+ // stream 1 push should be rejected as it is from an older shuffleMergeId
+ stream1.onComplete(stream1.getID());
+ } catch(RuntimeException re) {
+ assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle
is finalized or"
+ + " stale block push as shuffle blocks of a higher shuffleMergeId for
the shuffle is being"
+ + " pushed", re.getMessage());
+ }
+ // stream 2 now completes
+ stream2.onComplete(stream2.getID());
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 2));
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
2, 0);
+ validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new
int[][]{{0}});
+ }
+
+ @Test
+ public void testFinalizeWithOlderShuffleMergeId() throws IOException {
+ StreamCallbackWithID stream1 =
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+ StreamCallbackWithID stream2 =
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0));
+ stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+ stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+ try {
+ // stream 1 push should be rejected as it is from an older shuffleMergeId
+ stream1.onComplete(stream1.getID());
+ } catch(RuntimeException re) {
+ assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle
is finalized or"
+ + " stale block push as shuffle blocks of a higher shuffleMergeId for
the shuffle is being"
+ + " pushed", re.getMessage());
+ }
+ // stream 2 now completes
+ stream2.onComplete(stream2.getID());
+ try {
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 1));
+ } catch(RuntimeException re) {
+ assertEquals("Shuffle merge finalize request for shuffle 0 with
shuffleMergeId 1 is stale"
+ + " shuffle finalize request as shuffle blocks of a higher
shuffleMergeId for the shuffle"
+ + " is already being pushed", re.getMessage());
+ }
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 2));
+
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
2, 0);
+ validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new
int[][]{{0}});
+ }
+
+ @Test
+ public void testFinalizeOfDeterminateShuffle() throws IOException {
+ PushBlock[] pushBlocks = new PushBlock[] {
+ new PushBlock(0, 0, 0, 0, ByteBuffer.wrap(new byte[4])),
+ new PushBlock(0, 0, 1, 0, ByteBuffer.wrap(new byte[5]))
+ };
+ pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
+ MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+ new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
+
+ RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+ pushResolver.validateAndGetAppShuffleInfo(TEST_APP);
+ assertTrue("Metadata of determinate shuffle should be removed after
finalize shuffle"
+ + " merge", appShuffleInfo.getPartitions().get(0) == null);
+ validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
0, 0);
+ validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{4, 5}, new
int[][]{{0}, {1}});
Review comment:
Do we have a test which checks that a push to a determinate shuffle after
finalize is rejected?
I think there is one outside of the context of this PR - please check, and add
one if not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]