Ngone51 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r679656876
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -398,8 +486,8 @@ public void onFailure(String streamId, Throwable cause) {
@Override
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws
IOException {
Review comment:
nit: `IOException` is never thrown
##########
File path:
common/network-common/src/main/java/org/apache/spark/network/protocol/MergedBlockMetaRequest.java
##########
@@ -88,6 +98,7 @@ public String toString() {
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("requestId", requestId)
.append("appId", appId)
+ .append("shuffleMergeId", shuffleMergeId)
Review comment:
nit: append after `shuffleId`? (to keep it consistent with the other methods
above)
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void
testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
@Test(expected = ClosedChannelException.class)
public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
pushResolver = new RemoteBlockPushResolver(conf) {
@Override
void closeAndDeletePartitionFilesIfNeeded(
- AppShuffleInfo appShuffleInfo,
- boolean cleanupLocalDirs) {
+ AppShuffleInfo appShuffleInfo,
+ boolean cleanupLocalDirs) {
super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo,
cleanupLocalDirs);
closed.release();
}
};
String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
Path[] attempt1LocalDirs = createLocalDirs(1);
registerExecutor(testApp,
- prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
- MERGE_DIRECTORY_META_1);
+ prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
+ MERGE_DIRECTORY_META_1);
ByteBuffer[] blocks = new ByteBuffer[]{
- ByteBuffer.wrap(new byte[4]),
- ByteBuffer.wrap(new byte[5]),
- ByteBuffer.wrap(new byte[6]),
- ByteBuffer.wrap(new byte[7])
+ ByteBuffer.wrap(new byte[4]),
+ ByteBuffer.wrap(new byte[5]),
+ ByteBuffer.wrap(new byte[6]),
+ ByteBuffer.wrap(new byte[7])
};
StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
- new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+ new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0));
Review comment:
revert indent?
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void
testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
@Test(expected = ClosedChannelException.class)
public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
pushResolver = new RemoteBlockPushResolver(conf) {
@Override
void closeAndDeletePartitionFilesIfNeeded(
- AppShuffleInfo appShuffleInfo,
- boolean cleanupLocalDirs) {
+ AppShuffleInfo appShuffleInfo,
+ boolean cleanupLocalDirs) {
super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo,
cleanupLocalDirs);
closed.release();
}
};
String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
Path[] attempt1LocalDirs = createLocalDirs(1);
registerExecutor(testApp,
- prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
- MERGE_DIRECTORY_META_1);
+ prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
+ MERGE_DIRECTORY_META_1);
Review comment:
revert indent?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1065,7 +1232,12 @@ private AppPathsInfo(
private final String appId;
private final int attemptId;
private final AppPathsInfo appPathsInfo;
- private final ConcurrentMap<Integer, Map<Integer,
AppShufflePartitionInfo>> partitions;
+ /**
+ * 1. Key tracks shuffleId for an application
+ * 2. Value tracks the AppShuffleMergePartitionsInfo having shuffleMergeId
and
+ * a Map tracking AppShufflePartitionInfo for all the shuffle partitions.
+ */
+ private final ConcurrentMap<Integer, AppShuffleMergePartitionsInfo>
partitions;
Review comment:
Shall we rename this to `shuffles`? I think the hierarchy here is `app
-> shuffle -> partitions`.
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void
testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
@Test(expected = ClosedChannelException.class)
public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
pushResolver = new RemoteBlockPushResolver(conf) {
@Override
void closeAndDeletePartitionFilesIfNeeded(
- AppShuffleInfo appShuffleInfo,
- boolean cleanupLocalDirs) {
+ AppShuffleInfo appShuffleInfo,
+ boolean cleanupLocalDirs) {
super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo,
cleanupLocalDirs);
closed.release();
}
};
String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
Path[] attempt1LocalDirs = createLocalDirs(1);
registerExecutor(testApp,
- prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
- MERGE_DIRECTORY_META_1);
+ prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
+ MERGE_DIRECTORY_META_1);
ByteBuffer[] blocks = new ByteBuffer[]{
- ByteBuffer.wrap(new byte[4]),
- ByteBuffer.wrap(new byte[5]),
- ByteBuffer.wrap(new byte[6]),
- ByteBuffer.wrap(new byte[7])
+ ByteBuffer.wrap(new byte[4]),
+ ByteBuffer.wrap(new byte[5]),
+ ByteBuffer.wrap(new byte[6]),
+ ByteBuffer.wrap(new byte[7])
};
StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
- new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+ new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0));
// The onData callback should be called 4 times here before the onComplete
callback. But a
// register executor message arrives in shuffle service after the 2nd
onData callback. The 3rd
// onData callback should all throw ClosedChannelException as their
channels are closed.
stream1.onData(stream1.getID(), blocks[0]);
stream1.onData(stream1.getID(), blocks[1]);
Path[] attempt2LocalDirs = createLocalDirs(2);
registerExecutor(testApp,
- prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_2),
- MERGE_DIRECTORY_META_2);
+ prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_2),
+ MERGE_DIRECTORY_META_2);
closed.acquire();
// Should throw ClosedChannelException here.
stream1.onData(stream1.getID(), blocks[3]);
}
+ @Test
+ public void testBlockPushWithOlderShuffleMergeId() throws IOException {
+ StreamCallbackWithID stream1 =
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+ StreamCallbackWithID stream2 =
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0));
+ stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+ stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+ try {
+ // stream 1 push should be rejected as it is from an older shuffleMergeId
+ stream1.onComplete(stream1.getID());
+ } catch(RuntimeException re) {
+ assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle
is finalized or"
+ + " stale block push as shuffle blocks of a higher shuffleMergeId for
the shuffle is being"
+ + " pushed", re.getMessage());
+ }
+ // stream 2 now completes
+ stream2.onComplete(stream2.getID());
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 2));
+ MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0,
2, 0);
+ validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new
int[][]{{0}});
+ }
+
+ @Test
+ public void testFinalizeWithOlderShuffleMergeId() throws IOException {
+ StreamCallbackWithID stream1 =
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+ StreamCallbackWithID stream2 =
+ pushResolver.receiveBlockDataAsStream(
+ new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0));
+ stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+ stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+ stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+ try {
+ // stream 1 push should be rejected as it is from an older shuffleMergeId
+ stream1.onComplete(stream1.getID());
+ } catch(RuntimeException re) {
+ assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle
is finalized or"
+ + " stale block push as shuffle blocks of a higher shuffleMergeId for
the shuffle is being"
+ + " pushed", re.getMessage());
+ }
+ // stream 2 now completes
+ stream2.onComplete(stream2.getID());
+ try {
+ pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP,
NO_ATTEMPT_ID, 0, 1));
+ } catch(RuntimeException re) {
Review comment:
This doesn't actually throw the exception. Could you check it?
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void
testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
@Test(expected = ClosedChannelException.class)
public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
pushResolver = new RemoteBlockPushResolver(conf) {
@Override
void closeAndDeletePartitionFilesIfNeeded(
- AppShuffleInfo appShuffleInfo,
- boolean cleanupLocalDirs) {
+ AppShuffleInfo appShuffleInfo,
+ boolean cleanupLocalDirs) {
super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo,
cleanupLocalDirs);
closed.release();
}
};
String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
Path[] attempt1LocalDirs = createLocalDirs(1);
registerExecutor(testApp,
- prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
- MERGE_DIRECTORY_META_1);
+ prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
+ MERGE_DIRECTORY_META_1);
ByteBuffer[] blocks = new ByteBuffer[]{
- ByteBuffer.wrap(new byte[4]),
- ByteBuffer.wrap(new byte[5]),
- ByteBuffer.wrap(new byte[6]),
- ByteBuffer.wrap(new byte[7])
+ ByteBuffer.wrap(new byte[4]),
+ ByteBuffer.wrap(new byte[5]),
+ ByteBuffer.wrap(new byte[6]),
+ ByteBuffer.wrap(new byte[7])
};
StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
- new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+ new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0));
// The onData callback should be called 4 times here before the onComplete
callback. But a
// register executor message arrives in shuffle service after the 2nd
onData callback. The 3rd
// onData callback should all throw ClosedChannelException as their
channels are closed.
stream1.onData(stream1.getID(), blocks[0]);
stream1.onData(stream1.getID(), blocks[1]);
Path[] attempt2LocalDirs = createLocalDirs(2);
registerExecutor(testApp,
- prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_2),
- MERGE_DIRECTORY_META_2);
+ prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_2),
+ MERGE_DIRECTORY_META_2);
Review comment:
revert?
##########
File path:
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void
testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
@Test(expected = ClosedChannelException.class)
public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
pushResolver = new RemoteBlockPushResolver(conf) {
@Override
void closeAndDeletePartitionFilesIfNeeded(
- AppShuffleInfo appShuffleInfo,
- boolean cleanupLocalDirs) {
+ AppShuffleInfo appShuffleInfo,
+ boolean cleanupLocalDirs) {
super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo,
cleanupLocalDirs);
closed.release();
}
};
String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
Path[] attempt1LocalDirs = createLocalDirs(1);
registerExecutor(testApp,
- prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
- MERGE_DIRECTORY_META_1);
+ prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" +
ATTEMPT_ID_1),
+ MERGE_DIRECTORY_META_1);
ByteBuffer[] blocks = new ByteBuffer[]{
- ByteBuffer.wrap(new byte[4]),
- ByteBuffer.wrap(new byte[5]),
- ByteBuffer.wrap(new byte[6]),
- ByteBuffer.wrap(new byte[7])
+ ByteBuffer.wrap(new byte[4]),
+ ByteBuffer.wrap(new byte[5]),
+ ByteBuffer.wrap(new byte[6]),
+ ByteBuffer.wrap(new byte[7])
Review comment:
revert?
##########
File path:
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -407,27 +408,65 @@ protected Ratio getRatio() {
return rddAndSplitIds;
}
+ /**
+ * @param blockIds Regular shuffle blockIds starts with SHUFFLE_BLOCK_ID
to be parsed
+ * @param shuffleId shuffle blocks shuffleId
+ * @return mapId and reduceIds of the shuffle blocks in the same order as
that of the blockIds
+ *
+ * Regular shuffle blocks format should be blockIdParts[1] = shuffleId,
+ * blockIdParts[2] = mapId, blockIdParts[3] = reduceId
+ */
private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
- // For regular shuffle blocks, primaryId is mapId and secondaryIds are
reduceIds.
- // For shuffle chunks, primaryIds is reduceId and secondaryIds are
chunkIds.
- final int[] primaryIdAndSecondaryIds = new int[2 * blockIds.length];
+ final int[] mapIdAndReduceIds = new int[2 * blockIds.length];
for (int i = 0; i < blockIds.length; i++) {
String[] blockIdParts = blockIds[i].split("_");
- if (blockIdParts.length != 4
- || (!requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID))
- || (requestForMergedBlockChunks &&
!blockIdParts[0].equals(SHUFFLE_CHUNK_ID))) {
+ if (blockIdParts.length != 4 ||
!blockIdParts[0].equals(SHUFFLE_BLOCK_ID)) {
throw new IllegalArgumentException("Unexpected shuffle block id
format: " + blockIds[i]);
}
if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
throw new IllegalArgumentException("Expected shuffleId=" + shuffleId
+
", got:" + blockIds[i]);
}
- // For regular blocks, blockIdParts[2] is mapId. For chunks, it is
reduceId.
- primaryIdAndSecondaryIds[2 * i] = Integer.parseInt(blockIdParts[2]);
- // For regular blocks, blockIdParts[3] is reduceId. For chunks, it is
chunkId.
- primaryIdAndSecondaryIds[2 * i + 1] =
Integer.parseInt(blockIdParts[3]);
+ // mapId
+ mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
+ // reduceId
+ mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
}
- return primaryIdAndSecondaryIds;
+ return mapIdAndReduceIds;
+ }
+
+ /**
+ * @param blockIds Shuffle merged chunks starts with SHUFFLE_CHUNK_ID to
be parsed
+ * @param shuffleId shuffle blocks shuffleId
+ * @param shuffleMergeId shuffleMergeId is used to uniquely identify
merging process
+ * of shuffle by an indeterminate stage attempt.
+ * @return reduceId and chunkIds of the shuffle chunks in the same order
as that of the
+ * blockIds
+ *
+ * Shuffle merged chunks format should be blockIdParts[1] = shuffleId,
+ * blockIdParts[2] = shuffleMergeId, blockIdParts[3] = reduceId,
blockIdParts[4] = chunkId
Review comment:
Actually, what I expect is something like
"shuffleChunk_shuffleId_shuffleMergeId_reduceId_chunkId".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]