zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r640999074



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -117,64 +123,85 @@ public ShuffleIndexInformation load(File file) throws IOException {
    * shuffle does not exist, initializes the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      AppAttemptShuffleId appAttemptShuffleId,
+      AppAttemptPathsInfo appAttemptPathsInfo,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
-    if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
+    File dataFile = getMergedShuffleDataFile(
+      appAttemptPathsInfo, appAttemptShuffleId.appId, appAttemptShuffleId.shuffleId, reduceId);
+    if (!partitions.containsKey(appAttemptShuffleId) && dataFile.exists()) {
       // If this partition is already finalized then the partitions map will not contain
       // the appShuffleId but the data file would exist. In that case the block is considered late.
       return null;
     }
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.computeIfAbsent(appShuffleId, id -> Maps.newConcurrentMap());
-    return shufflePartitions.computeIfAbsent(reduceId, key -> {
-      // It only gets here when the key is not present in the map. This could either
-      // be the first time the merge manager receives a pushed block for a given application
-      // shuffle partition, or after the merged shuffle file is finalized. We handle these
-      // two cases accordingly by checking if the file already exists.
-      File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
-      File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
-      try {
-        if (dataFile.exists()) {
-          return null;
-        } else {
-          return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile);
-        }
-      } catch (IOException e) {
-        logger.error(
-          "Cannot create merged shuffle partition with data file {}, index file {}, and "
-            + "meta file {}", dataFile.getAbsolutePath(),
-            indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
-        throw new RuntimeException(
-          String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s "
-          + "reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, reduceId), e);
+    // While processing the application remove, where the shuffle partitions info for the specific
+    // application will be cleaned up, this method will still be called to create new partitions
+    // as of receiving the push blocks. To avoid the potential memory leak, before creating the
+    // empty hashmap for storing the shuffle partitions information in the partitions hashmap,
+    // we need to make sure that the entry for the specific application must still exist in
+    // appAttemptsPathInfo hashmap. Otherwise, the push blocks should be ignored.
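For context on the removed `-` lines: they return `null` from inside `computeIfAbsent` to mark a late block. That relies on documented `ConcurrentHashMap` behavior: when the mapping function returns `null`, no mapping is recorded and `computeIfAbsent` itself returns `null`. A minimal sketch of that behavior in isolation, assuming a hypothetical class and method name and a boolean standing in for the `dataFile.exists()` check:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo of the "return null for late blocks" pattern; not Spark code.
class ComputeIfAbsentNullDemo {
  // dataFileExists stands in for dataFile.exists() on an already finalized partition.
  static String getOrCreate(ConcurrentMap<Integer, String> shufflePartitions, int reduceId,
      boolean dataFileExists) {
    return shufflePartitions.computeIfAbsent(reduceId, key -> {
      if (dataFileExists) {
        return null; // late block: computeIfAbsent records no mapping and returns null
      }
      return "partition-" + key; // first push for this partition: create and store an entry
    });
  }

  public static void main(String[] args) {
    ConcurrentMap<Integer, String> shufflePartitions = new ConcurrentHashMap<>();
    System.out.println(getOrCreate(shufflePartitions, 0, true));   // null
    System.out.println(shufflePartitions.containsKey(0));          // false
    System.out.println(getOrCreate(shufflePartitions, 0, false));  // partition-0
    System.out.println(shufflePartitions.containsKey(0));          // true
  }
}
```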

Review comment:
       Discussed offline. Even if a context switch happens after line 150 and applicationRemove removes this key/value pair from the partitions hashmap, the shufflePartitions AtomicReference won't be null, so the files for the merged shuffle will still be created. However, the newly created hashmap is only referenced locally within this method, so it will be garbage-collected by the JVM rather than leaked.
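A minimal, self-contained sketch of the guard described in the new code comment and of the interleaving discussed above. Assumptions: `appsPathsInfo`, `partitions`, `AppShuffleId`, `shufflePartitionsFor`, and `applicationRemove` are hypothetical simplifications (partition info is just a `String`). Doing the check inside `ConcurrentHashMap.compute` and capturing the result in an `AtomicReference` is one plausible way to make check-and-create atomic. It is inferred from the review's mention of a `shufflePartitions` AtomicReference and is not copied from the PR.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the "only create partitions while the app exists" guard; not Spark code.
class RemoveRaceSketch {
  // Shuffle key: application id plus shuffle id.
  static final class AppShuffleId {
    final String appId;
    final int shuffleId;

    AppShuffleId(String appId, int shuffleId) {
      this.appId = appId;
      this.shuffleId = shuffleId;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof AppShuffleId)) return false;
      AppShuffleId other = (AppShuffleId) o;
      return shuffleId == other.shuffleId && appId.equals(other.appId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(appId, shuffleId);
    }
  }

  // Stand-in for the per-application paths map (appAttemptsPathInfo in the code comment).
  static final ConcurrentMap<String, String> appsPathsInfo = new ConcurrentHashMap<>();
  // Stand-in for the partitions map: shuffle -> (reduceId -> partition info).
  static final ConcurrentMap<AppShuffleId, ConcurrentMap<Integer, String>> partitions =
      new ConcurrentHashMap<>();

  // Creates the per-shuffle map only while the application entry still exists.
  // compute() runs atomically for id.appId, so it cannot interleave with appsPathsInfo.remove().
  static ConcurrentMap<Integer, String> shufflePartitionsFor(AppShuffleId id) {
    AtomicReference<ConcurrentMap<Integer, String>> ref = new AtomicReference<>(null);
    appsPathsInfo.compute(id.appId, (appId, paths) -> {
      if (paths != null) {
        ref.set(partitions.computeIfAbsent(id, k -> new ConcurrentHashMap<>()));
      }
      return paths; // leave the app entry unchanged (an absent key stays absent)
    });
    return ref.get();
  }

  static String getOrCreatePartition(AppShuffleId id, int reduceId) {
    ConcurrentMap<Integer, String> shufflePartitions = shufflePartitionsFor(id);
    if (shufflePartitions == null) {
      return null; // application already removed: ignore the pushed block
    }
    return shufflePartitions.computeIfAbsent(reduceId, r -> "partition-" + r);
  }

  static void applicationRemove(String appId) {
    appsPathsInfo.remove(appId); // later pushes now fail the guard
    partitions.keySet().removeIf(k -> k.appId.equals(appId)); // drop maps created earlier
  }

  public static void main(String[] args) {
    appsPathsInfo.put("app-1", "/tmp/merge-dirs"); // hypothetical path
    AppShuffleId id = new AppShuffleId("app-1", 0);
    System.out.println(getOrCreatePartition(id, 3)); // partition-3

    // The interleaving from the review: hold a local reference, then remove the application.
    ConcurrentMap<Integer, String> local = shufflePartitionsFor(id);
    applicationRemove("app-1");
    local.computeIfAbsent(5, r -> "partition-" + r); // only the local map sees this entry
    System.out.println(partitions.containsKey(id));   // false
    System.out.println(getOrCreatePartition(id, 7));  // null
  }
}
```

Because the guard and the map creation share one `compute` call in this sketch, a push either sees the application and creates its map before `applicationRemove` deletes it, or sees the application gone and creates nothing. A map that survives only in a local variable, as in the review's scenario, is unreachable once the method returns.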




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
