Ngone51 commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r663719472



##########
File path: 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -219,101 +238,145 @@ public void 
testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
     stream1.onComplete(stream1.getID());
     // stream 2 now completes completes
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
   }
 
   @Test
   public void testFailureAfterData() throws IOException {
     StreamCallbackWithID stream =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
   }
 
   @Test
   public void testFailureAfterMultipleDataBlocks() throws IOException {
     StreamCallbackWithID stream =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
   }
 
   @Test
   public void testFailureAfterComplete() throws IOException {
     StreamCallbackWithID stream =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onComplete(stream.getID());
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
   }
 
   @Test (expected = RuntimeException.class)
-  public void testTooLateArrival() throws IOException {
+  public void testBlockReceivedAfterMergeFinalize() throws IOException {
     ByteBuffer[] blocks = new ByteBuffer[]{
       ByteBuffer.wrap(new byte[4]),
       ByteBuffer.wrap(new byte[5])
     };
     StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     for (ByteBuffer block : blocks) {
       stream.onData(stream.getID(), block);
     }
     stream.onComplete(stream.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
     try {
       stream1.onComplete(stream1.getID());
     } catch (RuntimeException re) {
       assertEquals(
-        "Block shufflePush_0_1_0 received after merged shuffle is finalized",
-          re.getMessage());
+        "Block shufflePush_0_1_0 received after merged shuffle is finalized", 
re.getMessage());
       MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
       validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new 
int[][]{{0}});
       throw re;
     }
   }
 
+  @Test (expected = RuntimeException.class)
+  public void testBlockReceivedAfterNewAttemptRegistered() throws IOException {
+    String testApp = "updateLocalDirsTwiceWithTwoAttempts";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream1.onData(stream1.getID(), block);
+    }
+    stream1.onComplete(stream1.getID());
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 2, 0, 1, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream2.onData(stream2.getID(), block);
+    }
+    stream2.onComplete(stream2.getID());
+    StreamCallbackWithID stream3 = pushResolver.receiveBlockDataAsStream(

Review comment:
       I have run this test. It turns out that the  `RuntimeException` is 
actually thrown here due to:
   
   "The attempt id 1 in this PushBlockStream message does not match with the 
current attempt id 2 stored in shuffle service for application 
updateLocalDirsTwiceWithTwoAttempts"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to