zhouyejoe commented on a change in pull request #33034:
URL: https://github.com/apache/spark/pull/33034#discussion_r679691086



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -128,58 +138,88 @@ protected AppShuffleInfo 
validateAndGetAppShuffleInfo(String appId) {
   }
 
   /**
-   * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies 
a given shuffle
-   * partition of an application, retrieves the associated metadata. If not 
present and the
-   * corresponding merged shuffle does not exist, initializes the metadata.
+   * Given the appShuffleInfo, shuffleId, shuffleMergeId and reduceId that 
uniquely identifies
+   * a given shuffle partition of an application, retrieves the associated 
metadata. If not
+   * present and the corresponding merged shuffle does not exist, initializes 
the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
-      int reduceId) {
-    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, 
reduceId);
-    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
-      appShuffleInfo.partitions;
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.compute(shuffleId, (id, map) -> {
-        if (map == null) {
+      int shuffleMergeId,
+      int reduceId) throws StaleBlockPushException {
+    ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> partitions = 
appShuffleInfo.partitions;
+    AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
+      partitions.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
+        if (appShuffleMergePartitionsInfo == null) {
+          File dataFile =
+            appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, 
reduceId);
           // If this partition is already finalized then the partitions map 
will not contain the
-          // shuffleId but the data file would exist. In that case the block 
is considered late.
+          // shuffleId for determinate stages but the data file would exist.
+          // In that case the block is considered late. In the case of 
indeterminate stages, most
+          // recent shuffleMergeId finalized would be pointing to 
INDETERMINATE_SHUFFLE_FINALIZED
           if (dataFile.exists()) {
             return null;
+          } else {
+            logger.info("Creating a new attempt for shuffle blocks push 
request for shuffle {}"
+              + " with shuffleMergeId {} for application {}_{}", shuffleId, 
shuffleMergeId,
+                appShuffleInfo.appId, appShuffleInfo.attemptId);
+            return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
           }
-          return new ConcurrentHashMap<>();
         } else {
-          return map;
+          // Reject the request as we have already seen a higher 
shuffleMergeId than the
+          // current incoming one
+          int latestShuffleMergeId = 
appShuffleMergePartitionsInfo.shuffleMergeId;
+          if (latestShuffleMergeId > shuffleMergeId) {
+            throw new StaleBlockPushException(String.format("Rejecting shuffle 
blocks push request"
+              + " for shuffle %s with shuffleMergeId %s for application %s_%s 
as a higher"
+                + " shuffleMergeId %s request is already seen", shuffleId, 
shuffleMergeId,
+                appShuffleInfo.appId, appShuffleInfo.attemptId, 
latestShuffleMergeId));

Review comment:
       Nit: These two lines should have the same indentation as the first "+". 
Example: 
https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java#L70

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -128,58 +138,88 @@ protected AppShuffleInfo 
validateAndGetAppShuffleInfo(String appId) {
   }
 
   /**
-   * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies 
a given shuffle
-   * partition of an application, retrieves the associated metadata. If not 
present and the
-   * corresponding merged shuffle does not exist, initializes the metadata.
+   * Given the appShuffleInfo, shuffleId, shuffleMergeId and reduceId that 
uniquely identifies
+   * a given shuffle partition of an application, retrieves the associated 
metadata. If not
+   * present and the corresponding merged shuffle does not exist, initializes 
the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
-      int reduceId) {
-    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, 
reduceId);
-    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
-      appShuffleInfo.partitions;
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.compute(shuffleId, (id, map) -> {
-        if (map == null) {
+      int shuffleMergeId,
+      int reduceId) throws StaleBlockPushException {
+    ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> partitions = 
appShuffleInfo.partitions;
+    AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
+      partitions.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
+        if (appShuffleMergePartitionsInfo == null) {
+          File dataFile =
+            appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, 
reduceId);
           // If this partition is already finalized then the partitions map 
will not contain the
-          // shuffleId but the data file would exist. In that case the block 
is considered late.
+          // shuffleId for determinate stages but the data file would exist.
+          // In that case the block is considered late. In the case of 
indeterminate stages, most
+          // recent shuffleMergeId finalized would be pointing to 
INDETERMINATE_SHUFFLE_FINALIZED
           if (dataFile.exists()) {
             return null;
+          } else {
+            logger.info("Creating a new attempt for shuffle blocks push 
request for shuffle {}"
+              + " with shuffleMergeId {} for application {}_{}", shuffleId, 
shuffleMergeId,
+                appShuffleInfo.appId, appShuffleInfo.attemptId);
+            return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
           }
-          return new ConcurrentHashMap<>();
         } else {
-          return map;
+          // Reject the request as we have already seen a higher 
shuffleMergeId than the
+          // current incoming one
+          int latestShuffleMergeId = 
appShuffleMergePartitionsInfo.shuffleMergeId;
+          if (latestShuffleMergeId > shuffleMergeId) {
+            throw new StaleBlockPushException(String.format("Rejecting shuffle 
blocks push request"
+              + " for shuffle %s with shuffleMergeId %s for application %s_%s 
as a higher"
+                + " shuffleMergeId %s request is already seen", shuffleId, 
shuffleMergeId,
+                appShuffleInfo.appId, appShuffleInfo.attemptId, 
latestShuffleMergeId));
+          } else if (latestShuffleMergeId == shuffleMergeId) {
+            return appShuffleMergePartitionsInfo;
+          } else {
+            // Higher shuffleMergeId seen for the shuffle ID meaning new stage 
attempt is being
+            // run for the shuffle ID. Close and clean up old shuffleMergeId 
files,
+            // happens in the indeterminate stage retries
+            logger.info("Creating a new attempt for shuffle blocks push 
request for shuffle {}"
+              + " with shuffleMergeId {} for application {}_{} since it is 
higher than the"
+                + " latest shuffleMergeId {} already seen", shuffleId, 
shuffleMergeId,

Review comment:
       Nit: Indentation is inconsistent here as well — same issue as noted above.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -270,18 +333,28 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
   void closeAndDeletePartitionFilesIfNeeded(
       AppShuffleInfo appShuffleInfo,
       boolean cleanupLocalDirs) {
-    for (Map<Integer, AppShufflePartitionInfo> partitionMap : 
appShuffleInfo.partitions.values()) {
-      for (AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
-        synchronized (partitionInfo) {
-          partitionInfo.closeAllFiles();
-        }
-      }
-    }
+    appShuffleInfo.partitions.values().stream()
+      .flatMap(x -> x.shuffleMergePartitions.values().stream())
+      .forEach(partitionInfo -> 
partitionInfo.closeAllFilesAndDeleteIfNeeded(false));

Review comment:
       The synchronized is removed. Will that cause any issues?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -563,9 +675,10 @@ public String getID() {
     private void writeBuf(ByteBuffer buf) throws IOException {
       while (buf.hasRemaining()) {
         long updatedPos = partitionInfo.getDataFilePos() + length;
-        logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos 
{}",
-          partitionInfo.appId, partitionInfo.shuffleId,
-          partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos);
+        logger.debug("{} shuffleId {} shuffleMergeId {} reduceId {} current 
pos"
+          + " {} updated pos {}", partitionInfo.appId, partitionInfo.shuffleId,

Review comment:
       Nit: The indentation should match the "+" on the line above. There are 
also some other places further down with long "logger" lines where there is no 
need to add an extra indent, since the string and the subsequent variables are 
all parameters of the same debug method call. Please double-check the other 
occurrences.

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -270,18 +333,28 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
   void closeAndDeletePartitionFilesIfNeeded(
       AppShuffleInfo appShuffleInfo,
       boolean cleanupLocalDirs) {
-    for (Map<Integer, AppShufflePartitionInfo> partitionMap : 
appShuffleInfo.partitions.values()) {
-      for (AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
-        synchronized (partitionInfo) {
-          partitionInfo.closeAllFiles();
-        }
-      }
-    }
+    appShuffleInfo.partitions.values().stream()
+      .flatMap(x -> x.shuffleMergePartitions.values().stream())
+      .forEach(partitionInfo -> 
partitionInfo.closeAllFilesAndDeleteIfNeeded(false));

Review comment:
       Should we make the newly added closeAndDeletePartitionFiles method take 
a marker parameter for the deletion, so that we can call it here?
   for (Map<Integer, AppShufflePartitionInfo> partitionMap : 
appShuffleInfo.partitions.values()) {
     closeAndDeletePartitionFiles(partitionMap, false)
   }

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -188,49 +228,72 @@ private AppShufflePartitionInfo 
getOrCreateAppShufflePartitionInfo(
   AppShufflePartitionInfo newAppShufflePartitionInfo(
       String appId,
       int shuffleId,
+      int shuffleMergeId,
       int reduceId,
       File dataFile,
       File indexFile,
       File metaFile) throws IOException {
-    return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile,
+    return new AppShufflePartitionInfo(appId, shuffleId, shuffleMergeId, 
reduceId, dataFile,
       new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile));
   }
 
   @Override
-  public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int 
reduceId) {
+  public MergedBlockMeta getMergedBlockMeta(
+      String appId,
+      int shuffleId,
+      int shuffleMergeId,
+      int reduceId) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    Map<Integer, AppShuffleMergePartitionsInfo> partitions = 
appShuffleInfo.partitions;
+    if (partitions.containsKey(shuffleId) &&
+      partitions.get(shuffleId).shuffleMergeId > shuffleMergeId) {
+      throw new RuntimeException(String.format(
+        "MergedBlockMeta fetch for shuffle %s with shuffleMergeId %s reduceId 
%s is %s",
+          shuffleId, shuffleMergeId, reduceId,
+          ErrorHandler.BlockFetchErrorHandler.STALE_SHUFFLE_BLOCK_FETCH));
+    }
     File indexFile =
-      appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId);
+      appShuffleInfo.getMergedShuffleIndexFile(shuffleId, shuffleMergeId, 
reduceId);
     if (!indexFile.exists()) {
       throw new RuntimeException(String.format(
         "Merged shuffle index file %s not found", indexFile.getPath()));
     }
     int size = (int) indexFile.length();
     // First entry is the zero offset
     int numChunks = (size / Long.BYTES) - 1;
-    File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, 
reduceId);
+    File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, 
shuffleMergeId, reduceId);
     if (!metaFile.exists()) {
       throw new RuntimeException(String.format("Merged shuffle meta file %s 
not found",
         metaFile.getPath()));
     }
     FileSegmentManagedBuffer chunkBitMaps =
       new FileSegmentManagedBuffer(conf, metaFile, 0L, metaFile.length());
     logger.trace(
-      "{} shuffleId {} reduceId {} num chunks {}", appId, shuffleId, reduceId, 
numChunks);
+      "{} shuffleId {} shuffleMergeId {} reduceId {} num chunks {}",
+        appId, shuffleId, shuffleMergeId, reduceId, numChunks);
     return new MergedBlockMeta(numChunks, chunkBitMaps);
   }
 
   @SuppressWarnings("UnstableApiUsage")
   @Override
-  public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int 
reduceId, int chunkId) {
+  public ManagedBuffer getMergedBlockData(
+      String appId, int shuffleId, int shuffleMergeId, int reduceId, int 
chunkId) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
-    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, 
reduceId);
+    Map<Integer, AppShuffleMergePartitionsInfo> partitions = 
appShuffleInfo.partitions;
+    if (partitions.containsKey(shuffleId) &&
+      partitions.get(shuffleId).shuffleMergeId > shuffleMergeId) {
+      throw new RuntimeException(String.format(
+          "MergedBlockData fetch for shuffle %s with shuffleMergeId %s 
reduceId %s is %s",
+          shuffleId, shuffleMergeId, reduceId,
+          ErrorHandler.BlockFetchErrorHandler.STALE_SHUFFLE_BLOCK_FETCH));

Review comment:
       Nit: Indent is off

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -304,9 +377,9 @@ void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
-    final String streamId = String.format("%s_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, 
msg.mapIndex,
-      msg.reduceId);
+    final String streamId = String.format("%s_%d_%d_%d_%d",
+      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, 
msg.shuffleMergeId,
+        msg.mapIndex, msg.reduceId);

Review comment:
       Nit: Indent should be the same with the above line




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to