Ngone51 commented on a change in pull request #33078:
URL: https://github.com/apache/spark/pull/33078#discussion_r672122799



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -112,34 +118,59 @@ public ShuffleIndexInformation load(File file) throws 
IOException {
     this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
   }
 
+  @VisibleForTesting
+  protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
+    // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
+    AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(appId);
+    Preconditions.checkArgument(appShuffleInfo != null,
+      "application " + appId + " is not registered or NM was restarted.");
+    return appShuffleInfo;
+  }
+
   /**
-   * Given the appShuffleId and reduceId that uniquely identifies a given 
shuffle partition of an
-   * application, retrieves the associated metadata. If not present and the 
corresponding merged
-   * shuffle does not exist, initializes the metadata.
+   * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies 
a given shuffle
+   * partition of an application, retrieves the associated metadata. If not 
present and the
+   * corresponding merged shuffle does not exist, initializes the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      AppShuffleInfo appShuffleInfo,
+      int shuffleId,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
-    if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
-      // If this partition is already finalized then the partitions map will 
not contain
-      // the appShuffleId but the data file would exist. In that case the 
block is considered late.
+    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, 
reduceId);
+    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
+      appShuffleInfo.partitions;
+    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
+      partitions.compute(shuffleId, (id, map) -> {
+        if (map == null) {
+          // If this partition is already finalized then the partitions map 
will not contain the
+          // shuffleId but the data file would exist. In that case the block 
is considered late.
+          if (dataFile.exists()) {
+            return null;
+          }
+          return new ConcurrentHashMap<>();

Review comment:
       Oh, sorry, I misread it. Looks good!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to