Ngone51 commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r679656876



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -398,8 +486,8 @@ public void onFailure(String streamId, Throwable cause) {
 
   @Override
  public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {

Review comment:
       nit: `IOException` is never thrown
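      e.g., a sketch of the suggested signature (assuming no checked `IOException` can escape the body):
      ```java
      @Override
      public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
        // unchanged body
      }
      ```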

##########
File path: common/network-common/src/main/java/org/apache/spark/network/protocol/MergedBlockMetaRequest.java
##########
@@ -88,6 +98,7 @@ public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
       .append("requestId", requestId)
       .append("appId", appId)
+      .append("shuffleMergeId", shuffleMergeId)

Review comment:
      nit: append after `shuffleId`? (to keep consistent with other methods above)
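      i.e., something like (a sketch; assuming this class also appends a `shuffleId` field, which isn't shown in this hunk):
      ```java
      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
        .append("requestId", requestId)
        .append("appId", appId)
        .append("shuffleId", shuffleId)
        .append("shuffleMergeId", shuffleMergeId)
        ...
      ```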

##########
File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
 
   @Test(expected = ClosedChannelException.class)
   public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
       void closeAndDeletePartitionFilesIfNeeded(
-        AppShuffleInfo appShuffleInfo,
-        boolean cleanupLocalDirs) {
+          AppShuffleInfo appShuffleInfo,
+          boolean cleanupLocalDirs) {
        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
         closed.release();
       }
     };
     String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
     Path[] attempt1LocalDirs = createLocalDirs(1);
     registerExecutor(testApp,
-      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
-      MERGE_DIRECTORY_META_1);
+        prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+        MERGE_DIRECTORY_META_1);
     ByteBuffer[] blocks = new ByteBuffer[]{
-      ByteBuffer.wrap(new byte[4]),
-      ByteBuffer.wrap(new byte[5]),
-      ByteBuffer.wrap(new byte[6]),
-      ByteBuffer.wrap(new byte[7])
+        ByteBuffer.wrap(new byte[4]),
+        ByteBuffer.wrap(new byte[5]),
+        ByteBuffer.wrap(new byte[6]),
+        ByteBuffer.wrap(new byte[7])
     };
     StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+        new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0));

Review comment:
       revert indent?
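      i.e., keep the original 2-space continuation indent while keeping the new argument, a sketch:
      ```java
      StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
        new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0));
      ```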

##########
File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
 
   @Test(expected = ClosedChannelException.class)
   public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
       void closeAndDeletePartitionFilesIfNeeded(
-        AppShuffleInfo appShuffleInfo,
-        boolean cleanupLocalDirs) {
+          AppShuffleInfo appShuffleInfo,
+          boolean cleanupLocalDirs) {
        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
         closed.release();
       }
     };
     String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
     Path[] attempt1LocalDirs = createLocalDirs(1);
     registerExecutor(testApp,
-      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
-      MERGE_DIRECTORY_META_1);
+        prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+        MERGE_DIRECTORY_META_1);

Review comment:
       revert indent?
   
   

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -1065,7 +1232,12 @@ private AppPathsInfo(
     private final String appId;
     private final int attemptId;
     private final AppPathsInfo appPathsInfo;
-    private final ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions;
+    /**
+     * 1. Key tracks shuffleId for an application
+     * 2. Value tracks the AppShuffleMergePartitionsInfo having shuffleMergeId and
+     * a Map tracking AppShufflePartitionInfo for all the shuffle partitions.
+     */
+    private final ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> partitions;

Review comment:
      Shall we rename this to `shuffles`? I think the hierarchy here is `app -> shuffle -> partitions`.
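      e.g., a sketch:
      ```java
      private final ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> shuffles;
      ```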

##########
File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
 
   @Test(expected = ClosedChannelException.class)
   public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
       void closeAndDeletePartitionFilesIfNeeded(
-        AppShuffleInfo appShuffleInfo,
-        boolean cleanupLocalDirs) {
+          AppShuffleInfo appShuffleInfo,
+          boolean cleanupLocalDirs) {
        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
         closed.release();
       }
     };
     String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
     Path[] attempt1LocalDirs = createLocalDirs(1);
     registerExecutor(testApp,
-      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
-      MERGE_DIRECTORY_META_1);
+        prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+        MERGE_DIRECTORY_META_1);
     ByteBuffer[] blocks = new ByteBuffer[]{
-      ByteBuffer.wrap(new byte[4]),
-      ByteBuffer.wrap(new byte[5]),
-      ByteBuffer.wrap(new byte[6]),
-      ByteBuffer.wrap(new byte[7])
+        ByteBuffer.wrap(new byte[4]),
+        ByteBuffer.wrap(new byte[5]),
+        ByteBuffer.wrap(new byte[6]),
+        ByteBuffer.wrap(new byte[7])
     };
     StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+        new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0));
    // The onData callback should be called 4 times here before the onComplete callback. But a
    // register executor message arrives in shuffle service after the 2nd onData callback. The 3rd
    // onData callback should all throw ClosedChannelException as their channels are closed.
     stream1.onData(stream1.getID(), blocks[0]);
     stream1.onData(stream1.getID(), blocks[1]);
     Path[] attempt2LocalDirs = createLocalDirs(2);
     registerExecutor(testApp,
-      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
-      MERGE_DIRECTORY_META_2);
+        prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
+        MERGE_DIRECTORY_META_2);
     closed.acquire();
     // Should throw ClosedChannelException here.
     stream1.onData(stream1.getID(), blocks[3]);
   }
 
+  @Test
+  public void testBlockPushWithOlderShuffleMergeId() throws IOException {
+    StreamCallbackWithID stream1 =
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    StreamCallbackWithID stream2 =
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    try {
+      // stream 1 push should be rejected as it is from an older shuffleMergeId
+      stream1.onComplete(stream1.getID());
+    } catch(RuntimeException re) {
+      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or"
+        + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
+          + " pushed", re.getMessage());
+    }
+    // stream 2 now completes
+    stream2.onComplete(stream2.getID());
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 2));
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 2, 0);
+    validateChunks(TEST_APP, 0, 2, 0, blockMeta, new int[]{4}, new int[][]{{0}});
+  }
+
+  @Test
+  public void testFinalizeWithOlderShuffleMergeId() throws IOException {
+    StreamCallbackWithID stream1 =
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0, 0));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    StreamCallbackWithID stream2 =
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0, 0));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+    stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
+    stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
+    try {
+      // stream 1 push should be rejected as it is from an older shuffleMergeId
+      stream1.onComplete(stream1.getID());
+    } catch(RuntimeException re) {
+      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle is finalized or"
+        + " stale block push as shuffle blocks of a higher shuffleMergeId for the shuffle is being"
+          + " pushed", re.getMessage());
+    }
+    // stream 2 now completes
+    stream2.onComplete(stream2.getID());
+    try {
+      pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 1));
+    } catch(RuntimeException re) {

Review comment:
       This doesn't really throw the exception. Could you check it?
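      A sketch of one way to make the test fail when the exception isn't thrown (assuming JUnit 4's `org.junit.Assert.fail`):
      ```java
      try {
        pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 1));
        fail("finalizeShuffleMerge with an older shuffleMergeId should throw");
      } catch (RuntimeException re) {
        // assert on re.getMessage() as before
      }
      ```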

##########
File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
 
   @Test(expected = ClosedChannelException.class)
   public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
       void closeAndDeletePartitionFilesIfNeeded(
-        AppShuffleInfo appShuffleInfo,
-        boolean cleanupLocalDirs) {
+          AppShuffleInfo appShuffleInfo,
+          boolean cleanupLocalDirs) {
        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
         closed.release();
       }
     };
     String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
     Path[] attempt1LocalDirs = createLocalDirs(1);
     registerExecutor(testApp,
-      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
-      MERGE_DIRECTORY_META_1);
+        prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+        MERGE_DIRECTORY_META_1);
     ByteBuffer[] blocks = new ByteBuffer[]{
-      ByteBuffer.wrap(new byte[4]),
-      ByteBuffer.wrap(new byte[5]),
-      ByteBuffer.wrap(new byte[6]),
-      ByteBuffer.wrap(new byte[7])
+        ByteBuffer.wrap(new byte[4]),
+        ByteBuffer.wrap(new byte[5]),
+        ByteBuffer.wrap(new byte[6]),
+        ByteBuffer.wrap(new byte[7])
     };
     StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+        new PushBlockStream(testApp, 1, 0, 0, 0, 0, 0));
    // The onData callback should be called 4 times here before the onComplete callback. But a
    // register executor message arrives in shuffle service after the 2nd onData callback. The 3rd
    // onData callback should all throw ClosedChannelException as their channels are closed.
     stream1.onData(stream1.getID(), blocks[0]);
     stream1.onData(stream1.getID(), blocks[1]);
     Path[] attempt2LocalDirs = createLocalDirs(2);
     registerExecutor(testApp,
-      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
-      MERGE_DIRECTORY_META_2);
+        prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
+        MERGE_DIRECTORY_META_2);

Review comment:
       revert?

##########
File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -1022,55 +1019,240 @@ public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
 
   @Test(expected = ClosedChannelException.class)
   public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
-    throws IOException, InterruptedException {
+      throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
       void closeAndDeletePartitionFilesIfNeeded(
-        AppShuffleInfo appShuffleInfo,
-        boolean cleanupLocalDirs) {
+          AppShuffleInfo appShuffleInfo,
+          boolean cleanupLocalDirs) {
        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
         closed.release();
       }
     };
     String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
     Path[] attempt1LocalDirs = createLocalDirs(1);
     registerExecutor(testApp,
-      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
-      MERGE_DIRECTORY_META_1);
+        prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+        MERGE_DIRECTORY_META_1);
     ByteBuffer[] blocks = new ByteBuffer[]{
-      ByteBuffer.wrap(new byte[4]),
-      ByteBuffer.wrap(new byte[5]),
-      ByteBuffer.wrap(new byte[6]),
-      ByteBuffer.wrap(new byte[7])
+        ByteBuffer.wrap(new byte[4]),
+        ByteBuffer.wrap(new byte[5]),
+        ByteBuffer.wrap(new byte[6]),
+        ByteBuffer.wrap(new byte[7])

Review comment:
       revert?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
##########
@@ -407,27 +408,65 @@ protected Ratio getRatio() {
       return rddAndSplitIds;
     }
 
+    /**
+     * @param blockIds Regular shuffle blockIds starts with SHUFFLE_BLOCK_ID to be parsed
+     * @param shuffleId shuffle blocks shuffleId
+     * @return mapId and reduceIds of the shuffle blocks in the same order as that of the blockIds
+     *
+     * Regular shuffle blocks format should be blockIdParts[1] = shuffleId,
+     * blockIdParts[2] = mapId, blockIdParts[3] = reduceId
+     */
     private int[] shuffleMapIdAndReduceIds(String[] blockIds, int shuffleId) {
-      // For regular shuffle blocks, primaryId is mapId and secondaryIds are reduceIds.
-      // For shuffle chunks, primaryIds is reduceId and secondaryIds are chunkIds.
-      final int[] primaryIdAndSecondaryIds = new int[2 * blockIds.length];
+      final int[] mapIdAndReduceIds = new int[2 * blockIds.length];
       for (int i = 0; i < blockIds.length; i++) {
         String[] blockIdParts = blockIds[i].split("_");
-        if (blockIdParts.length != 4
-          || (!requestForMergedBlockChunks && !blockIdParts[0].equals(SHUFFLE_BLOCK_ID))
-          || (requestForMergedBlockChunks && !blockIdParts[0].equals(SHUFFLE_CHUNK_ID))) {
+        if (blockIdParts.length != 4 || !blockIdParts[0].equals(SHUFFLE_BLOCK_ID)) {
          throw new IllegalArgumentException("Unexpected shuffle block id format: " + blockIds[i]);
        }
         if (Integer.parseInt(blockIdParts[1]) != shuffleId) {
          throw new IllegalArgumentException("Expected shuffleId=" + shuffleId +
             ", got:" + blockIds[i]);
         }
-        // For regular blocks, blockIdParts[2] is mapId. For chunks, it is reduceId.
-        primaryIdAndSecondaryIds[2 * i] = Integer.parseInt(blockIdParts[2]);
-        // For regular blocks, blockIdParts[3] is reduceId. For chunks, it is chunkId.
-        primaryIdAndSecondaryIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
+        // mapId
+        mapIdAndReduceIds[2 * i] = Integer.parseInt(blockIdParts[2]);
+        // reduceId
+        mapIdAndReduceIds[2 * i + 1] = Integer.parseInt(blockIdParts[3]);
       }
-      return primaryIdAndSecondaryIds;
+      return mapIdAndReduceIds;
+    }
+
+    /**
+     * @param blockIds Shuffle merged chunks starts with SHUFFLE_CHUNK_ID to be parsed
+     * @param shuffleId shuffle blocks shuffleId
+     * @param shuffleMergeId shuffleMergeId is used to uniquely identify merging process
+     *                       of shuffle by an indeterminate stage attempt.
+     * @return reduceId and chunkIds of the shuffle chunks in the same order as that of the
+     *         blockIds
+     *
+     * Shuffle merged chunks format should be blockIdParts[1] = shuffleId,
+     * blockIdParts[2] = shuffleMergeId, blockIdParts[3] = reduceId, blockIdParts[4] = chunkId

Review comment:
      Actually, what I expect is something like "shuffleChunk_shuffleId_shuffleMergeId_reduceId_chunkId"..




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


