otterc commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r671507612



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
##########
@@ -64,25 +68,28 @@ public boolean equals(Object other) {
     if (other != null && other instanceof FinalizeShuffleMerge) {
       FinalizeShuffleMerge o = (FinalizeShuffleMerge) other;
       return Objects.equal(appId, o.appId)
+        && appAttemptId == o.appAttemptId
         && shuffleId == o.shuffleId;
     }
     return false;
   }
 
   @Override
   public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId) + 4;
+    return Encoders.Strings.encodedLength(appId) + 8;

Review comment:
      super nit: write the `8` as `4 + 4`, to make it explicit that `appAttemptId` and `shuffleId` each take 4 bytes.
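
      For illustration, the form this nit suggests; the inline comment is added here and is not part of the PR:

          // encoded appId string, then 4 bytes for appAttemptId and 4 bytes for shuffleId
          return Encoders.Strings.encodedLength(appId) + 4 + 4;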

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -210,76 +244,55 @@ public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceI
     }
   }
 
-  /**
-   * The logic here is consistent with
-   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
-   *      org.apache.spark.storage.BlockId, scala.Option)]]
-   */
-  private File getFile(String appId, String filename) {
-    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
-      appPathsInfo.subDirsPerLocalDir, filename);
-    logger.debug("Get merged file {}", targetFile.getAbsolutePath());
-    return targetFile;
-  }
-
-  private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
-    String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, fileName);
-  }
-
-  private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
-    String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, indexName);
-  }
-
-  private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
-    String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, metaName);
-  }
-
   @Override
   public String[] getMergedBlockDirs(String appId) {
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
-      "application " + appId
-      + " active local dirs list has not been updated by any executor registration");
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    String[] activeLocalDirs =
+      Preconditions.checkNotNull(appShuffleInfo.appPathsInfo.activeLocalDirs,

Review comment:
      This throws an NPE. Instead we can use `Preconditions.checkArgument`, which throws an `IllegalArgumentException`.
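
      For illustration, a minimal sketch of that suggestion, reusing the message text from the diff above (the exact placement is up to the author):

          Preconditions.checkArgument(appShuffleInfo.appPathsInfo.activeLocalDirs != null,
            "application " + appId
            + " active local dirs list has not been updated by any executor registration");
          String[] activeLocalDirs = appShuffleInfo.appPathsInfo.activeLocalDirs;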

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
##########
@@ -19,9 +19,9 @@
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
+
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
-

Review comment:
       This extra line is needed right?
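
      For illustration, the import layout this presumably asks to keep; the final org.apache.spark import is an assumption about what follows in this file:

          import com.google.common.base.Objects;
          import io.netty.buffer.ByteBuf;

          import org.apache.commons.lang3.builder.ToStringBuilder;
          import org.apache.commons.lang3.builder.ToStringStyle;

          import org.apache.spark.network.protocol.Encoders;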

##########
File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -821,20 +915,132 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
     }
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlockReceivedAfterNewAttemptRegistered()
+      throws IOException, InterruptedException {
+    Semaphore closed = new Semaphore(0);
+    pushResolver = new RemoteBlockPushResolver(conf) {
+      @Override
+      void closeAndDeletePartitionFilesIfNeeded(
+        AppShuffleInfo appShuffleInfo,
+        boolean cleanupLocalDirs) {
+        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
+        closed.release();
+      }
+    };
+    String testApp = "updateLocalDirsTwiceWithTwoAttempts";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream1.onData(stream1.getID(), block);
+    }
+    stream1.onComplete(stream1.getID());
+    RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+      pushResolver.validateAndGetAppShuffleInfo(testApp);
+    Map<Integer, Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo>> partitions =
+      appShuffleInfo.getPartitions();
+    for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> partitionMap :
+        partitions.values()) {
+      for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
+        assertTrue(partitionInfo.getDataChannel().isOpen());
+        assertTrue(partitionInfo.getMetaFile().getChannel().isOpen());
+        assertTrue(partitionInfo.getIndexFile().getChannel().isOpen());
+      }
+    }
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 2, 0, 1, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream2.onData(stream2.getID(), block);
+    }
+    stream2.onComplete(stream2.getID());
+    closed.acquire();
+    // Check if all the file channels created for the first attempt are safely closed.
+    for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> partitionMap :
+        partitions.values()) {
+      for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
+        assertFalse(partitionInfo.getDataChannel().isOpen());
+        assertFalse(partitionInfo.getMetaFile().getChannel().isOpen());
+        assertFalse(partitionInfo.getIndexFile().getChannel().isOpen());
+      }
+    }
+    try {
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(testApp, 1, 0, 1, 0, 0));
+    } catch (IllegalArgumentException re) {
+      assertEquals(
+        "The attempt id 1 in this PushBlockStream message does not match " +
+          "with the current attempt id 2 stored in shuffle service for 
application " +
+          "updateLocalDirsTwiceWithTwoAttempts", re.getMessage());
+      throw re;
+    }
+  }
+
+  @Test(expected = ClosedChannelException.class)
+  public void testPushBlockStreamCallBackWhileNewAttemptRegistered()

Review comment:
       Nit: change the test name to 
`testOngoingMergeOfBlockFromPreviousAttemptIsAborted`

##########
File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
##########
@@ -821,20 +915,132 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
     }
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlockReceivedAfterNewAttemptRegistered()

Review comment:
       Nit: change the test name to something like 
`testBlockFromPreviousAttemptIsRejected` 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
